SCD Type 2 merge logic resulting in duplicates in destination table

1.2k Views Asked by At

I am trying to implement SCD Type 2 merge logic in big query.

I am doing this in 3 setup approach but every time i run my merge logic script but it seems to push records into my destination table even if there is no new record.

  • Step 1: when records match on natural keys then deactivate active records in destination table
  • Step 2: inserting fresh records into the destination table which are not present earlier
  • Step 3: Inserting updated records from source table into target table (i am getting duplicates here)

Here is my code

CREATE
    OR REPLACE PROCEDURE `processed.sp_merge_example` (job_run_id INT64) OPTIONS (strict_mode = false)

BEGIN
    DECLARE run_id INT64 DEFAULT job_run_id;

    ---checking for natural keys for updates
    MERGE INTO `processed.<Destination table>` AS T
    USING `transient.<Source table>` AS S
        ON T.<Destination table>_fingerprint = S.<Source table>_fingerprint
    WHEN MATCHED
        AND S.type2_checksum != T.type2_checksum
        AND T.current_flg = 'Y'
        THEN
            UPDATE
            ---updating the records in the final table which has updates in the incremental load.
            SET T.modified_datetime = current_datetime(),
                T.modified_by = CAST(run_id AS STRING),
                T.end_datetime = current_datetime(),
                T.current_flg = 'N'
    WHEN NOT MATCHED
        THEN
            ---inserting new records.
            INSERT (
                <Source table columns>
                source_type,
                material_sales_fingerprint,
                type2_checksum,
                start_datetime,
                end_datetime,
                current_flg,
                created_by,
                created_datetime,
                modified_by,
                modified_datetime
                )
            VALUES (
                <Source table values>
                S.source_type,
                S.material_sales_fingerprint,
                S.type2_checksum,
                current_datetime(),
                parse_date('%Y%m%d', '99991231'),
                'Y',
                CAST(run_id AS STRING),
                current_datetime(),
                CAST(run_id AS STRING),
                current_datetime()
                );

    -- insert into newly updated records
    INSERT `processed.dim_material_sales` (
        <Source table columns>
        source_type,
        material_sales_fingerprint,
        type2_checksum,
        start_datetime,
        end_datetime,
        current_flg,
        created_by,
        created_datetime,
        modified_by,
        modified_datetime
        )
    SELECT 
        <Source table columns>
        ST.source_type,
        ST.material_sales_fingerprint,
        ST.type2_checksum,
        current_datetime(),
        parse_date('%Y%m%d', '99991231'),
        'Y',
        CAST(run_id AS STRING),
        current_datetime(),
        CAST(run_id AS STRING),
        current_datetime()
    FROM (
        SELECT x.*
        FROM `transient.<Source table>` x
        JOIN `processed.<Destination table>` y 
        ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
        WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N' 
        ) ST
END; 

--This piece of logic at the end is causing duplicates

ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
            WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N' 

If i run my merger logic n times with same records in source expectation is that the destination table should have same records but with the above logic the updated record in source is always getting inserted into destination table every time merge logic is ran.i have tested many times not sure where i am going wrong. Can someone please help me understand .

1

There are 1 best solutions below

0
On

Overcame this issue with the following logic

CREATE OR REPLACE PROCEDURE `<stored_proc_name >`(job_run_id INT64) OPTIONS(strict_mode=false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;

-- Inserting updated records coming as part of incremental load to Destination first.
INSERT
  `<destination_table>`(
    <Target_table_columns>,
    tablename_fingerprint,
    type2_checksum,
    start_datetime,
    end_datetime,
    current_flg,
    created_by,
    created_datetime,
    modified_by,
    modified_datetime)

SELECT
   ST.<Target_table_columns>
  current_datetime(),
  cast(parse_timestamp('%Y%m%d%H%M%S', '99991231235959') as datetime),
  'Y',
  CAST(run_id AS STRING),
  current_datetime(),
  CAST(run_id AS STRING),
  current_datetime()
FROM (
  SELECT x.*
  FROM
    `<source_table>` x
  JOIN
    `<destination_table>` y
  ON
    y.<destination_table_fingerprint_column> = x.<source_temp_table_fingerprint_column>
    Where x.type2_checksum != y.type2_checksum and y.current_flg = 'Y') ST;
    
---Merge Process Starts
    
MERGE INTO
  `<destination_table>` AS T
USING
  `<source_table>` AS S
ON
  T.<destination_table_fingerprint_column> = S.<source_temp_table_fingerprint_column>

  -- Updating older version of the records.
  WHEN MATCHED
        AND S.type2_checksum != T.type2_checksum
        AND T.current_flg = 'Y'
        THEN
            UPDATE
            SET T.modified_datetime = current_datetime(),
                T.modified_by = CAST(run_id AS STRING),
                T.end_datetime = current_datetime(),
                T.current_flg = 'N'

    -- Inserting new records from temp to final table
  WHEN NOT MATCHED
  THEN
INSERT
  (
    <Target_table_columns>,
    tablename_fingerprint,
    type2_checksum,
    start_datetime,
    end_datetime,
    current_flg,
    created_by,
    created_datetime,
    modified_by,
    modified_datetime
    )
VALUES
  (
    <S.Target_table_columns>,
    current_datetime(),
    cast(parse_timestamp('%Y%m%d%H%M%S', '99991231235959') as datetime),
    'Y',
    CAST(run_id AS STRING),
    current_datetime(),
    CAST(run_id AS STRING),
    current_datetime()
        );
END;