Pass Arguments from Airflow-default_args to Snowflake Query in KubernetesPodOperator

We have a DAG in Airflow that extracts data from a database and writes it to a file.

The DAG uses a KubernetesPodOperator task that calls another repository, which holds all the queries in .sql files. The query currently has all of its parameters hard-coded, but the new requirement is to pass arguments when triggering the DAG, so that the hard-coded values are replaced with the ones supplied by the user.

The problem is that we cannot access the passed arguments in the query.

Here is how we set the params in the default_args in the DAG:

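from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 10, 30),
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
    # Defaults for the values a user may override when triggering the DAG
    'params': {
        "has_limit": "True",
        "sponsor_list": ""
    }
}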

How we pass the arguments:

[Image: screenshot showing how the arguments are passed; not available]

And this is the query in the file used by the task. As you can see, we have tried several ways to retrieve the passed arguments, without success:

DECLARE
    res RESULTSET;
    has_limit BOOLEAN DEFAULT TRUE;
    sponsor_list VARCHAR DEFAULT '';
    error_message VARCHAR DEFAULT '';
BEGIN
    -- We tried this to retrieve the arguments passed:
    has_limit := $has_limit;
    sponsor_list := $sponsor_list;
    -- And this:
    has_limit := '$has_limit';
    sponsor_list := '$sponsor_list';
    -- And this:
    has_limit := {{$params.has_limit}};
    sponsor_list := {{$params.sponsor_list}};
    -- And this:
    has_limit := {$params.has_limit};
    sponsor_list := {$params.sponsor_list};
    -- And this:
    has_limit := $params.has_limit;
    sponsor_list := $params.sponsor_list;
    -- And this:
    has_limit := GETVARIABLE($PARAMS.HAS_LIMIT);
    sponsor_list := GETVARIABLE($PARAMS.SPONSOR_LIST);
    -- And this:
    has_limit := GETVARIABLE(PARAMS.HAS_LIMIT);
    sponsor_list := GETVARIABLE(PARAMS.SPONSOR_LIST);
    -- And this:
    has_limit := GETVARIABLE(HAS_LIMIT);
    sponsor_list := GETVARIABLE(SPONSOR_LIST);
    -- And this:
    has_limit := GETVARIABLE(HAS_LIMIT::boolean);
    sponsor_list := GETVARIABLE(SPONSOR_LIST);
    
    IF (has_limit) THEN
        res := 
        (
            select...rest of the query
        
            limit 1000
        );
        RETURN TABLE(res);
    ELSE
        IF (LENGTH(TRIM(sponsor_list)) > 0) THEN
            res := 
            (
                select...rest of the query
            );
            RETURN TABLE(res);
        ELSE
            error_message := 'Error: The sponsor list is empty.';
            RETURN error_message;
        END IF;
    END IF;
END;

How can we retrieve the arguments passed by default_args or by the user when triggering the DAG? Thanks in advance.
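
One possible direction, offered as a sketch rather than a confirmed fix: Airflow renders Jinja only in the fields an operator declares as templated, so a .sql file that lives in another repository and is read inside the pod never passes through Airflow's templating. That would explain why none of the placeholders above are replaced. As far as I know, env_vars and arguments on KubernetesPodOperator are templated, and the Jinja form is {{ params.has_limit }} (no $). The sketch below assumes the DAG hands the values to the pod as environment variables, and that the script inside the container fills the $has_limit and $sponsor_list placeholders before sending the query to Snowflake. The names POD_ENV_VARS, HAS_LIMIT, SPONSOR_LIST, parse_bool, sql_literal and render_query are all hypothetical, not part of Airflow or Snowflake.

```python
import os
from string import Template

# DAG side (assumption): this dict would be passed as
# KubernetesPodOperator(env_vars=POD_ENV_VARS, ...). Airflow renders the
# Jinja expressions before the pod starts, so the container sees the values.
POD_ENV_VARS = {
    "HAS_LIMIT": "{{ params.has_limit }}",
    "SPONSOR_LIST": "{{ params.sponsor_list }}",
}


def parse_bool(raw: str) -> bool:
    """Interpret the string form of a flag such as "True" or "false"."""
    return raw.strip().lower() in {"true", "1", "yes"}


def sql_literal(value: str) -> str:
    """Quote a string as a SQL literal, escaping backslashes and single quotes."""
    escaped = value.replace("\\", "\\\\").replace("'", "''")
    return "'" + escaped + "'"


def render_query(sql_text: str, env=os.environ) -> str:
    """Container side: fill $has_limit and $sponsor_list in the query text.

    safe_substitute leaves any other $name in the SQL untouched, but note
    that it still turns "$$" into "$", which matters for dollar-quoted SQL.
    """
    has_limit = parse_bool(env.get("HAS_LIMIT", "True"))
    sponsor_list = env.get("SPONSOR_LIST", "")
    return Template(sql_text).safe_substitute(
        has_limit="TRUE" if has_limit else "FALSE",
        sponsor_list=sql_literal(sponsor_list),
    )


if __name__ == "__main__":
    query = "has_limit := $has_limit;\nsponsor_list := $sponsor_list;"
    print(render_query(query))
```

With this approach only the first form tried in the query (has_limit := $has_limit;) would be kept. Plain text substitution is the simplest option but not the safest one; if the script in the container uses a Snowflake connector that supports bind parameters, passing the values that way is likely the better choice.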
