Read newline delimited json from Kafka message in Spark Structured Streaming

I developed a Python Kafka producer that sends multiple JSON records as a single newline-delimited JSON (ND-JSON) binary string to a Kafka topic.
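
For context, the producer does something along these lines (a minimal sketch using kafka-python; the record fields are made up, only the topic name matches the consumer code below):

    from kafka import KafkaProducer  # kafka-python
    import json

    producer = KafkaProducer(bootstrap_servers="localhost:9092")

    # A couple of made-up records; the real fields depend on the event schema
    records = [
        {"id": 1, "type": "click"},
        {"id": 2, "type": "view"},
    ]

    # Join the JSON documents with newlines and send them as one binary message
    payload = "\n".join(json.dumps(r) for r in records).encode("utf-8")
    producer.send("quickstart-events", value=payload)
    producer.flush()

I'm then trying to parse the value column of the streaming DataFrame (df below) in Spark Structured Streaming with PySpark as follows: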

events_df = df.select(from_json(col("value").cast("string"), schema).alias("value"))

but this code works only when each message contains a single JSON document. If the value contains multiple records as newline-delimited JSON, Spark can't decode it correctly.
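
A minimal batch example illustrates the problem; the schema and records here are placeholders:

    from pyspark.sql.functions import from_json, col
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType

    schema = StructType([
        StructField("id", IntegerType()),
        StructField("type", StringType()),
    ])

    df = spark.createDataFrame(
        [('{"id": 1, "type": "click"}\n{"id": 2, "type": "view"}',)],
        ["value"],
    )

    # from_json expects one JSON document per value: depending on the Spark
    # version the multi-line payload comes back as null or as only the first
    # record; either way the remaining records are lost
    df.select(from_json(col("value"), schema).alias("value")).show(truncate=False)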

I don't want to send a separate Kafka message for each single event. How can I achieve this?

Best answer

I managed to do what I was looking for by splitting the full text string on newlines and then exploding the resulting array into rows, each of which can be parsed with the schema:

    from pyspark.sql.functions import col, explode, from_json, split

    events = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "quickstart-events") \
        .option("startingOffsets", "earliest") \
        .load() \
        .selectExpr("CAST(value AS STRING) as data")

    # Split the ND-JSON payload on newlines and explode the array,
    # so each JSON document ends up in its own row (default column name: "col")
    events = events.select(explode(split(events.data, '\n')))

    # Parse each single-document line against the schema
    events = events.select(from_json(col("col"), event_schema).alias('value'))

    # Flatten the parsed struct into top-level columns
    events = events.selectExpr('value.*')
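
To check the result during development, the stream can be written to the standard console sink:

    # Print the parsed events to stdout to verify the pipeline
    query = events.writeStream \
        .format("console") \
        .option("truncate", "false") \
        .start()

    query.awaitTermination()

One caveat: if a payload ends with a trailing newline, split produces an empty string, which from_json turns into a null row; adding .filter("col != ''") between the explode and the parse guards against that.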