I have created a DAG on Composer (Airflow) that successfully creates a cluster:
    import os

    import airflow
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.contrib.operators.dataproc_operator import DataprocClusterCreateOperator

    with airflow.DAG(dag_id='......',
                     default_args=default_args,
                     catchup=False,
                     description='.......',
                     schedule_interval=None) as dag:

        start_of_the_dag = DummyOperator(task_id='start_of_the_dag')

        create_dataproc_cluster = DataprocClusterCreateOperator(
            task_id='............',
            cluster_name='..........',
            storage_bucket=os.environ['gcs_bucket'],
            project_id=os.environ['gcp_project'],
            service_account=os.environ['service_account'],
            master_machine_type='n1-standard-32',
            worker_machine_type='n1-standard-32',
            num_workers=4,
            num_preemptible_workers=0,
            image_version='1.3-debian10',
            metadata={"enable-oslogin": "true"},
            # custom_image=job['cluster_config']['image'],
            internal_ip_only=True,
            region=os.environ['gcp_region'],
            zone=os.environ['gce_zone'],
            subnetwork_uri=os.environ['subnetwork_uri'],
            tags=os.environ['firewall_rules_tags'].split(','),
            autoscaling_policy=None,
            enable_http_port_access=True,
            enable_optional_components=True,
            init_actions_uris=None,
            auto_delete_ttl=3600,
            dag=dag
        )

        start_of_the_dag >> create_dataproc_cluster
What I want to do is create a Python or shell script that will be executed from the master node of the cluster I just created. With BashOperator or PythonOperator, the execution happens on the Composer cluster itself, not on the Dataproc cluster. (Is that correct?)
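For example, a trivial task like the one below (the task id and command are just placeholders) would print the hostname of the Airflow worker inside the Composer environment, which is how I understand BashOperator behaves here:

    from airflow.operators.bash_operator import BashOperator

    # Placeholder task: as far as I understand, this command runs on the
    # Composer (Airflow) worker, not on the Dataproc master node.
    runs_on_composer = BashOperator(
        task_id='runs_on_composer',
        bash_command='hostname',
    )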
To sum up, I want to use an operator after create_dataproc_cluster that will execute some commands from the master node of the Dataproc cluster.
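For illustration only, here is a minimal sketch of the kind of task I mean, using DataProcPigOperator and Pig's sh command as one possible mechanism (I am not sure this is the right operator; the task id and command are placeholders):

    from airflow.contrib.operators.dataproc_operator import DataProcPigOperator

    # Hypothetical sketch: Dataproc job drivers run on the master node, and
    # Pig's `sh` runs a shell command from the driver, so this should
    # execute on the master.
    run_on_master = DataProcPigOperator(
        task_id='run_on_master',
        query='sh echo "running on the Dataproc master"',
        cluster_name='..........',
        region=os.environ['gcp_region'],
    )

    start_of_the_dag >> create_dataproc_cluster >> run_on_master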
question from:
https://stackoverflow.com/questions/65830467/execute-a-bash-or-python-script-on-dataproc-cluster-using-dag