Airflow: pass the result of the previous TaskFlow task to the next TaskFlow task


I want to write records to Postgres logging tables and load data from an MSSQL table into a Postgres table.

After inserting a row into the logging table I get the generated ID, log.t_date.i_dt_log_key. I need to add that i_dt_log_key value as an additional column to every row loaded from the MSSQL table. How can I do this?

I already have a pipeline, but the logging is set up badly: I do get the ID log.t_date.i_dt_log_key inside the SQL script, but I cannot pass it to the task that loads the MSSQL data, so it never ends up in the column.
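
As far as I understand, the TaskFlow API can pass a task's return value to the next task via XCom, which seems to be the mechanism I need. A minimal sketch of just that hand-off (the dag id and task names here are made up for illustration, they are not part of my real DAG):

import datetime as dt
from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=dt.datetime(2023, 5, 1), catchup=False)
def xcom_sketch():
    @task
    def produce_key() -> int:
        # in my case this would be the freshly generated log.t_date.i_dt_log_key
        return 42

    @task
    def load_with_key(log_key: int):
        # in my case this is where the MSSQL rows should be inserted into
        # stg.t_utd_ctg_brand with log_key as the extra column
        print(f"loading with log key {log_key}")

    # passing the return value creates both the dependency and the XCom hand-off
    load_with_key(produce_key())


xcom_sketch()

Below is what I currently have.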

/*1*/
--Source.MSSQL-table
IF OBJECT_ID(N'[dbo].[tCtgBrand]', N'U') IS NOT NULL DROP TABLE [dbo].[tCtgBrand]
GO
CREATE TABLE [dbo].[tCtgBrand] (
    [b16BrandId] BINARY(16)
    ,[bDeletedFlag] BIT
    ,[nvch50Brand] NVARCHAR(50)
)
--Destination.POSTGRES-table
DROP TABLE IF EXISTS "stg"."t_utd_ctg_brand";
CREATE TABLE "stg"."t_utd_ctg_brand" (
    i_dt_log_key INT NOT NULL
    ,"bt_brand_id" BYTEA
    ,"b_deleted_flag" BOOLEAN
    ,"txt_brand" TEXT
);
--POSTGRES-logging-tables
DROP TABLE IF EXISTS log.t_date;
CREATE TABLE log.t_date(
    i_dt_log_key INT GENERATED ALWAYS AS IDENTITY NOT NULL,
    tstz_dt_log TIMESTAMP(3) WITH TIME ZONE NOT NULL,
    i_dt_log INT NOT NULL
) ;
DROP TABLE IF EXISTS log.t_log;
CREATE TABLE log.t_log(
    i_dt_log_key INT NOT NULL,
    txt_table_name TEXT NULL
) ;
--POSTGRES-logging-procedure
CREATE OR REPLACE PROCEDURE log.sp_gen_log_data(
    txt_table_name text,
    tstz_dt timestamp with time zone)
LANGUAGE plpgsql
AS $BODY$
    DECLARE 
        i_dt INTEGER := EXTRACT(YEAR FROM tstz_dt)*10000 + EXTRACT(MONTH FROM tstz_dt)*100 + EXTRACT(DAY FROM tstz_dt);
        i INTEGER := -1;
BEGIN
    INSERT INTO log.t_date (tstz_dt_log, i_dt_log) VALUES (tstz_dt, i_dt);
    i := COALESCE((SELECT   i_dt_log_key FROM log.t_date WHERE tstz_dt_log = tstz_dt), -1);
    INSERT INTO log.t_log (i_dt_log_key, txt_table_name) VALUES (i, txt_table_name);
END;
$BODY$;

# The Airflow DAG
import datetime as dt
from airflow.decorators import dag
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2023, 5, 1),
}


@dag(
    default_args=default_args,
    schedule_interval='@daily',
    dag_id='brand',
    tags=['Vasya'],
    catchup=False,
)
def brand():

    log = PostgresOperator(
            task_id='log_brand',
            postgres_conn_id='dwh',
            sql='sql/sql_log.sql',
            params={
                'table_name': 'stg.t_utd_ctg_brand1',
                'dt_log_key': -1
            }
        )
    
    @task
    def truncate():
        pg = PostgresOperator(
            task_id='truncate',
            postgres_conn_id='dwh',
            sql='truncate table "stg"."t_utd_ctg_brand";',
            autocommit=False)
        pg.execute(dict())
    
    @task
    def df():
        src = MsSqlHook(mssql_conn_id='upp')
        dest = PostgresHook(postgres_conn_id='dwh')
        src_conn = src.get_conn()
        cursor = src_conn.cursor()
        cursor.execute("SELECT * FROM [dbo].[tCtgBrand]")
        # each row needs i_dt_log_key prepended here, but the value produced
        # by the log task is not available inside this task
        dest.insert_rows(table="stg.t_utd_ctg_brand", rows=cursor)
    
    execute_truncate = truncate()
    execute_df = df()
    log >> execute_truncate >> execute_df
    
main_dag = brand()
/*2 sql/sql_log.sql*/

DO $$
DECLARE
    tstz_dt TIMESTAMP(3) WITH TIME ZONE := CURRENT_TIMESTAMP;
    txt_table_name TEXT := '{{ params.table_name }}';
    i_dt_log INTEGER;
BEGIN
    CALL log.sp_gen_log_data (txt_table_name, tstz_dt);
    -- the generated key is selected here, but nothing is returned to Airflow
    i_dt_log := (
            SELECT  i_dt_log_key
            FROM    log.t_date
            WHERE   tstz_dt_log = tstz_dt
        );

END
$$;
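
One direction I am experimenting with (not sure if it is the right approach): drop the DO block and the PostgresOperator, do the log insert from a @task through PostgresHook with INSERT ... RETURNING, return the key, and pass it into the loading task so every row gets it as the extra column. A rough sketch, reusing my connection ids 'dwh' and 'upp' (the task names write_log and load_brand are made up):

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


@task
def write_log(table_name: str) -> int:
    pg = PostgresHook(postgres_conn_id='dwh')
    conn = pg.get_conn()
    with conn, conn.cursor() as cur:  # "with conn" commits on success (psycopg2)
        # insert the log row and capture the generated identity in one round trip
        cur.execute(
            """
            INSERT INTO log.t_date (tstz_dt_log, i_dt_log)
            VALUES (CURRENT_TIMESTAMP, to_char(CURRENT_TIMESTAMP, 'YYYYMMDD')::int)
            RETURNING i_dt_log_key;
            """
        )
        log_key = cur.fetchone()[0]
        cur.execute(
            "INSERT INTO log.t_log (i_dt_log_key, txt_table_name) VALUES (%s, %s);",
            (log_key, table_name),
        )
    return log_key


@task
def load_brand(log_key: int):
    src = MsSqlHook(mssql_conn_id='upp')
    dest = PostgresHook(postgres_conn_id='dwh')
    cursor = src.get_conn().cursor()
    cursor.execute("SELECT [b16BrandId], [bDeletedFlag], [nvch50Brand] FROM [dbo].[tCtgBrand]")
    # prepend the log key to every row coming from MSSQL
    rows = ((log_key, *row) for row in cursor)
    dest.insert_rows(
        table="stg.t_utd_ctg_brand",
        rows=rows,
        target_fields=["i_dt_log_key", "bt_brand_id", "b_deleted_flag", "txt_brand"],
    )


# inside the @dag function the wiring would then be roughly:
# load_brand(write_log('stg.t_utd_ctg_brand'))

Is this the intended way to pass a value between tasks, or is there a better pattern?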