I'm exporting some tables from PostgreSQL to GCS. To keep it simple, I created a DAG that looks like the one below.
The export DAG is this:
from airflow.models import DAG
from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator

def sub_dag_export(parent_dag_name, child_dag_name, args, export_suffix):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        start_date=args['start_date'],
        max_active_runs=1,
    )

    export_tbl1 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl1',
        postgres_conn_id='cloudsqlpg',
        google_cloud_storage_conn_id='gcsconn',
        sql='SELECT * FROM tbl1',
        export_format='csv',
        field_delimiter='|',
        bucket='dsrestoretest',
        filename='file/export_tbl1/tbl1_{}.csv',
        schema_filename='file/schema/tbl1.json',
        dag=dag)

    export_tbl2 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl2',
        postgres_conn_id='cloudsqlpg',
        google_cloud_storage_conn_id='gcsconn',
        sql='SELECT * FROM tbl2',
        export_format='csv',
        field_delimiter='|',
        bucket='dsrestoretest',
        filename='file/export_tbl1/tbl2_{}.csv',
        schema_filename='file/schema/tbl2.json',
        dag=dag)

    # Keep the sequential flow described below.
    export_tbl1 >> export_tbl2

    return dag
Both tasks 1 and 2 do the same work, so I want to reuse my export task for all of the tables. But it should not change the flow (Start --> export table1 --> table2 --> table3 --> end), because if a task fails, I need to be able to re-run it from the point where it failed. So even if I end up using a single piece of code, the DAG diagram should stay the same.
I saw there is a way (from this link), but I'm still not able to fully understand it.
Simply put, you could extract the common code into a function and have it create the operator instances for you.
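For example, here is a minimal sketch of that idea, reusing the connection IDs, bucket, and file-naming pattern from your DAG. The export_table helper, the table list, and the per-table output folders are illustrative assumptions, not something taken from your code:

from airflow.models import DAG
from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator

def sub_dag_export(parent_dag_name, child_dag_name, args, export_suffix):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        start_date=args['start_date'],
        max_active_runs=1,
    )

    def export_table(table_name):
        # Hypothetical helper: builds one export task per table,
        # using the same settings as your original tasks.
        return PostgresToGoogleCloudStorageOperator(
            task_id='export_%s' % table_name,
            postgres_conn_id='cloudsqlpg',
            google_cloud_storage_conn_id='gcsconn',
            sql='SELECT * FROM %s' % table_name,
            export_format='csv',
            field_delimiter='|',
            bucket='dsrestoretest',
            filename='file/export_%s/%s_{}.csv' % (table_name, table_name),
            schema_filename='file/schema/%s.json' % table_name,
            dag=dag)

    # Create one task per table and chain them, so the graph stays
    # Start --> export_tbl1 --> export_tbl2 --> export_tbl3 --> end.
    previous_task = None
    for table in ['tbl1', 'tbl2', 'tbl3']:
        task = export_table(table)
        if previous_task is not None:
            previous_task >> task
        previous_task = task

    return dag

Each table still gets its own task_id, so the DAG diagram is unchanged and a failed run can be cleared and resumed from the exact task that failed; only the boilerplate is shared.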