I'm new to Airflow and am having trouble passing a variable from a Python function to a task group. (I understand that this is not the main point of Airflow, but it's necessary in my case.) To make this clearer, I've added a code snippet that shows what I intend to do:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task_group
from airflow.providers.sftp.operators.sftp import SFTPOperator
import logging
from pendulum import datetime


def push_function(**kwargs):
    # Push the list of file names to XCom under the key 'files'
    files = ['a', 'b', 'c']
    kwargs['ti'].xcom_push(key='files', value=files)


with DAG(
    "tst",
    start_date=datetime(2023, 11, 7),
    schedule_interval="30 6 * * *",
    catchup=False,
) as dag:
    push = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
        dag=dag,
    )

    @task_group(group_id="group")
    def pull_task(**kwargs):
        # This is the line that fails with KeyError: 'ti'
        folders_today = kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
        logging.info(f"transferred variable: {folders_today}")
        # Intended: one SFTP download task per pulled file name
        for item in folders_today:
            filepath = f"/tmp/{item}.xml"
            extract_load = SFTPOperator(
                task_id=f"download_{item}",
                ssh_conn_id="sftp",
                remote_filepath=f"{item}.xml",
                local_filepath=filepath,
                operation="get",
                create_intermediate_dirs=True,
            )

    push >> pull_task()
So basically my question is: is there any way to get my variable (in this case 'files') from the function 'push_function' into the function 'pull_task'?
As written, the line:
folders_today= kwargs['ti'].xcom_pull(task_ids='push_task', key='files')
gives me the error:
KeyError: 'ti'
I've also tried other methods and came to understand that the task context is apparently not available in this function. But I am not sure what that means or how I can make it available.
The function you annotate with @task_group needs to define the set of tasks to run in the group. Then, in the outer DAG context, route to the group.