I am trying to implement SCD Type 2 merge logic in BigQuery.
I am using a 3-step approach, but every run of my merge script pushes records into the destination table even when there are no new records in the source:
- Step 1: when records match on the natural keys and have changed, deactivate the active records in the destination table
- Step 2: insert fresh records into the destination table that were not present earlier
- Step 3: insert the updated records from the source table into the target table (this is where I am getting duplicates)
Here is my code:
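For context, the intended SCD Type 2 behaviour after one change to an existing record looks roughly like this (column names as in the code below; values purely illustrative):

```sql
-- Illustrative destination state after one change to an existing record:
-- fingerprint | type2_checksum | start_datetime      | end_datetime        | current_flg
-- ABC123      | old_checksum   | 2024-01-01 00:00:00 | 2024-06-01 00:00:00 | 'N'  -- deactivated by Step 1
-- ABC123      | new_checksum   | 2024-06-01 00:00:00 | 9999-12-31 00:00:00 | 'Y'  -- inserted by Step 3
-- Re-running the merge with an unchanged source should leave these two rows untouched.
```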
CREATE OR REPLACE PROCEDURE `processed.sp_merge_example` (job_run_id INT64) OPTIONS (strict_mode = false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;
-- Step 1: check natural keys for updates
MERGE INTO `processed.<Destination table>` AS T
USING `transient.<Source table>` AS S
ON T.<Destination table>_fingerprint = S.<Source table>_fingerprint
WHEN MATCHED
AND S.type2_checksum != T.type2_checksum
AND T.current_flg = 'Y'
THEN
UPDATE
-- deactivate the records in the final table which have updates in the incremental load
SET T.modified_datetime = current_datetime(),
T.modified_by = CAST(run_id AS STRING),
T.end_datetime = current_datetime(),
T.current_flg = 'N'
WHEN NOT MATCHED
THEN
-- Step 2: insert new records
INSERT (
<Source table columns>
source_type,
material_sales_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime
)
VALUES (
<Source table values>
S.source_type,
S.material_sales_fingerprint,
S.type2_checksum,
current_datetime(),
parse_datetime('%Y%m%d', '99991231'),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
);
-- Step 3: insert the updated versions of changed records
INSERT `processed.dim_material_sales` (
<Source table columns>
source_type,
material_sales_fingerprint,
type2_checksum,
start_datetime,
end_datetime,
current_flg,
created_by,
created_datetime,
modified_by,
modified_datetime
)
SELECT
<Source table columns>
ST.source_type,
ST.material_sales_fingerprint,
ST.type2_checksum,
current_datetime(),
parse_datetime('%Y%m%d', '99991231'),
'Y',
CAST(run_id AS STRING),
current_datetime(),
CAST(run_id AS STRING),
current_datetime()
FROM (
SELECT x.*
FROM `transient.<Source table>` x
JOIN `processed.<Destination table>` y
ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N'
) ST;
END;
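While debugging, a quick way to confirm the duplicates (assuming `material_sales_fingerprint` is the natural key, as in the insert above) is to count active rows per key after each run:

```sql
-- Diagnostic query: more than one current_flg = 'Y' row per fingerprint
-- means an already-applied update was inserted again.
SELECT material_sales_fingerprint,
       COUNT(*) AS active_versions
FROM `processed.dim_material_sales`
WHERE current_flg = 'Y'
GROUP BY material_sales_fingerprint
HAVING COUNT(*) > 1;
```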
-- This piece of logic at the end is causing the duplicates:
ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N'
If I run my merge logic n times with the same records in the source, the expectation is that the destination table ends up with the same records. With the above logic, however, the updated record in the source is inserted into the destination table every time the merge logic is run. I have tested this many times and am not sure where I am going wrong. Can someone please help me understand?
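For what it's worth, the Step 3 join re-qualifies on every run because the deactivated 'N' row, whose checksum differs from the source by definition, stays in the destination forever. One possible way to make Step 3 idempotent, sketched here against the placeholder names above, is to insert a source row only when its fingerprint is known but no active version remains (i.e. it was just deactivated by Step 1 in this run):

```sql
-- Sketch only: insert a source row when the fingerprint exists in the
-- destination but currently has no active ('Y') version. On a re-run with
-- an unchanged source, Step 1 deactivates nothing, an active row still
-- exists, and nothing is re-inserted.
SELECT x.*
FROM `transient.<Source table>` x
WHERE EXISTS (
  SELECT 1
  FROM `processed.<Destination table>` y
  WHERE y.<Source table>_fingerprint = x.<Source table>_fingerprint
)
AND NOT EXISTS (
  SELECT 1
  FROM `processed.<Destination table>` y
  WHERE y.<Source table>_fingerprint = x.<Source table>_fingerprint
    AND y.current_flg = 'Y'
)
```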
I overcame this issue with the following logic: