I am using the Snowflake Kafka Sink Connector to ingest data from Debezium into a Snowflake table. I have created a Stream and a Task on this table. As the data from Kafka lands in the source table, the stream gets populated and the task runs a MERGE command to write the data into a final table.
However, now that the stream has grown to about 50 million rows, the task no longer runs to completion and times out.
To address this, I have tried the following (roughly the statements sketched after this list):
- Increase the timeout of the task from 1 hour to 24 hours.
- Increase the warehouse size to Medium.
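Concretely, the changes I made were along these lines (the task and warehouse names here are placeholders for my actual object names):

ALTER TASK RAW.MESSAGE_MERGE_TASK SUSPEND;
ALTER TASK RAW.MESSAGE_MERGE_TASK SET USER_TASK_TIMEOUT_MS = 86400000;  -- 24 hours, up from the 1 hour default
ALTER WAREHOUSE INGEST_WH SET WAREHOUSE_SIZE = MEDIUM;
ALTER TASK RAW.MESSAGE_MERGE_TASK RESUME;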
The task still doesn't finish after 24 hours and times out.
Does ingesting 50M rows really require an even larger warehouse? How do I get the task to run to completion?
MERGE statement
MERGE INTO TARGET.MESSAGE AS P
USING (SELECT RECORD_CONTENT:payload:before.id::VARCHAR AS BEFORE_ID,
RECORD_CONTENT:payload:before.agency_id::VARCHAR AS BEFORE_AGENCY_ID,
RECORD_CONTENT:payload:after.id::VARCHAR AS AFTER_ID,
RECORD_CONTENT:payload:after.agency_id::VARCHAR AS AFTER_AGENCY_ID,
RECORD_CONTENT:payload:after::VARIANT AS PAYLOAD,
RECORD_CONTENT:payload:source.ts_ms::INT AS TS_MS,
RECORD_CONTENT:payload:op::VARCHAR AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (
PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
ORDER BY TS_MS DESC
) = 1) PS ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
(P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN INSERT (P.ID, P.AGENCY_ID, P.PAYLOAD, P.TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);
EXPLAIN plan
GlobalStats:
partitionsTotal=742
partitionsAssigned=742
bytesAssigned=3596441600
Operations:
1:0 ->Result number of rows inserted, number of rows updated, number of rows deleted
1:1 ->WindowFunction ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST)
1:2 ->LeftOuterJoin joinFilter: ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id'))))) OR ((P.ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id')))) AND (P.AGENCY_ID = (TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id')))))
1:3 ->Filter ROW_NUMBER() OVER (PARTITION BY IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'id'))), IFNULL(TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'after'), 'agency_id')), TO_CHAR(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'before'), 'agency_id'))) ORDER BY TO_NUMBER(GET(GET(GET(UNION_ALL(CHANGES."a_RECORD_CONTENT", CHANGES."d_RECORD_CONTENT"), 'payload'), 'source'), 'ts_ms')) DESC NULLS FIRST) = 1
1:4 ->UnionAll
1:5 ->Filter CHANGES.A_METADATA$ACTION IS NOT NULL
1:6 ->WithReference
1:7 ->WithClause CHANGES
1:8 ->Filter (A.METADATA$SHORTNAME IS NULL) OR (D.METADATA$SHORTNAME IS NULL) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_METADATA, SCAN_FDN_FILES.RECORD_METADATA))) OR (NOT(EQUAL_NULL(SCAN_FDN_FILES.RECORD_CONTENT, SCAN_FDN_FILES.RECORD_CONTENT)))
1:9 ->FullOuterJoin joinKey: (D.METADATA$ROW_ID = A.METADATA$ROW_ID) AND (D.METADATA$SHORTNAME = A.METADATA$SHORTNAME)
1:10 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=17, partitionsAssigned=17, bytesAssigned=20623360}
1:11 ->TableScan DATABASE.RAW.MESSAGE as SCAN_FDN_FILES METADATA$PARTITION_ROW_NUMBER, METADATA$PARTITION_NAME, RECORD_METADATA, RECORD_CONTENT, METADATA$ORIGINAL_PARTITION_NAME, METADATA$ORIGINAL_PARTITION_ROW_NUMBER {partitionsTotal=507, partitionsAssigned=507, bytesAssigned=3519694336}
1:12 ->Filter CHANGES.D_METADATA$ACTION IS NOT NULL
1:13 ->WithReference
1:14 ->TableScan DATABASE.TARGET.MESSAGE as P ID, AGENCY_ID {partitionsTotal=218, partitionsAssigned=218, bytesAssigned=56123904}
I have re-jiggled your SQL just so it's more readable to me. I don't see anything super horrible here. I pushed the COALESCE of the two values used in the QUALIFY up into the SELECT, just so I can read it more easily.
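Roughly like this (a sketch of the reshaped subquery only; the KEY_ID / KEY_AGENCY_ID aliases are names I made up):

SELECT RECORD_CONTENT:payload:before.id::VARCHAR        AS BEFORE_ID,
       RECORD_CONTENT:payload:before.agency_id::VARCHAR AS BEFORE_AGENCY_ID,
       RECORD_CONTENT:payload:after.id::VARCHAR         AS AFTER_ID,
       RECORD_CONTENT:payload:after.agency_id::VARCHAR  AS AFTER_AGENCY_ID,
       COALESCE(RECORD_CONTENT:payload:after.id::VARCHAR,
                RECORD_CONTENT:payload:before.id::VARCHAR)        AS KEY_ID,
       COALESCE(RECORD_CONTENT:payload:after.agency_id::VARCHAR,
                RECORD_CONTENT:payload:before.agency_id::VARCHAR) AS KEY_AGENCY_ID,
       RECORD_CONTENT:payload:after::VARIANT            AS PAYLOAD,
       RECORD_CONTENT:payload:source.ts_ms::INT         AS TS_MS,
       RECORD_CONTENT:payload:op::VARCHAR               AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (PARTITION BY KEY_ID, KEY_AGENCY_ID ORDER BY TS_MS DESC) = 1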
But looking at the ON logic: you are prepared to match on the "before" values if the "after" values don't match. Mixing that with the COALESCE logic, are the two "after" values always null at the same time? i.e. if after_id is null, will after_agency_id also be null? Because then, if you ALSO don't care about checking "before" when the "after" values are present but simply don't match, you could join on the coalesced values instead (albeit you might want to name them better). That should improve it a smidge.
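Something like this, reusing the KEY_ID / KEY_AGENCY_ID aliases from the reshaped subquery above (a sketch, assuming "before" only matters when the "after" values are null):

ON P.ID = PS.KEY_ID AND P.AGENCY_ID = PS.KEY_AGENCY_ID

An equi-join like that should show up in the plan as a joinKey (the way the stream's own FullOuterJoin does at node 1:9) rather than the joinFilter at node 1:2, which I suspect is the expensive part here.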
Back to the JOIN logic, another reason I think the above might apply is that you are grouping/partitioning the ROW_NUMBER by the "after" values if present, which implies that if you had rows with the same "after" values but different "before" values, the current ROW_NUMBER would already be throwing all but the latest of them away.
But otherwise it doesn't look like it's doing anything truly bad, at which point you might want to run a 4-8 times bigger warehouse, let it go for 24/8 hours (plus maybe 10% extra), and see if it completes. The cost of the bigger warehouse should be offset by the much smaller wall-clock time.
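For the sizing, starting from Medium, 4x is X-Large and 8x is 2X-Large, so something like (warehouse name is a placeholder):

ALTER WAREHOUSE INGEST_WH SET WAREHOUSE_SIZE = XLARGE;   -- 4x the compute of MEDIUM
-- or, for 8x:
ALTER WAREHOUSE INGEST_WH SET WAREHOUSE_SIZE = XXLARGE;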
Silly idea:
On the smaller data set you mention, try the SQL made really nice and simple, with the join as I suspect would work for your data, against cloned tables, just to see what the performance impact is (sketched below).
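A sketch of that test (the clone name is a placeholder; also be aware that a committed DML that reads the stream will advance the stream's offset):

CREATE TABLE TARGET.MESSAGE_TEST CLONE TARGET.MESSAGE;  -- zero-copy clone to experiment against

MERGE INTO TARGET.MESSAGE_TEST AS P
USING (SELECT COALESCE(RECORD_CONTENT:payload:after.id::VARCHAR,
                       RECORD_CONTENT:payload:before.id::VARCHAR)        AS KEY_ID,
              COALESCE(RECORD_CONTENT:payload:after.agency_id::VARCHAR,
                       RECORD_CONTENT:payload:before.agency_id::VARCHAR) AS KEY_AGENCY_ID,
              RECORD_CONTENT:payload:after::VARIANT    AS PAYLOAD,
              RECORD_CONTENT:payload:source.ts_ms::INT AS TS_MS,
              RECORD_CONTENT:payload:op::VARCHAR       AS OP
       FROM RAW.MESSAGE_STREAM
       QUALIFY ROW_NUMBER() OVER (PARTITION BY KEY_ID, KEY_AGENCY_ID ORDER BY TS_MS DESC) = 1
      ) PS
ON P.ID = PS.KEY_ID AND P.AGENCY_ID = PS.KEY_AGENCY_ID
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN
    INSERT (ID, AGENCY_ID, PAYLOAD, TS_MS) VALUES (PS.KEY_ID, PS.KEY_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);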
Another thing to try, to get through the backlog:
Split the task into two steps, just for now: make a temp table that is the first half, then merge that into your main table (sketched below).
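A sketch of the two-step version (the temp table name is mine; step 1 is the window-function half, step 2 is your existing MERGE reading the temp table instead of the stream):

-- step 1: materialize the de-duplicated stream contents
CREATE TEMPORARY TABLE MESSAGE_STREAM_DEDUP AS
SELECT RECORD_CONTENT:payload:before.id::VARCHAR        AS BEFORE_ID,
       RECORD_CONTENT:payload:before.agency_id::VARCHAR AS BEFORE_AGENCY_ID,
       RECORD_CONTENT:payload:after.id::VARCHAR         AS AFTER_ID,
       RECORD_CONTENT:payload:after.agency_id::VARCHAR  AS AFTER_AGENCY_ID,
       RECORD_CONTENT:payload:after::VARIANT            AS PAYLOAD,
       RECORD_CONTENT:payload:source.ts_ms::INT         AS TS_MS,
       RECORD_CONTENT:payload:op::VARCHAR               AS OP
FROM RAW.MESSAGE_STREAM
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY COALESCE(AFTER_ID, BEFORE_ID), COALESCE(AFTER_AGENCY_ID, BEFORE_AGENCY_ID)
    ORDER BY TS_MS DESC
) = 1;

-- step 2: the same MERGE as before, but against the flattened temp table
MERGE INTO TARGET.MESSAGE AS P
USING MESSAGE_STREAM_DEDUP PS
ON (P.ID = PS.AFTER_ID AND P.AGENCY_ID = PS.AFTER_AGENCY_ID) OR
   (P.ID = PS.BEFORE_ID AND P.AGENCY_ID = PS.BEFORE_AGENCY_ID)
WHEN MATCHED AND PS.OP = 'd' THEN DELETE
WHEN MATCHED AND PS.OP IN ('u', 'r') THEN UPDATE SET P.PAYLOAD = PS.PAYLOAD, P.TS_MS = PS.TS_MS
WHEN NOT MATCHED AND PS.OP IN ('c', 'r', 'u') THEN
    INSERT (ID, AGENCY_ID, PAYLOAD, TS_MS) VALUES (PS.AFTER_ID, PS.AFTER_AGENCY_ID, PS.PAYLOAD, PS.TS_MS);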
That will give you an idea of where the problem is, the first or the second operation. It will also let you merge into clones and test whether the equi-join version runs faster and gives the same results.