I am reading the results of a query from BigQuery and, using Apache Beam / Dataflow in Kotlin, I want to add a column containing the current date as a timestamp. I don't want to do this inside the query itself, because I want to reuse this code for a large number of queries and it seems like a better design.
This is the pipeline code I wrote:
```kotlin
val pipeline = Pipeline.create(options)
pipeline
    .apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
    .apply("Add date", ParDo.of(AddDate()))
    .apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId)))
pipeline.run()
```
For some reason, the pipeline never gets past the `Add date` transformation.
This is the code that most likely contains the bug:
```kotlin
class AddDate : DoFn<TableRow, TableRow>() {
    @ProcessElement
    fun processElement(context: ProcessContext) {
        val tableRow = context.element() as TableRow
        tableRow.set("process_date", Instant.now())
        context.output(tableRow)
    }
}
```
I also tried replacing the body of `processElement` with the following, but it still does not work:
```kotlin
context.outputWithTimestamp(context.element(), Instant.now())
```
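Note that `outputWithTimestamp` does not add a column: it sets the element's event-time timestamp, which is Beam metadata used for windowing, and `BigQueryIO.writeTableRows()` writes only the fields stored in the `TableRow`. The value therefore has to go into the row itself. The following is a minimal sketch, assuming `process_date` is a BigQuery `TIMESTAMP` column and that one value per pipeline run is acceptable. The timestamp is computed once when the pipeline is built and passed to a DoFn through its constructor, so every row in the run gets the same value. `AddProcessDate` and `currentProcessDate` are hypothetical names, and using an ISO-8601 string for the timestamp is an assumption about what BigQuery accepts.

```kotlin
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.transforms.DoFn
import java.time.Instant

// Hypothetical DoFn: writes a fixed, precomputed process timestamp into each row.
// processDate is a plain String, so the DoFn stays serializable and the value is
// an ISO-8601 timestamp (assumed to be accepted by a BigQuery TIMESTAMP column).
class AddProcessDate(private val processDate: String) : DoFn<TableRow, TableRow>() {
    @ProcessElement
    fun processElement(context: ProcessContext) {
        // Work on a copy: the element returned by element() must not be modified.
        val row = context.element().clone()
        row.set("process_date", processDate)
        context.output(row)
    }
}

// Hypothetical helper: evaluated once where the pipeline is constructed,
// not once per element on the workers.
fun currentProcessDate(): String = Instant.now().toString()
```

It would be wired in as `.apply("Add date", ParDo.of(AddProcessDate(currentProcessDate())))`. Calling `Instant.now()` inside `processElement` instead would give each row a slightly different value, depending on when each worker processed it.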
The error is the following:
Input values must not be mutated in any way.
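The error comes from Beam's rule that a `DoFn` must not modify its input. In `AddDate`, `tableRow` is the same object that `context.element()` returned, so `tableRow.set(...)` changes the input element in place. The DirectRunner (Beam's local runner) checks for this and fails with the message above. The following is a minimal sketch of one common fix, assuming the rest of the pipeline is unchanged: copy the row with `TableRow.clone()` and modify only the copy. The helper `withProcessDate` is a hypothetical name. Storing the timestamp as an ISO-8601 string, instead of the `Instant` object itself, is an assumption about what serializes cleanly and what BigQuery accepts for a `TIMESTAMP` column.

```kotlin
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.transforms.DoFn
import java.time.Instant

// Hypothetical helper: returns a copy of `row` with a process_date field added,
// leaving the original row untouched.
fun withProcessDate(row: TableRow, now: Instant): TableRow {
    val copy = row.clone()                    // independent copy of the input row
    copy.set("process_date", now.toString())  // ISO-8601 string, e.g. "2024-01-01T00:00:00Z"
    return copy
}

// Corrected AddDate: only the copy is modified, so the input element is never mutated.
class AddDate : DoFn<TableRow, TableRow>() {
    @ProcessElement
    fun processElement(context: ProcessContext) {
        context.output(withProcessDate(context.element(), Instant.now()))
    }
}
```

Because the copying logic lives in an ordinary function, it can be checked without running a pipeline.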