Jinja Airflow 2.2

40 Views Asked by At

I'm currently trying to find the best way of creating multiple PodOperators in Airflow 2.2.3 (Partial/Expand Pods not available yet) using on a xcom.pull.

The content of "{{ task_instance.xcom_pull(task_ids='any_previous_task') }}" is a list, which is already handled as str and I already tried couple of ways to convert it, like

  • ast.literal_eval
  • eval
  • Adding Jinja type, | list

None of above methods worked. The only way which works is by calling a PythonOperator, however, defining an Operator inside another operator is not a good practice

Here's a piece of code

    operators = []
    prev_val = "{{ task_instance.xcom_pull(task_ids='any_previous_task') }}"
    if prev_val:
        o = BashOperator(
            task_id='task_test',
            bash_command="echo 'TYPE {}, CONTENT {} <<<< was executed '".format(type(prev_val), prev_val),
            dag=dag
        )
    operators.append(o)   

start >> operators
0

There are 0 best solutions below