Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share

Apache Beam - Dataflow job is in the running state but does not process any elements from a Pub/Sub subscription

I am trying to run a Python Dataflow job that reads from a Pub/Sub subscription and writes the data to BigQuery.

The Dataflow job is triggered and goes into the running state, but it never fetches any data from the subscription. Any idea what could be going wrong? The same code runs fine on Python 2.7.

The Python version is 3.8 and apache-beam is 2.25.0. I am running the code from PyCharm on macOS 11.2.

Code (since the job is not processing anything, I reduced the ParDo function to a single print of the element to see whether it works):

import argparse
import logging

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class DecodeMsgs(DoFn):
    """Debugging step: print each element and pass it through unchanged."""

    def process(self, element):
        print(element)
        return [element]


def run(subscription, pipeline_args=None):
    # Pass the flags that argparse did not consume (runner, project, ...) to Beam.
    pipeline_options = PipelineOptions(pipeline_args)
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    p | beam.io.ReadFromPubSub(subscription=subscription) | beam.ParDo(DecodeMsgs())
    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--subscription', required=True)

    # known_args holds --subscription; everything else is left for Beam.
    known_args, pipeline_args = parser.parse_known_args()
    run(known_args.subscription, pipeline_args)
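
Since the stated goal is to write the messages to BigQuery, the DoFn will eventually need to turn each element into a row. On Python 3, ReadFromPubSub yields each payload as bytes unless with_attributes is set. The following is only a minimal sketch of that decoding step, assuming the messages are UTF-8 encoded JSON objects (the question does not say what format they use). The helper name decode_message is hypothetical; it could be called from DecodeMsgs.process.

```python
import json


def decode_message(payload: bytes) -> dict:
    """Decode a raw Pub/Sub payload into a dict.

    Assumption: the payload is a UTF-8 encoded JSON object.
    """
    return json.loads(payload.decode("utf-8"))


# Example with a made-up payload, outside of any pipeline.
row = decode_message(b'{"id": 1, "name": "example"}')
print(row)
```

Keeping the decoding in a plain function makes it possible to check it locally before the job is submitted to Dataflow.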

Arguments passed to the job:

--subscription <<projects/PROJECT/subscriptions/SUBSCRIPTION>>
--runner DataflowRunner
--project PROJECT
--temp_location gs://temp_location
--staging_location gs://staging_location
--subnetwork <<regions/REGION/subnetworks/SUBNETWORK>>
--service_account_email <<SERVICE_ACCOUNT_EMAIL>>
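
To see how these arguments are divided between the script and Beam, here is a small standalone sketch of the parse_known_args split used in the code above. It also checks that the subscription has the projects/PROJECT/subscriptions/SUBSCRIPTION shape. The function split_args and the regular expression are illustrative assumptions, not part of the original script, and the argument list is shortened.

```python
import argparse
import re

# Assumed shape of a fully qualified subscription path.
SUBSCRIPTION_RE = re.compile(r"^projects/[^/]+/subscriptions/[^/]+$")


def split_args(argv):
    """Return (subscription, remaining_args) from a list of CLI arguments."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--subscription", required=True)
    # Flags the parser does not know are returned untouched in the second value.
    known_args, pipeline_args = parser.parse_known_args(argv)
    if not SUBSCRIPTION_RE.match(known_args.subscription):
        raise ValueError(
            "Unexpected subscription format: %r" % known_args.subscription
        )
    return known_args.subscription, pipeline_args


subscription, pipeline_args = split_args([
    "--subscription", "projects/PROJECT/subscriptions/SUBSCRIPTION",
    "--runner", "DataflowRunner",
    "--project", "PROJECT",
])
print(subscription)
print(pipeline_args)
```

Printing both values before launching the job is a cheap way to confirm that the runner and project flags really end up in the list that is handed to the pipeline options.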

Here is the Python dependency list:

apache-beam==2.25.0
apitools==0.1.4
avro-python3==1.9.2.1
bcrypt==3.2.0
bigquery==0.0.8
BigQuery-Python==1.15.0
cachetools==4.2.1
certifi==2020.12.5
cffi==1.14.4
chardet==4.0.0
crcmod==1.7
cryptography==3.3.1
dacktool==0.0.7
dbstream==0.0.30
dill==0.3.1.1
docopt==0.6.2
fastavro==1.3.0
fasteners==0.16
flatten-json==0.1.7
future==0.18.2
google-api-core==1.25.1
google-api-python-client==1.7.11
google-apitools==0.5.31
google-auth==1.24.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.2
google-cloud==0.34.0
google-cloud-bigquery==1.28.0
google-cloud-bigquery-storage==2.2.1
google-cloud-bigtable==1.6.1
google-cloud-build==2.0.0
google-cloud-core==1.5.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-language==1.3.0
google-cloud-pubsub==1.7.0
google-cloud-spanner==1.19.1
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-crc32c==1.1.2
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
googleauthentication==0.0.14
grpc-google-iam-v1==0.12.3
grpcio==1.35.0
grpcio-gcp==0.2.2
hdfs==2.5.8
httplib2==0.17.4
idna==2.10
json-flatten==0.1
libcst==0.3.16
mock==2.0.0
mypy-extensions==0.4.3
numpy==1.19.5
oauth2client==4.1.3
oauthlib==3.1.0
pandas==1.2.1
paramiko==2.7.2
pbr==5.5.1
proto-plus==1.13.0
protobuf==3.14.0
pyarrow==0.17.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydot==1.4.1
pymongo==3.11.2
PyNaCl==1.4.0
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.5
PyYAML==5.4.1
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7
six==1.15.0
sshtunnel==0.1.5
typing-extensions==3.7.4.3
typing-inspect==0.6.0
uritemplate==3.0.1
urllib3==1.26.3

Question from: https://stackoverflow.com/questions/66062935/dataflow-job-is-in-running-state-but-do-not-process-any-element-from-a-pubsub-su


