We have the following merge-to-delta function. It updates records based on merge conditions that we define and pass in at the call site, as shown in the usage below. We currently use this function in batch processing, running it once a day to process files.
from delta.tables import DeltaTable

# Define merge function
def MergeToDelta(df, path, merge_target_alias, merge_source_alias, merge_conditions):
    # Upsert the incoming DataFrame into the Delta table at `path`:
    # update rows matching the merge conditions, insert the rest.
    delta_table = DeltaTable.forPath(spark, path)
    delta_table.alias(merge_target_alias) \
        .merge(df.alias(merge_source_alias), merge_conditions) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
# Define merge conditions
merge_target_alias = 'target'
merge_source_alias = 'source'
merge_conditions = ('source.Id = target.Id AND ' +
                    'source.Name = target.Name AND ' +
                    'source.School = target.School AND ' +
                    'source.Age = target.Age AND ' +
                    'source.DepartmentId = target.DepartmentId AND ' +
                    'source.BirthDate = target.BirthDate AND ' +
                    'source.CallId = target.CallId')
# Placeholders for the actual schema and input path
some_schema = ''
some_path = ''
raw_df = spark.read.schema(some_schema).json(some_path)
delta_data_path = '/mnt/students'
# Usage
MergeToDelta(raw_df, delta_data_path, merge_target_alias, merge_source_alias, merge_conditions)
With Auto Loader / streaming, we use the writeStream function, and I don't see a way to pass in a merge condition the way we do in batch processing. The following is an example.
raw_df = (spark.readStream
    .format("cloudFiles")
    .schema(file_schema)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", autoloader_checkpoint_path)
    .load(path))
from pyspark.sql.functions import col, lit, to_date

raw_df = (raw_df
    .withColumn('Id', lit(id))
    .withColumn('PartitionDate', to_date(col('BirthDate'))))
(raw_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", writestream_checkpoint_path)
    # .partitionBy(*partition_columns)
    .start(delta_data_path))
Maybe this is just my misunderstanding of how streaming works in Databricks / Delta Live Tables, but is there a way to specify a merge condition when writing the stream to Delta Lake?
You'll want to use the foreachBatch writer. A minimal sketch, reusing the MergeToDelta function and the merge variables defined above (the merge_batch name is just illustrative):
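# foreachBatch hands each micro-batch to this function as a regular
# (batch) DataFrame, so the existing batch merge logic works unchanged.
def merge_batch(batch_df, batch_id):
    MergeToDelta(batch_df, delta_data_path, merge_target_alias, merge_source_alias, merge_conditions)

(raw_df.writeStream
    .foreachBatch(merge_batch)
    .outputMode("update")
    .option("checkpointLocation", writestream_checkpoint_path)
    .start())

Note that no sink format or path is set on writeStream here: the foreachBatch function owns the write, while the checkpoint still tracks the stream's progress.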