Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
794 views
in Technique[技术] by (71.8m points)

python - google dataflow read from spanner

I am trying to read a table from a Google spanner database, and write it to a text file to do a backup, using google dataflow with the python sdk. I have written the following script:

    from __future__ import absolute_import

import argparse
import itertools
import logging
import re
import time
import datetime as dt
import logging

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):
    def __init__(self):
    logging.info('Enter __init__')

    self.spannerOptions = {
        "id": PROJECT_ID,
        "instance": INSTANCE_ID,
        "database": DATABASE_ID
    }
    self.SpannerClient = Client

    def estimate_size(self):
    logging.info('Enter estimate_size')
    return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
    logging.info('Enter get_range_tracker')
    if start_position is None:
       start_position = 0
    if stop_position is None:
       stop_position = OffsetRangeTracker.OFFSET_INFINITY

    range_tracker = OffsetRangeTracker(start_position, stop_position)
    return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the dataflowRunner !
    logging.info('Enter read')
    # instantiate spanner client
    spanner_client = self.SpannerClient(self.spannerOptions["id"])
    instance = spanner_client.instance(self.spannerOptions["instance"])
    database = instance.database(self.spannerOptions["database"])

    # read from table
    table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
    table_fields.consume_all()
    self.columns = [x[0] for x in table_fields]
    keyset = KeySet(all_=True)
    results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

    # iterator over rows
    results.consume_all()
    for row in results:
        JSON_row = {
        self.columns[i]: row[i] for i in range(len(self.columns))
        }
        yield JSON_row

    def split(self, start_position=None, stop_position=None):
    # this should not be called since the source is unspittable
    logging.info('Enter split')
    if start_position is None:
        start_position = 0
    if stop_position is None:
        stop_position = 1

    # Because the source is unsplittable (for now), only a single source is returned
    yield iobase.SourceBundle(
        weight=1,
        source=self,
        start_position=start_position,
        stop_position=stop_position)


def run(argv=None):
  """Main entry point"""
  pipeline_options = PipelineOptions()
  google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
  google_cloud_options.project = PROJECT_ID
  google_cloud_options.job_name = JOB_NAME
  google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
  google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

  #pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
  pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
  p = beam.Pipeline(options=pipeline_options)

  output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
  iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
  output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='')  # if this line is commented, job completes but does not do anything


  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

However, this script runs correctly only on the DirectRunner: when I let it run on the DataflowRunner, it runs for a while without any output, before exiting with an error:

"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

Sometimes, it just goes on forever, without creating an output.

Moreover, if I comment the line 'output = ...', the job completes, but without actually reading the data.

It also appears that the dataflowRunner calls the function 'estimate_size' of the source, but not the functions 'read' or 'get_range_tracker'.

Does anyone have any ideas about what may cause this ? I know there is a (more complete) java SDK with an experimental spanner source/sink available, but if possible I'd rather stick with python.

Thanks

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)
Waitting for answers

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...