Let's consider a streaming process that reads from a table:
df_vbronze_google_similar_apps = spark.readStream.table('${var.database}.bronze_google_similar_apps')
df_vbronze_google_similar_apps.createOrReplaceTempView('live_bronze_google_similar_apps')
This process is triggered on a schedule rather than running continuously. (Note that createOrReplaceTempView returns None, so chaining it onto the assignment would leave the variable empty; the view is registered in a separate statement.) Registering the streaming DataFrame as a temporary view allows me to use it in SQL queries:
stream_query = spark.sql("""
    SELECT
        *
    FROM live_bronze_google_similar_apps AS source
    LATERAL VIEW json_tuple(
        `data`,
        'title',
        'description',
        'descriptionHTML'
    ) AS title, description, descriptionHTML
""")
Now I can consume the streaming DataFrame, for example by merging each micro-batch into a Delta table:
from delta.tables import DeltaTable

def update_function(batch, epoch):
    # Upsert the micro-batch into the silver table, matching rows on app_id
    destination_table = DeltaTable.forName(spark, "${var.database}.silver_google_app_data").alias("D")
    source_table = batch.alias("S")
    destination_table.merge(
        source=source_table,
        condition="S.app_id = D.app_id"
    ) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
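One caveat: MERGE fails if several source rows match the same target row, so if a micro-batch can contain more than one row per app_id, it should be deduplicated first. A minimal sketch, where updated_at is a hypothetical ordering column used only for illustration:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def update_function(batch, epoch):
    # Keep only the most recent row per app_id within this micro-batch;
    # 'updated_at' is a hypothetical ordering column for illustration.
    latest = Window.partitionBy("app_id").orderBy(F.col("updated_at").desc())
    deduped = (batch
               .withColumn("rn", F.row_number().over(latest))
               .filter("rn = 1")
               .drop("rn"))
    # ...then run the same MERGE as above, using 'deduped' as the source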
stream_query.writeStream \
    .option("checkpointLocation", "/apps/status/sparkstreaming/silver_google_app_data") \
    .trigger(once=True) \
    .foreachBatch(update_function) \
    .start()
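As a side note, on recent Spark versions (3.3 and later), trigger(availableNow=True) is the recommended replacement for trigger(once=True): it likewise stops once the available data has been processed, but splits the work into several micro-batches instead of a single one:

stream_query.writeStream \
    .option("checkpointLocation", "/apps/status/sparkstreaming/silver_google_app_data") \
    .trigger(availableNow=True) \
    .foreachBatch(update_function) \
    .start()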
Thanks to the checkpoint, whenever the source table receives new data, only that new data is processed at the next trigger. Now, let's assume that, for unknown reasons, the current checkpoint is no longer usable. How can I restart the streaming from a specific point? In the case of Kafka, I would store the offsets elsewhere, so that I could resume the stream from the first record not yet processed. But how do I get something similar when streaming from a table?
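Assuming the bronze table is a Delta table, one option is the startingVersion (or startingTimestamp) option of the Delta streaming source, which tells a query starting from a fresh checkpoint which table version to begin reading from. A minimal sketch, where version 42 stands in for a hypothetical table version known to be fully processed:

# A sketch, assuming the bronze table is a Delta table; "42" is a
# hypothetical table version recorded externally before the checkpoint broke.
spark.readStream \
    .option("startingVersion", "42") \
    .table('${var.database}.bronze_google_similar_apps') \
    .createOrReplaceTempView('live_bronze_google_similar_apps')

This only takes effect when the query starts with a new checkpoint location, and the version to resume from has to be tracked outside the stream, much like storing Kafka offsets elsewhere.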