Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


kotlin - How to add a column to query results in Google Dataflow

I am reading the results of a query from BigQuery and, using Apache Beam / Dataflow in Kotlin, I want to add a column holding the current date as a timestamp. I don't want to do this inside the query itself, because I want to reuse the code for a large number of queries and a separate step looks like a better design.

This is the pipeline code I wrote:

val pipeline = Pipeline.create(options)
  .apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
  .apply("Add date", ParDo.of(AddDate()))
  .apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId)))

For some reason the pipeline never gets past the "Add date" transformation. This is the code that most likely contains the bug:

class AddDate : DoFn<TableRow, TableRow>() {

    @ProcessElement
    fun processElement(context: ProcessContext) {
        val tableRow = context.element() as TableRow
        tableRow.set("process_date", Instant.now())
        context.output(tableRow)
    }
}

I also tried the following inside processElement instead, but it still does not work:

context.outputWithTimestamp(context.element(), Instant.now())

The error is the following:

Input values must not be mutated in any way.



1 Answer


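The error appears because the original code calls set on the element returned by context.element(), and Beam does not allow a DoFn to mutate its input. The problem is solved by building a new TableRow, copying the input fields into it, and being careful with the type used for the date value (here a string, for a destination column of type DATE or TIMESTAMP):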

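@ProcessElement
fun processElement(context: ProcessContext) {
    // Build a fresh row instead of modifying the input element
    val tableRow = TableRow()
    // Store the date as a string rather than as an Instant object
    tableRow.set("process_date", Instant.now().toString())
    // context.element() is already a TableRow, so no cast is needed
    val input = context.element()
    // Copy every field of the input row into the new row
    input.keys.forEach { tableRow.set(it, input[it]) }
    context.output(tableRow)
}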
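
Since the question mentions reusing this step across many queries, the same fix can be packaged as a complete class. The following is only a minimal sketch: the names copyWithColumn and AddProcessDate are hypothetical, and it assumes java.time.Instant, because the original post does not show which Instant class is imported.

```kotlin
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import java.time.Instant

// Hypothetical helper: returns a copy of `input` with one extra column.
// `input` itself is never modified. The copy is shallow, so nested values
// are shared with the input row.
fun copyWithColumn(input: TableRow, column: String, value: String): TableRow {
    val copy = TableRow()
    input.keys.forEach { copy.set(it, input[it]) }
    copy.set(column, value)
    return copy
}

// Hypothetical reusable DoFn: the column name is a constructor parameter,
// so the same class can be applied after different queries.
class AddProcessDate(
    private val columnName: String = "process_date"
) : DoFn<TableRow, TableRow>() {

    @ProcessElement
    fun processElement(context: ProcessContext) {
        // Assumption: java.time.Instant; toString() yields an ISO-8601 string
        val now = Instant.now().toString()
        context.output(copyWithColumn(context.element(), columnName, now))
    }
}
```

It would be applied the same way as in the question, with .apply("Add date", ParDo.of(AddProcessDate())). The tableSchema passed to withSchema presumably also needs a field for the new column, and a DATE column would need a date-only string rather than a full timestamp.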
