I want logging in the POSTGRES-logging-tables. And to load data from the MSSQL-table to the POSTGRES-table.
After inserting a row into the POSTGRES-logging-tables, I get the ID - log.t_date.i_dt_log_key I need to add log.t_date.i_dt_log_key as an additional column to the MSSQL-table How to do it?
Now I have a pipeline, but logging is configured badly. I get the ID - log.t_data.i_dt_log_key, but can not add this value to column MSSQL-table
/*1*/
--Source.MSSQL-table
IF OBJECT_ID(N'[dbo].[tCtgBrand]', N'u') IS NOT NULL DROP TABLE [dbo].[tCtgBrand]
GO
CREATE TABLE [dbo].[tCtgBrand] (
[b16BrandId] BINARY(16)
,[bDeletedFlag] BIT
,[nvch50Brand] NVARCHAR(50)
)
--Destination.POSTGRES-table
DROP TABLE IF EXISTS "stg"."t_utd_ctg_brand";
CREATE TABLE "stg"."t_utd_ctg_brand" (
i_dt_log_key INT NOT NULL,
,"bt_brand_id" BYTEA
,"b_deleted_flag" BOOLEAN
,"txt_brand" TEXT
);
--POSTGRES-logging-tables
DROP TABLE IF EXISTS log.t_date;
CREATE TABLE log.t_date(
i_dt_log_key INT GENERATED ALWAYS AS IDENTITY NOT NULL,
tstz_dt_log TIMESTAMP(3) WITH TIME ZONE NOT NULL,
i_dt_log INT NOT NULL
) ;
DROP TABLE IF EXISTS log.t_log;
CREATE TABLE log.t_log(
i_dt_log_key INT NOT NULL,
txt_table_name TEXT NULL
) ;
--POSTGRES-logging-procedure
CREATE OR REPLACE PROCEDURE log.sp_gen_log_data(
txt_table_name text,
tstz_dt timestamp with time zone)
LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
i_dt INTEGER := EXTRACT(YEAR FROM tstz_dt)*10000 + EXTRACT(MONTH FROM tstz_dt)*100 + EXTRACT(DAY FROM tstz_dt);
i INTEGER := -1;
BEGIN
INSERT INTO log.t_date (tstz_dt_log, i_dt_log) VALUES (tstz_dt, i_dt);
i := COALESCE((SELECT i_dt_log_key FROM log.t_date WHERE tstz_dt_log = tstz_dt), -1);
INSERT INTO log.t_log (i_dt_log_key, txt_table_name) VALUES (i, txt_table_name);
END;
$BODY$;
import datetime as dt
from airflow.decorators import dag
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {
'owner': 'airflow',
'start_date': dt.datetime(2023, 5, 1),
'tags': 'Vasya',
}
@dag(
default_args=default_args,
schedule_interval='@daily',
dag_id='brand',
catchup=False,
)
def brand():
log = PostgresOperator(
task_id='log_brand',
postgres_conn_id='dwh',
sql='sql/sql_log.sql',
params={
'table_name': 'stg.t_utd_ctg_brand1',
'dt_log_key': -1
}
)
@task
def truncate():
pg = PostgresOperator(
task_id='truncate',
postgres_conn_id='dwh',
sql='truncate table "stg"."t_utd_ctg_brand";',
autocommit=False)
pg.execute(dict())
@task
def df():
src = MsSqlHook(mssql_conn_id='upp')
dest = PostgresHook(postgres_conn_id='dwh')
src_conn = src.get_conn()
cursor = src_conn.cursor()
cursor.execute("SELECT * FROM [dbo].[tCtgBrand]")
dest.insert_rows(table="stg.t_utd_ctg_brand", rows=cursor)
execute_truncate = truncate()
execute_df = df()
log >> execute_truncate >> execute_df
main_dag = brand()
/*2 sql/sql_log.sql*/
DO $$
DECLARE
tstz_dt TIMESTAMP(3) WITH TIME ZONE := CURRENT_TIMESTAMP;
txt_table_name TEXT := '{{ params.table_name }}';
i_dt_log INTEGER;
BEGIN
CALL log.sp_gen_log_data (txt_table_name, tstz_dt);
i_dt_log := (
SELECT i_dt_log_key
FROM log.t_date
WHERE tstz_dt_log = tstz_dt
);
END
$$;