Cannot access task instance to get xcom

I am new to using xcoms in Airflow.

Here's what my code looks like:

with DAG(default_args=default_args,
         max_active_runs=1,
         dag_id="my_dag",
         catchup=False,
         start_date=start_date,
         render_template_as_native_obj=True,
         schedule_interval=None) as my_dag:

    init_task = GKEStartPodOperator(
        task_id="init_task",
        name="init_task",
        retries=0,
        do_xcom_push=True,
        project_id=PROJECT_ID,
        location=REGION,
        cluster_name=CLUSTER,
        image=image,
        arguments=[],
        in_cluster=False,
        is_delete_operator_pod=True,
        startup_timeout_seconds=300,
        namespace=NAMESPACE,
        service_account_name=SA,
    )


    task_1 = PythonOperator(
        task_id="task_1",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"e_conns",
        },
        python_callable=lambda: do_task(),
    )

    task_2 = PythonOperator(
        task_id="task_2",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"f_conns",
        },
        python_callable=lambda: do_task(),
    )
    init_task >> task_1 >> task_2

def do_task(**kwargs) -> int:
    ti = kwargs['ti']   
    connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
    #remainder removed (not relevant)

I can see that the xcom push from init_task is successful, but the do_task method is unable to pull xcom values. I get an error KeyError: 'ti' on ti = kwargs['ti'].

I've seen numerous ways to handle xcoms and I'm a little confused. I thought that Airflow would inject the task instance for the key ti into kwargs, but it doesn't seem like the task instance is ever getting to the method for the PythonOperator.

Is there a way to get the task instance into the method?

(Airflow is version 2.4.3 on GCP Composer)

There are 4 best solutions below

0
On BEST ANSWER

Well, I feel silly and a little embarrassed. It seems I don't know Python as well as I thought I did.

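I had wrapped the callable in a lambda (lambda: do_task()), and that was the real problem: the lambda takes no arguments and calls do_task() with none, so the context Airflow provides (including ti) never reached the function. I had also declared my functions after the DAG; once python_callable references do_task directly, the function has to be defined before the DAG block.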

Code like this works fine:


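from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.kubernetes_engine import GKEStartPodOperator


# Defined before the DAG block so the name exists when the operators are built.
# Airflow passes the task instance as ti; the op_kwargs entries arrive in **kwargs.
def do_task(ti, **kwargs) -> int:
    connectors = ti.xcom_pull(task_ids='init_task')[kwargs['connector_key']]
    # remainder removed (not relevant)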


with DAG(default_args=default_args,
         max_active_runs=1,
         dag_id="my_dag",
         catchup=False,
         start_date=start_date,
         render_template_as_native_obj=True,
         schedule_interval=None) as my_dag:

    init_task = GKEStartPodOperator(
        task_id="init_task",
        name="init_task",
        retries=0,
        do_xcom_push=True,
        project_id=PROJECT_ID,
        location=REGION,
        cluster_name=CLUSTER,
        image=image,
        arguments=[],
        in_cluster=False,
        is_delete_operator_pod=True,
        startup_timeout_seconds=300,
        namespace=NAMESPACE,
        service_account_name=SA,
    )


    task_1 = PythonOperator(
        task_id="task_1",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"e_conns",
        },
        python_callable=do_task,
    )

    task_2 = PythonOperator(
        task_id="task_2",
        do_xcom_push=False,
        op_kwargs={
            'connector_key':"f_conns",
        },
        python_callable=do_task,
    )
    init_task >> task_1 >> task_2
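
The lambda failure can be reproduced without Airflow. The sketch below is only an illustration: call_with_context is a hypothetical, simplified stand-in for a runner that passes a callable just the keyword arguments it can accept, and the context dictionary is made up. It is not Airflow's actual implementation.

```python
import inspect


def call_with_context(fn, context):
    """Hypothetical stand-in for a task runner: pass only the keyword
    arguments that the callable's signature can accept."""
    params = inspect.signature(fn).parameters
    accepts_var_kw = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if accepts_var_kw:
        # The callable declares **kwargs, so it can take the whole context.
        return fn(**context)
    # Otherwise keep only the keys that match named parameters.
    return fn(**{k: v for k, v in context.items() if k in params})


def do_task(**kwargs):
    return kwargs["ti"]


# Made-up context; a real one would hold an actual task instance.
context = {"ti": "fake-task-instance", "connector_key": "e_conns"}

# Passing the function itself: the context reaches do_task.
direct_result = call_with_context(do_task, context)

# Wrapping it in a zero-argument lambda: nothing is forwarded,
# so do_task() runs with empty kwargs and the lookup fails.
try:
    call_with_context(lambda: do_task(), context)
    lambda_error = None
except KeyError as exc:
    lambda_error = exc
```

The lambda has no parameters, so nothing can be handed to it, and the inner do_task() call is made with no arguments at all. That is why the lookup of 'ti' raises KeyError.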

2
On

You can directly reference the output of one task as the input of another:


def do_task(some_value, connector_key):
    print(f"The output of init_task is {some_value[connector_key]}")

with DAG(...) as dag:
    init_task = GKEStartPodOperator(
        ...
    )


    task_1 = PythonOperator(
        task_id="task_1",
        do_xcom_push=False,
        op_kwargs={
            "some_value": init_task.output, # referenced here
            'connector_key':"e_conns",
        },
        python_callable=do_task,
    )

    task_2 = PythonOperator(
        task_id="task_2",
        do_xcom_push=False,
        op_kwargs={
            "some_value": init_task.output, # referenced here
            'connector_key':"f_conns",
        },
        python_callable=do_task,
    )

0
On

Based on what @Yusuf Ganiyu just posted, here's some sample code.

In your DAG:

sampleTask = PythonOperator(
    task_id="task_id_goes_here",
    python_callable=pythonFunction,
    provide_context=True, # Airflow 1.x only; deprecated and not needed in Airflow 2, which passes the context automatically
    op_kwargs={"test": "testing"}, #if you have parameters
    dag=dag
)

In your python function:

def pythonFunction(**kwargs):
    ti = kwargs['ti']
    result = ti.xcom_pull(key='here_is_the_key', task_ids='task_id_where_you_push_to_x_com')
3
On

task.output works really well, but if you want to handle the value differently, you can pull from XCom directly:

result = ti.xcom_pull(task_ids='TASK_ID_HERE', key='KEY_HERE')

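Neither parameter is strictly required: key defaults to 'return_value' (the key under which a task's return value is stored) and task_ids is optional too, but passing task_ids explicitly makes it clear which task you are pulling from.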

You may also want to make sure the data you push to XCom is serialisable, so that it can be deserialised easily on the pulling side.
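
One way to check this before pushing is a JSON round trip. This is a minimal sketch that assumes a JSON-based XCom serialisation (a custom XCom backend or pickling would behave differently); is_json_round_trippable and the sample payloads are hypothetical names used only for illustration.

```python
import json


def is_json_round_trippable(value):
    """Return True if value survives json.dumps followed by json.loads unchanged."""
    try:
        return json.loads(json.dumps(value)) == value
    except (TypeError, ValueError):
        # json.dumps raises TypeError for unsupported types such as set.
        return False


# Plain dicts, lists and strings survive the round trip.
payload = {"e_conns": ["a", "b"], "f_conns": ["c"]}
ok = is_json_round_trippable(payload)

# A set is not JSON serialisable, so the check fails.
bad = is_json_round_trippable({"ids": {1, 2, 3}})
```

The check is deliberately strict: a tuple comes back as a list after the round trip, so a value containing tuples is also reported as not round-trippable.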