We have just started including airflow for scheduling.
(我们刚刚开始包括气流以进行调度。)
One of my scripts runs daily. (我的脚本之一每天运行。)
It uses the template parameter ({{ ds_nodash }}) to get the dates. (它使用模板参数({{ds_nodash}})获取日期。)
But I have to rerun for one day load (Past dated), how can I provide input parameter. (但是我必须重新运行一天的负载(过去的日期),如何提供输入参数。)
Input parameter will override the ds_nodash. (输入参数将覆盖ds_nodash。)
I have :
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} "
Would like to run for
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade **20190601** "
Code snippet Below:
(下面的代码段:)
import os
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_check_operator import BigQueryCheckOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2019, 6, 19),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('create-data-set-job', default_args=default_args)
projct_dr='/home/airflow/projects/'
trade_acx_ld="/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh"
trade_acx_ld_cmd = "/home/airflow/projects/wrapper/gen_bq_gcs_file_load.sh trade_acx.csv l1_gcb_trxn trade {{ ds_nodash }} "
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
if os.path.exists(trade_acx_ld):
t2 = BashOperator(
task_id= 'Dataset_create',
bash_command=trade_acx_ld_cmd,
dag=dag
)
else:
raise Exception("Cannot locate {0}".format(trade_acx_ld_cmd))
t2.set_upstream(t1)
ask by user3858193 translate from so 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…