
Dataflow / BigQuery FILE_LOADS: Job did not reach to a terminal state after waiting indefinitely

I have the following pipeline:

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
        | 'Fetch from API 1' >> beam.Map(fetch_1)
        | 'Filter out invalid data 1' >> beam.Filter(lambda item: item is not None)
        | 'Fetch from API 2' >> beam.Map(fetch_2)
        | 'Filter out invalid data 2' >> beam.Filter(lambda item: item is not None)
        | 'Parse Article to BQ json' >> beam.Map(parse_to_bq_json)
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table='BQ_TABLE_NAME',
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                       method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                                                       triggering_frequency=5)
    )

This runs as expected with DirectRunner, but on Dataflow it ends with:

 Job did not reach to a terminal state after waiting indefinitely.

Nothing more, nothing less. Documentation and other mentions of similar cases are very limited, so any feedback is more than welcome.

A sample of the last log lines:

INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.574Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/PairWithVoidKey
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.603Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.637Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/GroupByKey/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.672Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/StreamingPCollectionViewWriter into WriteToBigQuery/BigQueryBatchFileLoads/WaitForDestinationLoadJobs/_UnpickledSideInput(MapToVoidKey0.out.0)/Values
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.705Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables into WriteToBigQuery/BigQueryBatchFileLoads/WaitForCopyJobs/WaitForCopyJobs
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.739Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/PassTables/PassTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.772Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/AddUselessValue
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.822Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/ReadStream
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.848Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/DeduplicateTables/MergeBuckets
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.880Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/Delete into WriteToBigQuery/BigQueryBatchFileLoads/RemoveTempTables/GetTableNames
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.915Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Impulse
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:50.939Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/Map(decode) into WriteToBigQuery/BigQueryBatchFileLoads/ImpulseEmptyPC/FlatMap(<lambda at core.py:3024>)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.008Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithTempTables/ParDo(TriggerLoadJobs)/ParDo(TriggerLoadJobs)
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.033Z: JOB_MESSAGE_DETAILED: Fusing consumer WriteToBigQuery/BigQueryBatchFileLoads/Flatten/FlattenReplace/WriteStream into WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.165Z: JOB_MESSAGE_ERROR: Workflow failed.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.205Z: JOB_MESSAGE_DETAILED: Cleaning up.
INFO:apache_beam.runners.dataflow.dataflow_runner:2021-01-22T17:27:51.252Z: JOB_MESSAGE_BASIC: Worker pool stopped.
Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/pydevd.py", line 1477, in _exec
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"
", file, 'exec'), glob, loc)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 151, in <module>
    run(args, pipeline_args)
  File "/Users/XXXX/dev/XXXX/app/app.py", line 108, in run
    p.run().wait_until_finish()
  File "/Users/XXXX/.virtualenvs/app/lib/python3.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1675, in wait_until_finish
    'Job did not reach to a terminal state after waiting indefinitely.')
AssertionError: Job did not reach to a terminal state after waiting indefinitely.

Edit 1: Adding the output from the console log (unfortunately there is not much info there):

{
  textPayload: "Workflow failed."
  insertId: "1rtvonbcgg5"
  resource: {
    type: "dataflow_step"
    labels: {
      project_id: "437008213460"
      job_name: "app-test"
      step_id: ""
      region: "europe-west1"
      job_id: "2021-01-22_11_22_27-2214838125974198028"
    }
  }
  timestamp: "2021-01-22T19:22:37.425862432Z"
  severity: "ERROR"
  labels: {
    dataflow.googleapis.com/job_id: "2021-01-22_11_22_27-2214838125974198028"
    dataflow.googleapis.com/job_name: "app-test"
    dataflow.googleapis.com/log_type: "system"
    dataflow.googleapis.com/region: "europe-west1"
  }
  logName: "projects/some-project-eu/logs/dataflow.googleapis.com%2Fjob-message"
  receiveTimestamp: "2021-01-22T19:22:39.086520796Z"
}

Edit 2: Adding a simplified version:

def foo(stream_data):
    # Return a dict matching the BigQuery schema below (foo_ts TIMESTAMP).
    return {"foo_ts": str(datetime.now())}

with beam.Pipeline(options=pipeline_options) as p:
    (
        p
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=pubsub_subscription).with_output_types(bytes)
        | 'Do foo' >> beam.Map(foo)
        | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(table=bq_project + ':' + bq_dataset + '.' + TABLE_NAME,
                                                       schema={"fields": [{"name": "foo_ts", "type": "TIMESTAMP"}]},
                                                       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                                                       method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
                                                       triggering_frequency=5,
                                                       )
    )

and my run command:

streaming_app.py \
  --input_subscription projects/awesome_project/subscriptions/sub-test \
  --runner DataflowRunner \
  --bq_project awesome_project \
  --bq_dataset awesome_dataset \
  --region europe-west1 \
  --temp_location gs://awesome-nlp \
  --job_name hope-it-works-test \
  --setup_file ./setup.py \
  --max_num_workers 10

Edit 3: Also adding the job ID of one of the failed jobs: 2021-01-24_06_31_49-168256842937211337

Question from: https://stackoverflow.com/questions/65850110/dataflow-bigquery-file-loads-job-did-not-reach-to-a-terminal-state-after-wait


1 Answer


Can you try comparing your code with the sample Dataflow runner code given in the example below? I cannot see your complete code, but if you adapt your code to the sample, it should run on the Dataflow runner.

EDIT 1:

Please find a working example below:


#------------Import Lib-----------------------#
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import os, sys, time
import json
import argparse
import logging
from apache_beam.options.pipeline_options import SetupOptions
from datetime import datetime

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxxxxxxxx'
Pubsub_subscription='projects/xxxxxxxxxxx/subscriptions/Pubsubdemo_subscription'
#------------Splitting Of Records----------------------#

class Transaction_ECOM(beam.DoFn):
    def process(self, element):
        logging.info(element)

        result = json.loads(element)
        data_bkt = result.get('_bkt','null')
        data_cd=result.get('_cd','null')
        data_indextime=result.get('_indextime','0')
        data_kv=result.get('_kv','null')
        data_raw=result['_raw']
        data_raw1=data_raw.replace("
", "")
        data_serial=result.get('_serial','null')
        data_si = str(result.get('_si','null'))
        data_sourcetype =result.get('_sourcetype','null')
        data_subsecond = result.get('_subsecond','null')
        data_time=result.get('_time','null')
        data_host=result.get('host','null')
        data_index=result.get('index','null')
        data_linecount=result.get('linecount','null')
        data_source=result.get('source','null')
        data_sourcetype1=result.get('sourcetype','null')
        data_splunk_server=result.get('splunk_server','null')

        return [{"datetime_indextime": time.strftime('%Y-%m-%dT%H:%M:%S', time.localtime(int(data_indextime))), "_bkt": data_bkt, "_cd": data_cd,  "_indextime": data_indextime,  "_kv": data_kv,  "_raw": data_raw1,  "_serial": data_serial,  "_si": data_si, "_sourcetype": data_sourcetype, "_subsecond": data_subsecond, "_time": data_time, "host": data_host, "index": data_index, "linecount": data_linecount, "source": data_source, "sourcetype": data_sourcetype1, "splunk_server": data_splunk_server}]



def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)



    data_loading = (
        p1
        | "Read Pub/Sub Messages" >> beam.io.ReadFromPubSub(subscription=Pubsub_subscription)

    )


    project_id = "xxxxxxxxxxx"
    dataset_id = 'test123'
    table_schema_ECOM = ('datetime_indextime:DATETIME, _bkt:STRING, _cd:STRING, _indextime:STRING, _kv:STRING, _raw:STRING, _serial:STRING, _si:STRING, _sourcetype:STRING, _subsecond:STRING, _time:STRING, host:STRING, index:STRING, linecount:STRING, source:STRING, sourcetype:STRING, splunk_server:STRING')

    # Persist to BigQuery
    # WriteToBigQuery accepts the data as list of JSON objects

#---------------------Index = ITF----------------------------------------------------------------------------------------------------------------------
    result = (
    data_loading
        | 'Clean-ITF' >> beam.ParDo(Transaction_ECOM())
        | 'Write-ITF' >> beam.io.WriteToBigQuery(
                                                    table='CFF_ABC',
                                                    dataset=dataset_id,
                                                    project=project_id,
                                                    schema=table_schema_ECOM,
                                                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                                                    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                                                    ))

    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  path_service_account = '/home/vibhg/Splunk/CFF/xxxxxxxxxxx-abcder125.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()


It has a few additional libraries, so just ignore them.

Some key features are:

  1. Set 'streaming' to 'True' (see the sketch after this list).
  2. The subscription name should be of the format 'projects/<project-id>/subscriptions/<subscription-name>'.
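A minimal sketch of the first point, assuming the options are built from the leftover command-line arguments as in the example above (the argument values below are placeholders, not the asker's real project):

# Minimal sketch: two equivalent ways to enable streaming mode.
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Stand-in for the unparsed command-line arguments (runner, region, ...).
pipeline_args = ['--runner=DataflowRunner', '--region=europe-west1',
                 '--temp_location=gs://some-bucket/temp']

# Option 1: pass streaming=True when constructing the options,
# exactly as the working example above does.
pipeline_options = PipelineOptions(pipeline_args, streaming=True)

# Option 2: flip the flag explicitly through StandardOptions.
pipeline_options.view_as(StandardOptions).streaming = True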

Sample data published on the topic, which will be captured from the subscription, is given below:

{"_bkt": "A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType="BsCall", fulName="EBCMFSSALES02", BusinessServiceName="BsSalesOrderCreated", Locality="NA", Success="True", BsExecutionTime="00:00:00.005", OrderNo="374941817", Locality="NA" , [fulName="EBCMFSSALES02"], [bsName="BsSalesOrderCreated"], [userId="s-oitp-u-global"], [userIdRegion="NA"], [msgId="aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc"], [msgIdSeq="2"], [originator="ISOM"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
[vibhg@aiclassificationdev8 jobrun]$ head -2 ITF_202101251435
{"_bkt": "itf~412~2EE5428B-7CEA-4C49-A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType="BsCall", fulName="EBCMFSSALES02", BusinessServiceName="BsSalesOrderCreated", Locality="NA", Success="True", BsExecutionTime="00:00:00.005", OrderNo="374941817", Locality="NA" , [fulName="EBCMFSSALES02"], [bsName="BsSalesOrderCreated"], [userId="s-oitp-u-global"], [userIdRegion="NA"], [msgId="aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc"], [msgIdSeq="2"], [originator="ISOM"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "9-A1E8-A5370FECA146", "_cd": "412:140787671", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:58,659 INFO  [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType="BsCall", fulName="EBCMFSSALES02", BusinessServiceName="BsCreateOrderV2", BsExecutionTime="00:00:01.568", OrderNo="374942155", CountryCode="US", ClientSystem="owfe-webapp" , [fulName="EBCMFSSALES02"], [bsName="BsCreateOrderV2"], [userId="s-salja1-u-irssemal"], [userIdRegion="NA"], [msgId="6652311fece28966"], [msgIdSeq="25"], [originator="SellingApi"] ", "_serial": "1", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".659", "_time": "2021-01-25 14:28:58.659 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}

You can execute the script with the following command:

python script.py --region europe-west1 --project xxxxxxx --temp_location gs://temp/temp --runner DataflowRunner --job_name name

It looks like you have missed setting the streaming parameter in your code.
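
For instance, appending --streaming to the run command from the question should switch the pipeline to streaming mode without any code change, assuming pipeline_options is built from the leftover command-line arguments as the traceback's run(args, pipeline_args) suggests (the other flags from the question are omitted here for brevity):

python streaming_app.py \
  --input_subscription projects/awesome_project/subscriptions/sub-test \
  --runner DataflowRunner \
  --region europe-west1 \
  --temp_location gs://awesome-nlp \
  --streaming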



...