Airflow XCom Pull in TaskGroup


I'm new to Airflow and am having trouble exchanging variables between a Python function and a TaskGroup. (I understand this is not the main use case of Airflow, but it's necessary in my case.) For a better understanding I added this code snippet, which should convey what I am intending to do:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.sftp.operators.sftp import SFTPOperator
from airflow.decorators import task_group
import logging
from pendulum import datetime

def push_function(**kwargs):
    files = ['a','b','c']
    kwargs['ti'].xcom_push(key='files', value=files)

with DAG(
    "tst",
    start_date=datetime(2023, 11, 7),
    schedule_interval="30 6 * * *",
    catchup=False,
) as dag:
    push = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        dag=dag,
    )

    @task_group(group_id="group")
    def pull_task(**kwargs):
        data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
        logging.info(f"transferred variable: {data}")
        for item in data:
            filepath = f"/tmp/{item}.xml"
            extract_load = SFTPOperator(
                task_id=f"download_{item}",
                ssh_conn_id="sftp",
                remote_filepath=f"{item}.xml",
                local_filepath=filepath,
                operation="get",
                create_intermediate_dirs=True,
            )
    push >> pull_task()

So basically my question is: is there any way to get my variable (in this case 'files') from the function 'push_function' to the task group 'pull_task'?

As it is, the line:

data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')

gives me the error:

KeyError: 'ti'

I've also tried other methods and came to understand that the runtime context is apparently not available inside the task group function. But I'm not sure what that means or how I can make it available.


2 Answers


The function you annotate with @task_group needs to define a sub-DAG of tasks to run in the group. The runtime context (and therefore ti) only exists inside the individual tasks, not in the task group function itself, which runs when the DAG file is parsed:

from airflow.decorators import task, task_group

@task_group(group_id='group')
def pull_group():

    @task
    def pull_function(**kwargs):
        # 'ti' is available here because @task injects the runtime context
        data = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
        print(data)

    # ... [more tasks]

    # Routing
    pull_function()  # >> another_task_in_pull_group()

and then, in the outer DAG context, route to the group:

push >> pull_group()
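
As a side note, inside a @task-decorated function you can also fetch the runtime context explicitly instead of declaring **kwargs; a minimal sketch using Airflow's get_current_context() helper:

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def pull_function():
    # get_current_context() returns the context of the currently running
    # task instance, so 'ti' is available without **kwargs
    ti = get_current_context()["ti"]
    data = ti.xcom_pull(task_ids='push_task', key='files')
    print(data)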

As far as I understood, you want to iterate over the output of the previous task. In that case you'd need to use task decorators and dynamic task mapping.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.decorators import task_group, task
from airflow.operators.empty import EmptyOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "my_dag",
    default_args=default_args,
    description="A simple DAG",
    schedule_interval=None,
    start_date=datetime(2024, 2, 20),
) as dag:

    start_task = EmptyOperator(task_id="start_task")

    @task_group(group_id="my_group")
    def my_group():

        @task
        def push_task():
            return ["record1", "record2", "record3", "record4", "record5"]

        @task
        def pull_task1(value):
            # insert sftp operation code here #
            print("iterated value:", value)

        # @task
        # def pull_task2(value):
        #     # insert sftp operation code here #
        #     print("iterated value:", value)

        def executioner():

            value_tsk = push_task()
            t1 = pull_task1.expand(value=value_tsk)
            # t2 = pull_task2.expand(value=value_tsk)
            # to run t2 after t1, chain them: t1 >> t2

        executioner()

    end_task = EmptyOperator(task_id="end_task")

    start_task >> my_group() >> end_task

However, note that when using task decorators you can't just instantiate a classic operator (in your case SFTPOperator) inside the @task function and expect it to run; to map a classic operator over runtime output, you'd use its partial()/expand() API instead (see the sketch below).
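
For completeness, classic operators do support dynamic task mapping via partial()/expand() (Airflow 2.3+; expand_kwargs() needs 2.4+). A minimal sketch for the SFTPOperator case from the question, reusing the "sftp" connection id and /tmp paths from the original snippet; build_sftp_kwargs is a helper name introduced here for illustration:

from airflow.decorators import task, task_group
from airflow.providers.sftp.operators.sftp import SFTPOperator

# this goes inside the `with DAG(...)` block from the question

@task
def push_task():
    return ['a', 'b', 'c']

@task
def build_sftp_kwargs(files):
    # one kwargs dict per file, so remote and local paths stay paired
    return [
        {"remote_filepath": f"{f}.xml", "local_filepath": f"/tmp/{f}.xml"}
        for f in files
    ]

@task_group(group_id="group")
def pull_group(files):
    # constant arguments go into partial(); expand_kwargs() creates one
    # mapped SFTPOperator instance per dict at run time
    SFTPOperator.partial(
        task_id="download",
        ssh_conn_id="sftp",
        operation="get",
        create_intermediate_dirs=True,
    ).expand_kwargs(build_sftp_kwargs(files))

pull_group(push_task())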

Hope this helps!