I developed a Python Kafka producer that sends multiple JSON records as an ND-JSON (newline-delimited JSON) binary string to a Kafka topic. I'm then trying to read these messages in Spark Structured Streaming with PySpark as follows:
from pyspark.sql.functions import col, from_json

events_df = df.select(from_json(col("value").cast("string"), schema).alias("value"))
but this code works only with a single JSON document. If the value contains multiple records as newline-delimited JSON, Spark can't decode it correctly.
I don't want to send a separate Kafka message for each event. How can I achieve this?
I managed to do what I was looking for by splitting the full text string on newlines and then exploding the resulting array into rows, which can then be parsed with the schema:
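A minimal sketch of that approach, assuming df is the streaming DataFrame read from the Kafka source and schema is the StructType of a single record:

from pyspark.sql.functions import col, explode, from_json, split

# Cast the Kafka value bytes to a string and split on newlines, giving an
# array with one element per ND-JSON record. Explode the array so each
# record becomes its own row, then parse every record with the schema.
events_df = (
    df.select(split(col("value").cast("string"), "\n").alias("records"))
      .select(explode(col("records")).alias("record"))
      .select(from_json(col("record"), schema).alias("value"))
)

If the payload ends with a trailing newline, the split produces an empty last element, so you may also want to filter out empty strings before parsing.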