How can I pass values for operator parameters from a Python function?


I currently use a PythonOperator to extract data from Athena to S3, then run queries on S3, and so on. I'm now migrating to the proper Athena operators, and I'm having trouble passing parameters to them.

This is how I do it with the PythonOperator:

def extractor(**kwargs):
    try:
        etl = ETL(kwargs['dag_run'].conf.get('redshift_secretid'))
        etl.extractor(kwargs['base'], kwargs['tabela'])
    except Exception as e:
        raise e

table_name_task = PythonOperator(
    task_id='table_name_task', 
    python_callable=extractor, 
    op_kwargs={
        'base': 'database', 
        'tabela': 'table_name'}, 
    provide_context=True)

This calls a method on a class I created, which looks up the queries I keep in a .py file. I use a lot of queries, so I store them there and use the arguments "base" and "tabela" to find the query, the database, the S3 path, and so on. All of that is stored in a file called objects.py.
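To make the lookup concrete, here is a minimal sketch of what a file like objects.py could look like. The names QUERIES and lookup, the query text, and the S3 path are all hypothetical placeholders, not the contents of the real file.

```python
# Hypothetical sketch of objects.py: every name and value here is a placeholder.
from typing import Dict, Tuple

# Maps (base, tabela) to everything needed to run one extraction.
QUERIES: Dict[Tuple[str, str], Dict[str, str]] = {
    ("database", "table_name"): {
        "query": "SELECT * FROM table_name",
        "database": "database",
        "output_location": "s3://example-bucket/raw/table_name/",
    },
}


def lookup(base: str, tabela: str) -> Dict[str, str]:
    """Return a copy of the settings for one table, or raise a clear error."""
    try:
        return dict(QUERIES[(base, tabela)])
    except KeyError:
        raise KeyError(f"No query registered for {base}.{tabela}") from None
```

Keying the dictionary on the (base, tabela) pair keeps the query, the database, and the output path for one table together in a single entry.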

My problem is: how can I pass values to the Athena operator the same way I did with the PythonOperator? I tried:

def getQuery(**kwargs):
    try:
        etl = ETL(kwargs['dag_run'].conf.get('redshift_secretid'))
        query, path, connection = etl.extractor(kwargs['base'], kwargs['tabela'], kwargs['connection'])

        return query, path, connection

    except Exception as e:
        raise e

with DAG(
    dag_id="athena",
    schedule_interval=None,
    start_date=datetime(2018, 11, 1),
    catchup=False,
    params={"redshift_secretid": config.redshift_secretid},
    tags=['raw_extractor']
) as dag:

    #DUMMIES
    start = DummyOperator(task_id='START')
    copy = DummyOperator(task_id='COPY')
    end = DummyOperator(task_id='END')

    table_name = AWSAthenaOperator(
            task_id="table_name",
            query, output_location, database = getQuery('table_name', 'database', 'connection') <- the problem
    )

I'm trying to learn and apply best practices for my Airflow DAGs, but I haven't been able to find a solution to this problem.
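One direction that might work, sketched under assumptions: a tuple cannot be unpacked into keyword arguments inside a call, but a dict can be with `**`. The example below uses a hypothetical StandInOperator so it runs without Airflow, and a hypothetical helper get_query_kwargs with placeholder values. It assumes the Athena operator accepts query, database, and output_location keyword arguments, as the attempt above suggests.

```python
# Sketch only: StandInOperator is a hypothetical stand-in so this runs without Airflow.
from typing import Dict


class StandInOperator:
    """Takes the keyword arguments the attempt above passes to the Athena operator."""

    def __init__(self, task_id: str, query: str, database: str, output_location: str):
        self.task_id = task_id
        self.query = query
        self.database = database
        self.output_location = output_location


def get_query_kwargs(base: str, tabela: str) -> Dict[str, str]:
    """Hypothetical helper: build the operator's keyword arguments as a dict."""
    return {
        "query": f"SELECT * FROM {tabela}",
        "database": base,
        "output_location": f"s3://example-bucket/{base}/{tabela}/",
    }


# The dict is unpacked into keyword arguments when the operator is constructed.
table_name = StandInOperator(
    task_id="table_name",
    **get_query_kwargs("database", "table_name"),
)
```

Note that a call like this runs when the DAG file is parsed, not when the task executes, so it cannot read kwargs['dag_run'].conf. Anything that depends on the run's conf, such as redshift_secretid, would still have to be resolved at run time.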
