Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


airflow - Wiring top-level DAGs together

I need several identical top-level DAGs (differing only in arguments) that can also be triggered together, subject to the following constraints / assumptions:

  • Individual top-level DAGs will have schedule_interval=None as they will only need occasional manual triggering
  • The series of DAGs, however, needs to run daily
  • Order and number of DAGs in series is fixed (known ahead of writing code) and changes rarely (once in a few months)
  • Irrespective of whether a DAG fails or succeeds, the chain of triggering must not break
  • Currently they must be run together in series; in future they may require parallel triggering

So I created one file for each DAG in my dags directory and now I must wire them up for sequential execution. I have identified two ways this could be done:

  1. SubDagOperator

  2. TriggerDagRunOperator

    • Works in my demo, but the DAGs run in parallel (not sequentially) because it doesn't wait for the triggered DAG to finish before moving on to the next one
    • ExternalTaskSensor might help overcome the above limitation, but it would make things very messy

My questions are

  • How to overcome limitation of parent_id prefix in dag_id of SubDags?
  • How to force TriggerDagRunOperators to await completion of DAG?
  • Any alternate / better way to wire-up independent (top-level) DAGs together?
  • Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

I'm using puckel/docker-airflow with

  • Airflow 1.9.0-4
  • Python 3.6-slim
  • CeleryExecutor with redis:3.2.7

EDIT-1

Clarifying @Viraj Parekh's queries

Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered?

When I trigger the import_parent_v1 DAG, all 3 external DAGs that it is supposed to fire using TriggerDagRunOperator start running in parallel, even though I chain them sequentially. The logs indicate that while they are fired one after another, execution moves on to the next DAG (TriggerDagRunOperator) before the previous one has finished. NOTE: In this example, the top-level DAGs are named importer_child_v1_db_X and their corresponding task_ids (for TriggerDagRunOperator) are named importer_v1_db_X

Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?

I have to chain several similar DAGs (differing only in arguments) together in a workflow that triggers them one by one. So there isn't just one TriggerDagRunOperator that I could put last; there are many (3 here, but up to 15 in production)


He who fights dragons for too long becomes a dragon himself; gaze into the abyss for too long, and the abyss gazes back…

1 Answer


Taking hints from @Viraj Parekh's answer, I was able to make TriggerDagRunOperator work in the intended fashion. I'm hereby posting my (partial) answer; will update as and when things become clear.


How to overcome limitation of parent_id prefix in dag_id of SubDags?

As @Viraj pointed out, there's no straightforward way of achieving this. Extending SubDagOperator to remove this check might work, but I decided to steer clear of it.


How to force TriggerDagRunOperators to await completion of DAG?

  • Looking at the implementation, it becomes clear that the job of TriggerDagRunOperator is just to trigger the external DAG, and that's about it. By default, it is not supposed to wait for the DAG to complete. Therefore the behaviour I'm observing is understandable.

  • ExternalTaskSensor is the obvious way out. However, while learning the basics of Airflow I was relying on manual triggering of DAGs (schedule_interval=None). In that case, ExternalTaskSensor makes it difficult to accurately specify the execution_date of the external task (whose completion is being awaited), and if it is wrong the sensor gets stuck.

  • So, taking a hint from the implementation, I made a minor adjustment to the behaviour of ExternalTaskSensor: it awaits completion of all task_instances of the concerned task having

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

    This achieves the desired result: the external DAGs run one after another in sequence.
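
The condition above can be modelled in plain Python to make the intent concrete. This is only a sketch of the check, not the modified sensor itself: the function name `external_runs_finished`, the `(execution_date, state)` tuples standing in for task-instance rows, and the choice to count both "success" and "failed" as finished (so that a failure does not break the chain) are all assumptions made for illustration.

```python
from datetime import datetime, timedelta

# States treated as "finished" in this sketch (an assumption): a failed
# run still lets the chain move on to the next DAG.
FINISHED_STATES = {"success", "failed"}


def external_runs_finished(task_instances, trigger_execution_date,
                           execution_delta=timedelta(0)):
    """Return True once every relevant external task instance has finished.

    task_instances: iterable of (execution_date, state) pairs, a stand-in
    for rows that a real sensor would query from Airflow's metadata DB.
    """
    threshold = trigger_execution_date + execution_delta
    relevant = [state for (exec_date, state) in task_instances
                if exec_date >= threshold]
    # No run at or after the threshold yet: the triggered DAG run has not
    # been created, so keep waiting (also an assumption of this sketch).
    if not relevant:
        return False
    return all(state in FINISHED_STATES for state in relevant)


trigger_date = datetime(2018, 6, 1, 12, 0)
rows = [
    (datetime(2018, 5, 31, 12, 0), "success"),   # older run, ignored
    (datetime(2018, 6, 1, 12, 0, 5), "running"),  # run fired by the trigger
]
still_running = external_runs_finished(rows, trigger_date)
rows[1] = (rows[1][0], "success")
done = external_runs_finished(rows, trigger_date)
```

In a real sensor this check would sit inside `poke`, which Airflow calls repeatedly until it returns True.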


Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

Again, going by @Viraj's answer, this can be done from a single file by assigning each DAG to the global scope using globals()[dag_id] = DAG(..)
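
A minimal sketch of that pattern follows. The `DAG` class here is a stand-in so the snippet runs without Airflow installed; in a real DAG file it would be replaced by `from airflow import DAG`, and the DAG would also need a start_date and tasks. The helper `register_dags`, the list `DB_NAMES` and the "db" param are hypothetical; only the importer_child_v1_db_X naming comes from the question.

```python
class DAG:
    """Stand-in for airflow.DAG so the sketch runs without Airflow."""

    def __init__(self, dag_id, schedule_interval=None, params=None):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval
        self.params = params or {}


def register_dags(namespace, db_names):
    """Create one DAG per database name and bind it in `namespace`."""
    created = []
    for db_name in db_names:
        dag_id = "importer_child_v1_{}".format(db_name)
        dag = DAG(dag_id, schedule_interval=None, params={"db": db_name})
        # Binding the object to a module-level name is what lets Airflow
        # discover it when it imports the file.
        namespace[dag_id] = dag
        created.append(dag)
    return created


DB_NAMES = ["db_1", "db_2", "db_3"]  # hypothetical inputs
registered = register_dags(globals(), DB_NAMES)
```

Passing `globals()` from the DAG file itself matters: the names must end up in the namespace of the module that Airflow parses, not in the helper's own scope.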


EDIT-1

Maybe I was referring to an incorrect resource (the link above is already dead), but ExternalTaskSensor already includes the params execution_delta & execution_date_fn to easily restrict the execution_date(s) of the task being sensed.
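
To illustrate how those two params narrow down the date being sensed, here is a plain-Python model of the behaviour as I understand it from the Airflow 1.x sensor: execution_delta is subtracted from the sensor's own execution_date, while execution_date_fn receives that date and returns the one to look for, and only one of the two may be given. The function name `sensed_execution_date` is hypothetical and this is not the library's code, so check the source of the version you run.

```python
from datetime import datetime, timedelta


def sensed_execution_date(execution_date, execution_delta=None,
                          execution_date_fn=None):
    """Model which external execution_date the sensor would wait for."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError(
            "provide only one of execution_delta / execution_date_fn")
    if execution_delta is not None:
        # Look back by a fixed offset from the sensor's own date.
        return execution_date - execution_delta
    if execution_date_fn is not None:
        # Let the caller compute the date however it likes.
        return execution_date_fn(execution_date)
    # Neither given: sense the same execution_date as the sensor's run.
    return execution_date


current = datetime(2018, 6, 2, 0, 0)
by_delta = sensed_execution_date(current, execution_delta=timedelta(days=1))
by_fn = sensed_execution_date(
    current,
    execution_date_fn=lambda dt: dt.replace(hour=0, minute=0) - timedelta(days=1),
)
```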

