airflow - reuse a task


I'm exporting some tables from PostgreSQL to GCS. To keep it simple, I have created the DAG shown below (screenshot omitted). The export DAG is this.

from airflow.models import DAG
from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator

def sub_dag_export(parent_dag_name, child_dag_name, args, export_suffix):
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        start_date=args['start_date'],
        max_active_runs=1,
    )

    export_tbl1 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl1',
        postgres_conn_id='cloudsqlpg',
        google_cloud_storage_conn_id='gcsconn',
        sql='SELECT * FROM tbl1',
        export_format='csv',
        field_delimiter='|',
        bucket='dsrestoretest',
        filename='file/export_tbl1/tbl1_{}.csv',
        schema_filename='file/schema/tbl1.json',
        dag=dag)

    export_tbl2 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl2',
        postgres_conn_id='cloudsqlpg',
        google_cloud_storage_conn_id='gcsconn',
        sql='SELECT * FROM tbl2',
        export_format='csv',
        field_delimiter='|',
        bucket='dsrestoretest',
        filename='file/export_tbl2/tbl2_{}.csv',
        schema_filename='file/schema/tbl2.json',
        dag=dag)

    # Run the exports sequentially and hand the sub-DAG back to the caller.
    export_tbl1 >> export_tbl2

    return dag
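(For context, a factory like sub_dag_export is normally attached to the parent DAG with a SubDagOperator. The following is only a minimal sketch with a hypothetical parent DAG and example arguments, not the exact parent DAG from the screenshot.)

from datetime import datetime

from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator

# Hypothetical parent DAG; names and defaults are illustrative only.
default_args = {'start_date': datetime(2020, 1, 1)}

with DAG(dag_id='parent_dag', default_args=default_args, schedule_interval=None) as parent_dag:
    export = SubDagOperator(
        task_id='export',  # must match the child_dag_name passed to the factory
        subdag=sub_dag_export('parent_dag', 'export', default_args, export_suffix='csv'),
        dag=parent_dag,
    )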

Task 1 and task 2 do the same work, so I want to reuse my export task for all of the tables. But that should not change the flow (Start --> export table1 --> table2 --> table3 --> end), because if a task fails I need to be able to re-run it from the point where it failed. So even if I use a single reusable task, the DAG diagram should stay the same, as sketched below.
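A minimal sketch of that shape, using DummyOperator placeholders (task names are illustrative; the real tasks are the Postgres-to-GCS exports above):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id='export_flow_sketch', start_date=datetime(2020, 1, 1), schedule_interval=None) as flow_sketch:
    start = DummyOperator(task_id='start')
    exports = [DummyOperator(task_id='export_tbl%d' % i) for i in (1, 2, 3)]
    end = DummyOperator(task_id='end')

    # Chain the exports so a failed table can be cleared and the run resumed from that point.
    start >> exports[0]
    for upstream, downstream in zip(exports, exports[1:]):
        upstream >> downstream
    exports[-1] >> end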

I saw there is a way (from this link), but I'm still not able to fully understand it.


There is 1 best solution below


You could simply extract the common code into a function and have it create the operator instances for you.

from datetime import datetime
from functools import reduce

from airflow.models import DAG
from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator

# Example defaults; adjust to your environment.
default_args = {"start_date": datetime(2020, 1, 1)}


def pg_table_to_gcs(table_name: str, dag: DAG) -> PostgresToGoogleCloudStorageOperator:
    return PostgresToGoogleCloudStorageOperator(
        task_id=f"export_{table_name}",
        postgres_conn_id="cloudsqlpg",
        google_cloud_storage_conn_id="gcsconn",
        sql=f"SELECT * FROM {table_name}",
        export_format="csv",
        field_delimiter="|",
        bucket="dsrestoretest",
        filename=f"file/export_{table_name}/{table_name}.csv",
        schema_filename=f"file/schema/{table_name}.json",
        dag=dag)


table_names = ["table0", "table1", "table2"]

with DAG(dag_id="kube_example", default_args=default_args) as dag:
    # Build one export task per table and chain them in order:
    # export_table0 >> export_table1 >> export_table2
    reduce(lambda t0, t1: t0 >> t1, [pg_table_to_gcs(table_name, dag) for table_name in table_names])
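The reduce call just applies >> to each adjacent pair, so the tasks end up chained in list order: export_table0 >> export_table1 >> export_table2. One task per table stays in the graph, so the DAG diagram is unchanged and a failed table can still be cleared and re-run from that point onward. If the lambda feels opaque, an explicit loop over adjacent pairs is equivalent; this sketch assumes the pg_table_to_gcs factory and table_names list defined above:

with DAG(dag_id="kube_example", default_args=default_args) as dag:
    export_tasks = [pg_table_to_gcs(table_name, dag) for table_name in table_names]

    # Same linear chain as the reduce version, written out explicitly.
    for upstream, downstream in zip(export_tasks, export_tasks[1:]):
        upstream >> downstream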