I am using the from_avro function in PySpark to read Avro data from Kafka, with the schema registered in a schema registry. However, schema changes are not handled correctly during batch processing: the job resolves the latest schema once when the streaming job starts and then ignores any schema changes that happen between streaming runs. Ideally, the schema should be resolved per record from the schema ID carried in the first 5 bytes of each message.
from pyspark.sql import functions as fn
from pyspark.sql.avro.functions import from_avro  # on Databricks, this from_avro accepts schema registry options

data_df = (
    spark.readStream.format("kafka")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.security.protocol", "SSL")
    .option("kafka.bootstrap.servers", servers_details)
    .option("kafka.ssl.truststore.location", location)
    .option("kafka.ssl.truststore.password", pwd)
    .option("startingOffsets", "latest")
    .option("failOnDataLoss", "false")
    .option("maxOffsetsPerTrigger", 30)
    .option("subscribe", name)
    .load()
)
transform_df = (
    data_df.withColumn(
        "record",
        from_avro(
            fn.col("value"),
            schemaRegistryAddress="http://schema-registry.com",
            subject=f"{topic_name}-value",
        ),
    )
    # bytes 2-5 of the Kafka value hold the 4-byte schema ID (byte 1 is the magic byte)
    .withColumn("schema_id", function_convert(fn.expr("substring(value, 2, 4)")))
    .select("schema_id", fn.col("record"))
)
display(transform_df)
I tried passing options to from_avro, but it doesn't seem to work:
transform_df = data_df.withColumn(
    "record",
    from_avro(
        fn.col("value"),
        options={"confluent.value.schema.validation": "true"},
        schemaRegistryAddress="http://schema-registry.com",
        subject=f"{topic_name}-value",
    ),
).select(fn.col("record").alias("RECORD_CONTENT"))
It looks like you're using the from_avro function in PySpark to read Avro data from Kafka and are running into issues with schema changes not being considered during batch processing. Unfortunately, the from_avro function in PySpark doesn't directly provide an option to resolve the schema from the schema ID carried in the first 5 bytes of each record, and the confluent.value.schema.validation option you tried doesn't address this scenario.

However, you can work around this by resolving the Avro schema manually, using the schema registry and the schema ID embedded in the first 5 bytes of the Avro payload. Here's a possible approach to achieve this:
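Below is a minimal sketch of that idea. It assumes the requests and fastavro packages are installed on the cluster, that the registry exposes the standard Confluent REST API at http://schema-registry.com (the placeholder address from your code), and that messages use the Confluent wire format (1 magic byte followed by a 4-byte big-endian schema ID); adjust names and error handling to your environment.

import io
import json
import struct

import fastavro
import requests
from pyspark.sql import functions as fn
from pyspark.sql.types import StringType

SCHEMA_REGISTRY_URL = "http://schema-registry.com"  # placeholder registry address
_schema_cache = {}  # per-executor cache: schema ID -> parsed writer schema


def fetch_schema_from_registry(schema_id):
    # Fetch (and cache) the writer schema for a given Confluent schema ID
    if schema_id not in _schema_cache:
        resp = requests.get(f"{SCHEMA_REGISTRY_URL}/schemas/ids/{schema_id}")
        resp.raise_for_status()
        _schema_cache[schema_id] = fastavro.parse_schema(json.loads(resp.json()["schema"]))
    return _schema_cache[schema_id]


@fn.udf(returnType=StringType())
def parse_avro_udf(value):
    # Confluent wire format: byte 1 is a magic byte, bytes 2-5 are the schema ID
    if value is None:
        return None
    schema_id = struct.unpack(">I", bytes(value[1:5]))[0]
    schema = fetch_schema_from_registry(schema_id)
    record = fastavro.schemaless_reader(io.BytesIO(bytes(value[5:])), schema)
    return json.dumps(record, default=str)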
In this approach, we define a UDF (parse_avro_udf) that takes the raw Avro payload, fetches the corresponding schema from the schema registry using the schema ID in the first 5 bytes, and then parses the Avro body with the resolved schema. The fetch_schema_from_registry function is responsible for fetching the schema from the registry for a given schema ID.

By using this UDF, you should be able to handle schema changes during batch processing, since it resolves the correct writer schema for each record based on the schema ID present in the Avro data.
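Applied to your streaming DataFrame, usage could look roughly like this (data_df is the DataFrame returned by readStream above; the hex/conv expression is just one way to surface the schema ID as an integer column, in place of the function_convert helper in your snippet):

transform_df = (
    data_df
    .withColumn("schema_id", fn.conv(fn.hex(fn.expr("substring(value, 2, 4)")), 16, 10).cast("int"))
    .withColumn("record", parse_avro_udf(fn.col("value")))
    .select("schema_id", "record")
)
display(transform_df)

One trade-off to be aware of: the UDF returns the decoded record as a JSON string rather than the typed struct that from_avro would give you, so if you need typed columns you'll have to parse the string downstream (for example with from_json against a reader schema of your choosing).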