I currently use a PythonOperator to extract values from Athena to S3, then run queries on S3, and so on. I'm now migrating to the proper Athena operators, and I'm having a bit of a problem with parameters.
The way I do it with the PythonOperator is like this:
def extractor(**kwargs):
    try:
        etl = ETL(kwargs['dag_run'].conf.get('redshift_secretid'))
        etl.extractor(kwargs['base'], kwargs['tabela'])
    except Exception as e:
        raise e

table_name_task = PythonOperator(
    task_id='table_name_task',
    python_callable=extractor,
    op_kwargs={
        'base': 'database',
        'tabela': 'table_name'},
    provide_context=True)
This calls a method on a class I created that fetches the queries I use from a .py file. I use a lot of queries, so I store them there and use the arguments "base" and "tabela" to look up the query, the database, the S3 path, etc. All of that is stored in a file I call objects.py.
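For context, objects.py is essentially a lookup keyed by those two arguments. A stripped-down sketch of the idea (the names and values here are illustrative, not my real file):

# objects.py (simplified illustration, not the real file)
QUERIES = {
    ('database', 'table_name'): {
        'query': 'SELECT * FROM database.table_name',
        'output_location': 's3://my-bucket/raw/table_name/',  # placeholder bucket
        'database': 'database',
    },
}

def lookup(base, tabela):
    """Return the query, S3 output path and database registered for (base, tabela)."""
    entry = QUERIES[(base, tabela)]
    return entry['query'], entry['output_location'], entry['database']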
My problem is: how can I pass the values to the Athena operator in a similar way to what I did with the PythonOperator? I tried:
def getQuery(**kwargs):
    try:
        etl = ETL(kwargs['dag_run'].conf.get('redshift_secretid'))
        query, path, connection = etl.extractor(kwargs['base'], kwargs['tabela'], kwargs['connection'])
        return query, path, connection
    except Exception as e:
        raise e
with DAG(
    dag_id="athena",
    schedule_interval=None,
    start_date=datetime(2018, 11, 1),
    catchup=False,
    params={"redshift_secretid": config.redshift_secretid},
    tags=['raw_extractor']
) as dag:

    # DUMMIES
    start = DummyOperator(task_id='START')
    copy = DummyOperator(task_id='COPY')
    end = DummyOperator(task_id='END')

    table_name = AWSAthenaOperator(
        task_id="table_name",
        query, output_location, database = getQuery('table_name', 'database', 'connection')  # <- the problem
    )
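To make the goal concrete: once the query, the S3 output path and the database have been resolved from objects.py, what I want the operator call to end up as is essentially the following (only a sketch; the variables are placeholders for whatever my ETL class returns, and getting those values into the operator is exactly the part I can't figure out):

table_name = AWSAthenaOperator(
    task_id="table_name",
    query=query,                      # SQL string looked up in objects.py
    database=database,                # Athena database for this table
    output_location=output_location,  # s3://... path for the query results
    aws_conn_id="aws_default",        # placeholder connection id
)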
I'm trying to learn and apply best practices for my Airflow DAGs, but I haven't been able to find a solution to this problem.