0 votes
307 views
in Technique[技术] by (71.8m points)

google cloud platform - Upload CSV format files to GCS as a Dataflow job

I upload the file to Google Cloud Storage, and Dataflow pre-processes it by reading the data as a batch.

The workflow reads from Google Cloud Storage (GCS), processes the data with Dataflow, and uploads the result back to GCS.

But after processing the data, I checked GCS and the result file does not contain the header row. Here is the pipeline code:

# -*- coding: utf-8 -*-
import apache_beam as beam
import csv
import json
import os
import re
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def preprocessing(fields):
    # Each input line is a comma-separated record: label, pixel0 ... pixel783.
    fields = fields.split(",")

    # The header row is built here but never yielded, so it never reaches the output file.
    header = "label"
    for i in range(0, 784):
        header += ",pixel" + str(i)

    # Encode the label as a 10-element list (the label value at its own index,
    # zeros elsewhere), then append the pixel values.
    label_list_str = "["
    for i in range(0, 10):
        if fields[0] == str(i):
            label_list_str += str(i)
        else:
            label_list_str += "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    for i in range(1, len(fields)):
        label_list_str += fields[i]
        if i != len(fields) - 1:
            label_list_str += ","
    yield label_list_str


def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner"
    ]

    result_output = "gs://kaggle-bucket-v1/result/result.csv"
    filename = "gs://{}/train.csv".format(bucket)
    pipeline = beam.Pipeline(argv=argv)

    # Read the raw CSV from GCS and pre-process every line.
    ptransform = (pipeline
                  | "Read from GCS" >> beam.io.ReadFromText(filename)
                  | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                  )

    # Write the processed lines back to GCS.
    (ptransform
     | "events:out" >> beam.io.WriteToText(result_output)
     )

    pipeline.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")

    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))

    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])

What I am trying to do now is to upload a file containing the header to GCS and then add the processed data to it, working with the file name. However, it does not work well.

How can I include the header in the output CSV file?
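
For reference, one possible approach is the header argument of beam.io.WriteToText, which writes the given string at the beginning of each output shard. The sketch below assumes the 785-column layout from the question and a placeholder bucket path; preprocessing is the function defined above.

import apache_beam as beam

# Illustrative header for the Kaggle MNIST layout: label, pixel0 ... pixel783.
header = "label," + ",".join("pixel" + str(i) for i in range(784))

with beam.Pipeline() as pipeline:
    (pipeline
     | "Read from GCS" >> beam.io.ReadFromText("gs://your-bucket/train.csv",
                                               skip_header_lines=1)
     | "Pre-processing" >> beam.FlatMap(preprocessing)
     | "Write with header" >> beam.io.WriteToText("gs://your-bucket/result/result",
                                                  file_name_suffix=".csv",
                                                  header=header))

Note that with more than one shard the header appears at the top of each output file; setting num_shards=1 keeps the output in a single shard at the cost of write parallelism.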

question from:https://stackoverflow.com/questions/65842211/upload-csv-format-files-to-gcs-as-a-dataflow-work


1 Answer

0 votes
by (71.8m points)
Waiting for answers
