I am trying to run a Python Dataflow job that reads from a Pub/Sub subscription and writes to BigQuery.
The Dataflow job gets triggered and then stays in the running state without fetching any data from the subscription. Any idea or clue what could be going wrong? The same code runs fine on Python 2.7.
The Python version is 3.8 and apache-beam is 2.25.0.
I am running the code via PyCharm on macOS 11.2.
Code (since the job is not processing anything, I kept only a print of the element in the ParDo function to see if it works):

import argparse
import logging

import apache_beam as beam
from apache_beam import DoFn
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


class DecodeMsgs(DoFn):
    def process(self, element):
        print(element)
        return [element]


def run(subscription, pipeline_args=None):
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)
    p | beam.io.ReadFromPubSub(subscription=subscription) | beam.ParDo(DecodeMsgs())
    result = p.run()
    result.wait_until_finish()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--subscription', required=True)
    known_args, pipeline_args = parser.parse_known_args()
    run(known_args.subscription, pipeline_args)
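For context, the eventual goal is to turn each Pub/Sub payload into a row for BigQuery. `ReadFromPubSub` emits raw bytes, so the DoFn will eventually need a decode step along these lines (the JSON payload shape here is a hypothetical example, not from the actual subscription):

```python
import json


def decode_msg(element):
    # Pub/Sub payloads arrive as raw bytes; WriteToBigQuery expects
    # dict rows, so decode UTF-8 and parse the JSON body.
    return json.loads(element.decode("utf-8"))


# Example payload as ReadFromPubSub would deliver it (bytes)
payload = b'{"user_id": 42, "event": "click"}'
row = decode_msg(payload)
# row == {"user_id": 42, "event": "click"}
```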
Arguments passed to the job:
--subscription
<<projects/PROJECT/subscriptions/SUBSCRIPTION>>
--runner
DataflowRunner
--project
PROJECT
--temp_location
gs://temp_location
--staging_location
gs://staging_location
--subnetwork
<<regions/REGION/subnetworks/SUBNETWORK>>
--service_account_email
<<SERVICE_ACCOUNT_EMAIL>>
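As a sanity check on argument routing: `parse_known_args` consumes `--subscription` locally and leaves all remaining flags for Beam's `PipelineOptions`. A stdlib-only sketch with placeholder values (the subnetwork and service-account flags are omitted here since their values are redacted above):

```python
import argparse

# Placeholder argv mirroring the flags passed to the job
argv = [
    "--subscription", "projects/PROJECT/subscriptions/SUBSCRIPTION",
    "--runner", "DataflowRunner",
    "--project", "PROJECT",
    "--temp_location", "gs://temp_location",
    "--staging_location", "gs://staging_location",
]

parser = argparse.ArgumentParser()
parser.add_argument("--subscription", required=True)
known_args, pipeline_args = parser.parse_known_args(argv)

print(known_args.subscription)  # projects/PROJECT/subscriptions/SUBSCRIPTION
print(pipeline_args)            # ['--runner', 'DataflowRunner', ...]
```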
Here is the Python dependency list:
apache-beam==2.25.0
apitools==0.1.4
avro-python3==1.9.2.1
bcrypt==3.2.0
bigquery==0.0.8
BigQuery-Python==1.15.0
cachetools==4.2.1
certifi==2020.12.5
cffi==1.14.4
chardet==4.0.0
crcmod==1.7
cryptography==3.3.1
dacktool==0.0.7
dbstream==0.0.30
dill==0.3.1.1
docopt==0.6.2
fastavro==1.3.0
fasteners==0.16
flatten-json==0.1.7
future==0.18.2
google-api-core==1.25.1
google-api-python-client==1.7.11
google-apitools==0.5.31
google-auth==1.24.0
google-auth-httplib2==0.0.3
google-auth-oauthlib==0.4.2
google-cloud==0.34.0
google-cloud-bigquery==1.28.0
google-cloud-bigquery-storage==2.2.1
google-cloud-bigtable==1.6.1
google-cloud-build==2.0.0
google-cloud-core==1.5.0
google-cloud-datastore==1.15.3
google-cloud-dlp==1.0.0
google-cloud-language==1.3.0
google-cloud-pubsub==1.7.0
google-cloud-spanner==1.19.1
google-cloud-videointelligence==1.16.1
google-cloud-vision==1.0.0
google-crc32c==1.1.2
google-resumable-media==1.2.0
googleapis-common-protos==1.52.0
googleauthentication==0.0.14
grpc-google-iam-v1==0.12.3
grpcio==1.35.0
grpcio-gcp==0.2.2
hdfs==2.5.8
httplib2==0.17.4
idna==2.10
json-flatten==0.1
libcst==0.3.16
mock==2.0.0
mypy-extensions==0.4.3
numpy==1.19.5
oauth2client==4.1.3
oauthlib==3.1.0
pandas==1.2.1
paramiko==2.7.2
pbr==5.5.1
proto-plus==1.13.0
protobuf==3.14.0
pyarrow==0.17.1
pyasn1==0.4.8
pyasn1-modules==0.2.8
pycparser==2.20
pydot==1.4.1
pymongo==3.11.2
PyNaCl==1.4.0
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.5
PyYAML==5.4.1
requests==2.25.1
requests-oauthlib==1.3.0
rsa==4.7
six==1.15.0
sshtunnel==0.1.5
typing-extensions==3.7.4.3
typing-inspect==0.6.0
uritemplate==3.0.1
urllib3==1.26.3
question from:
https://stackoverflow.com/questions/66062935/dataflow-job-is-in-running-state-but-do-not-process-any-element-from-a-pubsub-su