Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
1.3k views
in Technique[技术] by (71.8m points)

airflow - How to pull xcom value from other task instance in the same DAG run (not the most recent one)?

I have 3 DAG runs:

  1. DAGR 1 executed at 2019-02-13 16:00:00
  2. DAGR 2 executed at 2019-02-13 17:00:00
  3. DAGR 3 executed at 2019-02-13 18:00:00

In a task instance X of DAGR 1 I want to get xcom value of task instance Y. I did this:

kwargs['task_instance'].xcom_pull(task_ids='Y')

I expected to get value of xcom from task instance Y in DAGR 1. Instead I got from DAGR 3.

From Airflow documentation

If xcom_pull is passed a single string for task_ids, then the most recent XCom value from that task is returned; ...

  1. Why Airflow xcom_pull return the most recent xcom value?
  2. What if I want to pull from the same DAG run?
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

This asnwers your question [How to pull xcom value from other task instance in the same DAG run (not the most recent one)? ]
See the example below :

t1 = SomeOperator(
        task_id='Your_t1_Task_ID',
        xcom_push = True,
        ...
        ...
        dag=dag)

    def get_records(**kwargs):
        ti = kwargs['ti']
        xcom = ti.xcom_pull(task_ids='Your_t1_Task_ID')
        string_to_print = 'Value in xcom is: {}'.format(xcom)
        #string_to_print holds that value, you can also print it in the logs
        logging.info(string_to_print)

    t2 = PythonOperator(
        task_id='records',
        provide_context=True,
        python_callable=get_records,
        dag=dag)

    t1 >> t2

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...