I am trying to create a DAG in which one task gets table names from a table tracker, and then I want to run different SQL operations (tasks) on each of those tables. I am able to pass the table names from one task to another using XCom, but the tasks created in the for loop are not getting executed; I cannot even see those tasks in the web UI.
from __future__ import print_function
from airflow import DAG
from airflow.operators.sensors import SqlSensor
from airflow import hooks
from airflow.operators import BashOperator, PostgresOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime,date, timedelta
import logging
import time
from time import sleep
import psycopg2
from pprint import pprint
# Arguments handed to the DAG below as its ``default_args``.
default_args = dict(
    owner='navjot',
    depends_on_past=False,
    start_date=datetime(2016, 12, 29, 23, 0),
    email=['[email protected]'],
    email_on_failure=False,
    email_on_retry=False,
    # retries=1,
    retry_delay=timedelta(minutes=5),
)

# DAG object that the tasks in this file attach to (hourly schedule).
dag = DAG(
    'sensesql_v5',
    schedule_interval='@hourly',
    default_args=default_args,
)
""" This function gives all the tables which are ready to be enriched, List of tables names would be generated using taskt2"""
def get_tablename_enrich():
    """Return the names of all tables that are ready to be enriched.

    Selects ``fore_table_name`` from ``results.forecasting_run_tabt`` for every
    row whose ``enrich_status`` is ``'pending'``.  This is the callable of the
    ``testpo1`` task (t2), so the returned list is what downstream tasks pull
    from XCom.

    Returns:
        list: one table name per pending row; empty when nothing is pending.
    """
    # NOTE(review): ``config`` is not defined in the visible part of this
    # file; it is assumed to be a module-level mapping with the connection
    # settings -- confirm.
    conn = psycopg2.connect(
        database='xxx',
        user=config['user'],
        password=config['password'],
        host=config['host'],
        port=config['port'],
    )
    try:
        cursor = conn.cursor()
        query = """Select fore_table_name from results.forecasting_run_tabt where enrich_status = 'pending';"""
        cursor.execute(query)
        # Each fetched row is a 1-tuple; keep only the table name.
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Release the connection even when the query raises.
        conn.close()
#def testpo():
# value1 = 4
# return value1
#value=4
#def push_function(value):
# return value
""" sensing table tracket in t1"""
# Sensor task: polls the tracker table for rows with enrich_status = 'pending'.
t1 = SqlSensor(
    dag=dag,
    task_id='sqlsensing',
    conn_id='postgresterra',
    sql="""Select * from results.forecasting_run_tabt where enrich_status = 'pending';""",
    timeout=3200,
    poke_interval=10,
)
"""getting table names with a condition"""
# Task that runs get_tablename_enrich to collect the pending table names.
t2 = PythonOperator(
    dag=dag,
    python_callable=get_tablename_enrich,
    task_id='testpo1',
)
"""run enrichment SQL operations on each table which we got from task t2. There is just on task in this function
we are going to run 11-12 tasks on each table"""
def run_enrich_ontables(*args, **kwargs):
    """Create the enrichment SQL operation for each table reported by t2.

    Pulls the list of table names that the ``testpo1`` task pushed to XCom
    and, for every name, builds a ``PostgresOperator`` that sets that table's
    ``enrich_status`` to ``'running'`` in ``results.forecasting_run_tabt``.
    Only one operation per table is defined so far; 11-12 are planned.

    Args:
        *args: unused.
        **kwargs: task context supplied by the PythonOperator
            (``provide_context=True``); ``kwargs['ti']`` must be present.
    """
    ti = kwargs['ti']
    pprint(kwargs)
    # Pull the list once instead of once per loop iteration.
    pulled = ti.xcom_pull(task_ids='testpo1')
    pprint(pulled)
    # A missing XCom value comes back as None; treat it as "no tables"
    # rather than failing on len(None).
    tablenames = pulled or []

    # NOTE(review): these operators are instantiated while the task is
    # already running, i.e. after the DAG file has been parsed.  They are
    # presumably never registered with the scheduler, which would explain why
    # they neither run nor show up in the web UI -- confirm against the
    # Airflow docs and consider generating the tasks at module level instead.
    for index, table in enumerate(tablenames):
        t4 = PostgresOperator(
            task_id='testxcom' + str(index),
            sql="update results.forecasting_run_tabt set enrich_status = 'running' where fore_table_name = '{}';".format(table),
            postgres_conn_id='postgrestest',
            autocommit=True,
            dag=dag)
        t4.set_upstream(t3)
"""This task is calling function which is enriching tables"""
# Task that invokes run_enrich_ontables with the task context.
t3 = PythonOperator(
    dag=dag,
    task_id='run_all_tables',
    python_callable=run_enrich_ontables,
    provide_context=True,
)

# Dependency chain: t1 -> t2 -> t3.
t1.set_downstream(t2)
t2.set_downstream(t3)
I think `dag` in the execution context of your `run_enrich_ontables` function is `None`. In order to reference macros in the context within a function passed to a `PythonOperator`, you should use the `kwargs` dict, like you did with `ti`. Either add `dag = kwargs['dag']` to the top of your function so you can reference `dag` later, or change your `t4` definition to use `kwargs` directly.