How to send messages in order from Apache Spark to Kafka topic


I have a use case where event information about sensors is inserted continuously into MySQL. We need to send this information, after some processing, to a Kafka topic every 1 or 2 minutes.

I am using Spark to send this information to the Kafka topic and to maintain CDC in a Phoenix table. I use a cron job to run the Spark job every minute.

The issue I am currently facing is message ordering: I need to send these messages in ascending timestamp order to the end system's Kafka topic (which has 1 partition). But most of the ordering is lost because more than one Spark DataFrame partition sends messages to the Kafka topic concurrently.

Currently, as a workaround, I repartition my DataFrame to a single partition in order to maintain message ordering, but this is not a long-term solution, as it gives up Spark's distributed computing.
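For reference, the single-partition workaround looks roughly like this. This is only a sketch: `processedDF`, the broker address, and the topic name are placeholders, and it assumes the `spark-sql-kafka` connector, a string `value` column, and a `TS` timestamp column.

```scala
// Sketch of the workaround: collapse the DataFrame to one partition so a
// single task writes all rows to Kafka in timestamp order. processedDF,
// the broker address and the topic name are placeholders.
processedDF
  .repartition(1)                      // one partition => one writer task
  .sortWithinPartitions($"TS")         // rows leave in ascending timestamp order
  .selectExpr("CAST(value AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("topic", "sensor-events")                  // placeholder
  .save()
```

The single writer task is exactly what preserves ordering here, and also exactly what removes parallelism.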

If you have a better solution design for this, please suggest it.

1 answer below

I am able to achieve message ordering by ascending timestamp, to some extent, by repartitioning my data by key and applying sorting within each partition.

// Key each row by (asset id, value) so rows for the same asset
// land in the same partition.
val pairJdbcDF = jdbcTable
        .map(row => ((row.getInt(0), row.getString(4)),
          // column 3 was originally read with getDecimal
          s"${row.getInt(0)},${row.getString(1)},${row.getLong(2)},${row.getString(3)},${row.getString(4)}"))
        .toDF("Asset", "Message")

val repartitionedDF = pairJdbcDF
        .repartition(getPartitionCount, $"Asset")   // co-locate rows with the same key
        .select(expr("split(Message, ',')[0]").cast("Int").as("Col1"),
          expr("split(Message, ',')[1]").cast("String").as("TS"),
          expr("split(Message, ',')[2]").cast("Long").as("Col3"),
          expr("split(Message, ',')[3]").cast("String").as("Col4"),
          expr("split(Message, ',')[4]").cast("String").as("Value"))
        .sortWithinPartitions($"TS", $"Value")      // sort each partition by timestamp
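One way to carry this per-partition ordering through to Kafka is to write with the asset id as the message key: Kafka's default partitioner then routes all messages for one asset to the same Kafka partition, where ordering is guaranteed per key rather than globally. This is a sketch under assumptions: the broker address and topic name are placeholders, and it relies on the `spark-sql-kafka` connector's batch write support (key/value columns).

```scala
// Write each sorted Spark partition to Kafka, keyed by asset id.
// With a multi-partition keyed topic, Kafka preserves ordering per key,
// which is often sufficient for per-sensor timelines.
// Broker address and topic name are placeholders.
repartitionedDF
  .selectExpr(
    "CAST(Col1 AS STRING) AS key",
    "concat_ws(',', CAST(Col1 AS STRING), TS, CAST(Col3 AS STRING), Col4, Value) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "sensor-events")
  .save()
```

Note the trade-off: this relaxes the requirement from global ordering to per-key ordering, which keeps Spark's parallelism. If the downstream system truly requires a single globally ordered stream in one Kafka partition, some single-writer stage is unavoidable.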