I am pre-processing batch data with Dataflow: the job reads the data from Google Cloud Storage (GCS), processes it, and uploads the result back to GCS. After the job finished, I checked GCS and found the output stored like this:
result-001.csv
result-002.csv
result-003.csv
So the result is split across several files. Is there a way to combine them into a single file?
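For context, as far as I understand beam.io.WriteToText splits its output into several shards by default (the runner picks how many), so I assume that is where these result files come from. A minimal illustration of that default behaviour, with gs://some-bucket/out as a made-up path:

import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create(["1,2,3", "4,5,6"])
     # num_shards defaults to 0, which lets the runner decide how many
     # output files to create -- hence several result files in GCS.
     | beam.io.WriteToText("gs://some-bucket/out", file_name_suffix=".csv"))

The full pipeline I am running is below.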
#-*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
def preprocessing(fields):
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += (",pixel" + str(i))
    label_list_str = "["
    label_list = []
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str
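# Example of what preprocessing() yields, for a hypothetical (truncated) input row:
#   input line : "5,0,0,...,0"
#   yields     : "[0,0,0,0,0,5,0,0,0,0],0,0,...,0"
# i.e. a 10-element label list with the label value at the label's index and
# zeros elsewhere, followed by the pixel values from the original row.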
def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner"
    ]
    result_output = 'gs://kaggle-bucket-v1/result/result.csv'
    filename = "gs://{}/train.csv".format(bucket)

    pipeline = beam.Pipeline(argv=argv)
    ptransform = (pipeline
                  | "Read from GCS" >> beam.io.ReadFromText(filename)
                  | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                  )
    (ptransform
     | "events:out" >> beam.io.WriteToText(
         result_output
     )
     )
    pipeline.run()
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")
    args = vars(parser.parse_args())

    print("Pre-processing data from bucket {} and writing results to GCS".format(args["bucket"]))
    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])
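From the WriteToText signature it looks like the sharding is controlled by the num_shards argument (0 by default, which leaves the shard count up to the runner). Below is only a sketch of what I am considering, not something I have verified at scale: forcing a single shard should make the runner write exactly one file, at the cost of write parallelism.

# Sketch: replace the write step above with a single-shard write.
# Assumes the whole result comfortably fits in one output file.
(ptransform
 | "events:out" >> beam.io.WriteToText(
     result_output,
     num_shards=1,            # write exactly one output file
     shard_name_template=""   # drop the -00000-of-00001 part of the file name
 )
)

Alternatively, I suppose the sharded files could be left as they are and merged after the job finishes, e.g. with gsutil compose (which, as far as I know, concatenates up to 32 objects into one), but I have not tried that either.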
Thank you for reading :)
question from:
https://stackoverflow.com/questions/65837969/write-files-to-gcs-using-dataflow