I'm investigating moving data from Kafka -> Snowflake Kafka connector -> Snowflake. Unfortunately, the connector uses just two columns and puts the entire JSON payload into a single column. So I created a stream/task to periodically copy data from the landing table to the destination table (using INSERT). Everything works beautifully except for deleting data from the landing table once it has landed in the destination table. Using streams, I know what has landed — but how do I delete that data from the landing table? TRUNCATE seems so much faster. Do I just periodically run a task that deletes those entries? I'm also concerned about the warehouse time consumed by these deletes. Thanks.
Snowflake Kafka connector -> landing table -> destination table: how to clean up the landing table?
412 views. Asked by Surendar Chandra.

There is 1 answer below.
When multiple statements (INSERT, DELETE, etc.) need to access the same change records, surround them with an explicit transaction (BEGIN ... COMMIT). This locks the stream, so every statement in the transaction sees the same change set, and the stream offset only advances when the transaction commits.
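The simplest cleanup, given the above, is to do the insert and the delete inside one transaction, both driven by the same stream. A minimal sketch — the table, stream, and column names (LANDING, LANDING_STREAM, DEST, RECORD_METADATA, RECORD_CONTENT) are assumptions based on the Kafka connector's default two-column schema, and the join key on Kafka partition/offset is an assumption about your payload:

```sql
BEGIN;

-- 1. Copy the new change records from the stream into the destination table.
INSERT INTO DEST (RECORD_METADATA, RECORD_CONTENT)
  SELECT RECORD_METADATA, RECORD_CONTENT
  FROM LANDING_STREAM;

-- 2. Delete the rows that have now landed. Inside the transaction the
--    stream still returns the same change set, so this matches exactly
--    the rows inserted above (here matched on Kafka partition + offset).
DELETE FROM LANDING L
  USING LANDING_STREAM S
  WHERE L.RECORD_METADATA:partition = S.RECORD_METADATA:partition
    AND L.RECORD_METADATA:offset    = S.RECORD_METADATA:offset;

COMMIT;  -- the stream offset advances here, atomically with both statements
```

If either statement fails, the rollback leaves both the stream and the landing table untouched, so no landed-but-not-deleted (or deleted-but-not-landed) state can arise.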
Alternatively, add a flag column to the staging table: lock the stream with BEGIN, use the stream once to insert into the target table, then use the stream a second time to merge back into the staging table and set the flag. A separate, cheaper task can then delete (or simply ignore) the flagged rows.
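The flag-column variant above can be sketched like this — again the names (LANDING, LANDING_STREAM, DEST, LANDED_FLAG) and the partition/offset match key are assumptions for illustration:

```sql
BEGIN;

-- First read of the stream: land the data.
INSERT INTO DEST (RECORD_METADATA, RECORD_CONTENT)
  SELECT RECORD_METADATA, RECORD_CONTENT
  FROM LANDING_STREAM;

-- Second read of the same stream (same change set within the transaction):
-- mark the corresponding staging rows as landed.
MERGE INTO LANDING L
  USING LANDING_STREAM S
    ON  L.RECORD_METADATA:partition = S.RECORD_METADATA:partition
    AND L.RECORD_METADATA:offset    = S.RECORD_METADATA:offset
  WHEN MATCHED THEN UPDATE SET L.LANDED_FLAG = TRUE;

COMMIT;
```

A low-frequency cleanup task can then run `DELETE FROM LANDING WHERE LANDED_FLAG = TRUE;` off the hot path, which addresses the warehouse-time concern: the per-run transaction stays small, and the bulk delete can be scheduled when it's cheap.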
https://docs.snowflake.com/en/user-guide/streams.html#examples