Databricks Auto Loader with Merge Condition

We have the following merge-to-delta function. The merge function ensures we update each record appropriately based on certain conditions; in the usage below you can see we define the merge condition and pass it into the function. This function is currently used in batch processing: we run it once a day to process files.

from delta.tables import DeltaTable

# Define merge function: upserts df into the Delta table at `path`
# using the supplied merge conditions.
def MergeToDelta(df, path, merge_target_alias, merge_source_alias, merge_conditions):
    delta_table = DeltaTable.forPath(spark, path)
    delta_table.alias(merge_target_alias) \
        .merge(
            df.alias(merge_source_alias),
            merge_conditions) \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()

# Define merge conditions
merge_target_alias = 'target'
merge_source_alias = 'source'
merge_conditions = ('source.Id = target.Id AND ' +
                    'source.Name = target.Name AND ' +
                    'source.School = target.School AND ' +
                    'source.Age = target.Age AND ' +
                    'source.DepartmentId = target.DepartmentId AND ' +
                    'source.BirthDate = target.BirthDate AND ' +
                    'source.CallId = target.CallId')

some_schema = ''
some_path = ''
raw_df = (spark.read.schema(some_schema).json(some_path))
delta_data_path = '/mnt/students'

# Usage
MergeToDelta(raw_df, delta_data_path, merge_target_alias, merge_source_alias, merge_conditions)

With Auto Loader / Structured Streaming, we use the writeStream function, and I don't see a way to pass in a merge condition like we do in batch processing. The following is an example.

raw_df = (spark.readStream
    .format("cloudFiles")
    .schema(file_schema)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", autoloader_checkpoint_path)
    .load(path))

from pyspark.sql.functions import col, lit, to_date

raw_df = (raw_df
    .withColumn('Id', lit(id))
    .withColumn('PartitionDate', to_date(col('BirthDate'))))

(raw_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", writestream_checkpoint_path)
    # .partitionBy(*partition_columns)
    .start(delta_data_path))

Maybe this is just my misunderstanding of how streaming works in Databricks / Delta Live Tables, but is there a way to specify a merge condition when writing the stream to Delta Lake?

1 Answer
You'll want to use the foreachBatch writer; example Python code below:

def foreach_batch_function(df, epoch_id):
    pass  # MERGE HERE

raw_df.writeStream.foreachBatch(foreach_batch_function).start()
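
For completeness, here is a minimal sketch of how the MergeToDelta function and merge conditions from the question could be wired into foreachBatch. It reuses the question's placeholder names (delta_data_path, writestream_checkpoint_path, the aliases, merge_conditions); upsert_to_delta is an illustrative function name, so treat this as a sketch rather than a drop-in solution.

# Reuses the MergeToDelta function and merge_conditions defined in the question.
# Called once per micro-batch; micro_batch_df holds only the files Auto Loader
# picked up since the previous batch.
def upsert_to_delta(micro_batch_df, epoch_id):
    MergeToDelta(micro_batch_df,
                 delta_data_path,
                 merge_target_alias,
                 merge_source_alias,
                 merge_conditions)

(raw_df.writeStream
    .option("checkpointLocation", writestream_checkpoint_path)
    .foreachBatch(upsert_to_delta)
    .start())

With foreachBatch, the writer's format and outputMode settings no longer perform the write; the merge inside the function does, so the sink logic (including the merge condition) lives entirely in your own Python code.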