Databricks can't "rescue" data from Parquet using schemaEvolutionMode="rescue", it raises an error instead


I have Parquet files with an evolving schema, and I need to load all of them into a single Delta table. My goal is to use Auto Loader with schemaEvolutionMode="rescue" (so all fields from the source that don't align with the target schema should fall into the "_rescued_data" column). I also provide .schema(target_schema) to Auto Loader. But when I read some of the files I get this error:

Invalid Spark read type: expected optional group my_column (LIST) 
{ repeated group list { optional binary element (STRING); } } 
to be list but found Some(StringType)

my_column has data type String in the target table.

So why was it not loaded into the _rescued_data column instead of raising an error?
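For context, a minimal way to reproduce this kind of mismatch outside Auto Loader (the path and data here are made up for illustration, and the exact error text differs from Auto Loader's, but it shows the same underlying type conflict) is to write a Parquet file where my_column is an array of strings and then read it back with a schema that declares it as a plain string:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Hypothetical demo file: my_column is array<string> in the Parquet footer
spark.createDataFrame([(["a", "b"],)], "my_column array<string>") \
    .write.mode("overwrite").parquet("/tmp/rescue_demo")

# Reading it back while declaring my_column as string fails with a
# Parquet read-type mismatch instead of the value being rescued
target_schema = StructType([StructField("my_column", StringType(), True)])
spark.read.schema(target_schema).parquet("/tmp/rescue_demo").show()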

The code which I'm using:

read_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": "some location",
    "cloudFiles.schemaEvolutionMode": "rescue"
}

(spark.readStream.format("cloudFiles")
    .options(**read_options)
    .schema(target_schema)
    .load("source_path")
    .writeStream
    .foreachBatch(<save function>)
    .outputMode("append")
    .trigger(availableNow=True)
    .start())

Databricks Runtime version is 13.2 (Spark 3.4.0, Scala 2.12).


2 Answers

archjkeee (accepted answer):

The reason for the error was that the "_rescued_data" column was being cleared after reading the dataframe (before writing).
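For illustration, a minimal sketch of a foreachBatch save function that keeps the _rescued_data column (the target path, write mode and options here are assumptions, not the asker's actual code):

def save_batch(batch_df, batch_id):
    # Do not drop _rescued_data here: values that didn't match the
    # declared schema are kept as a JSON string in this column
    (batch_df
        .write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save("/mnt/target/delta_table"))  # hypothetical target path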

DileeprajnarayanThumula:

The error message you are getting indicates that a column in your Parquet file doesn't match the expected schema, and it is not falling into the "_rescued_data" column as expected.

As an example, I have used a CSV file with schema evolution in a Delta Lake Auto Loader Spark Streaming job.

The code below runs Auto Loader with schema inference and schema evolution mode "rescue".

This is for a scenario where you do not want to stop the stream: new columns fall into the rescued-data column, and you can apply transformations to them later.

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.inferColumnTypes", "true")
      .option("cloudFiles.schemaEvolutionMode", "rescue")
      .option("cloudFiles.schemaLocation", schema_loc)
      .load(source_location)
      )

query = (df.writeStream
         .format("delta")
         .option("checkpointLocation", checkpoints_loc)
         .outputMode("append")  # streaming supports append/complete/update, not overwrite
         .option("mergeSchema", "true")
         .start(target_location)
         )

To enable schema inference:

.option("cloudFiles.inferColumnTypes", "true")

Schema evolution mode "rescue":

.option("cloudFiles.schemaEvolutionMode", "rescue")

Schema evolution on the Delta write (merge new columns into the target table):

.option("mergeSchema", "true")


If you do not want to stop the stream when new columns appear, you can also enable automatic schema merging and apply transformations to the rescued columns later (see the sketch after the next snippet):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("DeltaWithAutomaticRetries")
         .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
         .getOrCreate())

The above configuration enables automatic schema merging (schema evolution) for Delta Lake writes.
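If you later want to pull rescued values back out, one possible sketch (the column name my_column and the new output column are assumptions for illustration; df is the streaming DataFrame from the earlier snippet) is to parse the _rescued_data column, which Auto Loader stores as a JSON string:

from pyspark.sql import functions as F

# _rescued_data looks like {"my_column": ["a", "b"], "_file_path": "..."}
recovered = df.withColumn(
    "my_column_rescued",
    F.get_json_object(F.col("_rescued_data"), "$.my_column")
)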

To know more, see "How does Auto Loader schema evolution work?" in the Databricks Auto Loader documentation.