Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


0 votes
593 views
in Technique [Technology] by (71.8m points)

deadlock - Will Airflow Celery workers be blocked if the number of sensors is larger than the concurrency?

Let's say I set Celery concurrency to n, but I have m (m > n) ExternalTaskSensor tasks in a DAG, each checking another DAG named do_sth. These ExternalTaskSensor tasks will occupy all the Celery worker slots, so in fact nothing will get any work done.

But I can't set concurrency too high (like 2*m), because the do_sth DAG may then start too many processes, which will lead to running out of memory.

I am confused about what number I should set Celery concurrency to.
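
To make the starvation concrete, here is a toy model of the situation described above. It is not Airflow code: the function name work_can_start is hypothetical, and it assumes the worst case where every sensor gets a slot before any do_sth task does and holds that slot until do_sth finishes.

```python
def work_can_start(concurrency, num_sensors):
    """Return True if at least one worker slot is left for do_sth tasks.

    Toy model (an assumption, not Airflow's scheduler): each sensor that
    gets a slot keeps it until do_sth finishes, and sensors are scheduled
    before any do_sth task.
    """
    slots_held_by_sensors = min(num_sensors, concurrency)
    return concurrency - slots_held_by_sensors > 0


n, m = 4, 6  # example values with m > n, as in the question

print(work_can_start(n, m))      # False: all n slots are held by sensors
print(work_can_start(2 * m, m))  # True, but at the cost of 2*m worker processes
```

With m > n the sensors wait on work that can never be scheduled, which is the deadlock in the title.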


He who fights the dragon too long becomes a dragon himself; gaze too long into the abyss, and the abyss will gaze back…

1 Answer

0 votes
by (71.8m points)

The Gotchas section of "ETL best practices with Airflow" addresses this general problem. One of its suggestions is to set up a pool for your sensor tasks so that your other tasks don't get starved. For your situation, decide how many sensor tasks you want running at one time (fewer than your concurrency level) and set up a pool with that number as its limit. Once the pool is set up, pass the pool argument to each of your sensor operators. For more on pools, see the Concepts page of Airflow's documentation. Here is an example of passing a pool argument to an operator:

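from datetime import timedelta
from airflow.operators.bash import BashOperator  # airflow.operators.bash_operator on Airflow 1.x

# aggregate_db_message_job_cmd and dag are defined elsewhere in the DAG file
aggregate_db_message_job = BashOperator(
    task_id='aggregate_db_message_job',
    execution_timeout=timedelta(hours=3),
    pool='ep_data_pipeline_db_msg_agg',  # run only when this pool has a free slot
    bash_command=aggregate_db_message_job_cmd, dag=dag)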
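
Applied to the sensors in the question, the same idea might look like the sketch below. It only builds the keyword arguments, so it runs without Airflow installed. The pool name sensor_pool, the slot counts, the upstream task names task_0 to task_5 and the helper pooled_sensor_kwargs are all assumptions for illustration; task_id, external_dag_id, external_task_id, pool and poke_interval are real ExternalTaskSensor arguments.

```python
SENSOR_POOL = "sensor_pool"  # hypothetical pool name; the pool must be created first (e.g. Admin > Pools)
SENSOR_POOL_SLOTS = 2        # assumed pool size
CELERY_CONCURRENCY = 4       # assumed value of n from the question


def pooled_sensor_kwargs(upstream_task_id):
    """Build keyword arguments for one ExternalTaskSensor that waits on do_sth."""
    return {
        "task_id": f"wait_for_{upstream_task_id}",
        "external_dag_id": "do_sth",
        "external_task_id": upstream_task_id,
        "pool": SENSOR_POOL,   # every sensor shares the same limited pool
        "poke_interval": 60,   # seconds between checks
    }


# The pool only helps if it leaves worker slots free for non-sensor tasks.
assert SENSOR_POOL_SLOTS < CELERY_CONCURRENCY

# Hypothetical upstream task ids in do_sth.
sensor_args = [pooled_sensor_kwargs(f"task_{i}") for i in range(6)]
print(sensor_args[0]["task_id"], sensor_args[0]["pool"])

# In a DAG file with Airflow installed, each dict would be unpacked as:
#   ExternalTaskSensor(dag=dag, **kwargs)
```

However many sensors the DAG defines, at most SENSOR_POOL_SLOTS of them hold a worker slot at once, so the remaining slots stay available for the do_sth tasks they are waiting on.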


...