Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
367 views
in Technique[技术] by (71.8m points)

google cloud platform - Write files to GCS using Dataflow

Dataflow is being pre-processed by reading batch data.

The workload is read from Google Cloud Storage (GCS) to process Dataflow and upload it back to GCS.

But after processing the data, I checked the GCS.

result-001.csv

result-002.csv

result-003.csv

This is how the data is divided and stored. Can't I combine these files into one?

#-*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def preprocessing(fields):
    fields = fields.split(",")
    header = "label"
    for i in range(0, 784):
        header += (",pixel" + str(i))
    label_list_str = "["
    label_list = []
    for i in range(0,10) :
        if fields[0] == str(i) :
            label_list_str+=str(i)
        else :
            label_list_str+=("0")
        if i!=9 :
            label_list_str+=","
    label_list_str+="],"
    for i in range(1,len(fields)) :
        label_list_str+=fields[i]
        if i!=len(fields)-1:
            label_list_str+=","
    yield label_list_str


def run(project, bucket, dataset) :
        argv = [
            "--project={0}".format(project),
            "--job_name=kaggle-dataflow",
            "--save_main_session",
            "--region=asia-northeast1",
            "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
            "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
            "--max_num_workers=8",
            "--worker_region=asia-northeast3",
            "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
            "--autoscaling_algorithm=THROUGHPUT_BASED",
            "--runner=DataflowRunner",
            "--worker_region=asia-northeast3"
        ]

        result_output = 'gs://kaggle-bucket-v1/result/result.csv'
        filename = "gs://{}/train.csv".format(bucket)
        pipeline = beam.Pipeline(argv=argv)
        ptransform = (pipeline
                      | "Read from GCS" >> beam.io.ReadFromText(filename)
                      | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                      )
   
  
    (ptransform
    | "events:out" >> beam.io.WriteToText(
            result_output
        )
     )
    
    pipeline.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")

    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))

    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])

Thank you for reading :)

question from:https://stackoverflow.com/questions/65837969/write-files-to-gcs-using-dataflow

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

Method beam.io.WriteToText automatically splits files when writing for best performance. You can explicitly add parameter num_shards = 1 if you want 1 file only.

num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.

Your writing to text should look like this:

(ptransform
    | "events:out" >> beam.io.WriteToText(
            result_output,num_shards=1
        )
     )

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...