Remove duplicates (considering the flight number as the key) and keep only the latest record w.r.t. timestamp


Can I upsert data that uses an Avro schema in Kafka?

I want to read records from a topic and then filter the flights (e.g. if two records have the same flight number, we need to pick only the latest one, based on the timestamp field defined in the Avro schema).

How can I do this? I want to remove duplicates of the same flight number.

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }

The output stream should look like:

{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" :  "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
Here is what I have tried so far:

    builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
            .map((k, v) -> new KeyValue<>((String) v.getFlightStatus(), (Integer) v.getFlightNumber()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
            // Apply the COUNT method
            .count()
            // Write to the stream specified by outputTopic
            .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

Avro:

  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "FlightData",
  "fields": [
    {"name": "FlightNumber", "type": "int"},
    {"name": "OriginAirport", "type": "string"},
    {"name": "DestinationAirport", "type": "string"},
        {"name": "OriginDate", "type": "string"},
        {"name": "OriginTime", "type": "string"},
        {"name": "DestinationDate", "type": "string"},
        {"name": "DestinationTime", "type": "string"},
        {"name": "FlightStatus", "type": "string"},

        {"name": "GateOut", "type": "string"},
        {"name": "GateIn", "type": "string"},
        {"name": "RecordDateTime", "type": "string"}
  ]
}

There are 2 answers below.

Answer 1:

"by considering time stamp as mentioned in Avro schema"

This is what the TimestampExtractor interface is for. Otherwise, you could adjust your upstream producer to make that timestamp the actual record timestamp.
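
For illustration, here is a minimal sketch of such an extractor, assuming the generated FlightData class from the schema above and assuming RecordDateTime holds an ISO-8601 instant (the sample values in the question are placeholders, so adjust the parsing to your real format):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    // Pulls the event time out of the FlightData value instead of using the
    // record's producer/broker timestamp.
    public class FlightRecordTimestampExtractor implements TimestampExtractor {

        @Override
        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            Object value = record.value();
            if (value instanceof FlightData) {
                FlightData flight = (FlightData) value;
                try {
                    // Assumption: RecordDateTime is an ISO-8601 instant, e.g. "2020-07-26T11:00:00Z"
                    return java.time.Instant.parse(flight.getRecordDateTime().toString()).toEpochMilli();
                } catch (Exception e) {
                    // Fall back to the highest timestamp seen so far on this partition
                    return partitionTime;
                }
            }
            return partitionTime;
        }
    }

You can register it just for this source with Consumed.with(Serdes.String(), flightDataSerde).withTimestampExtractor(new FlightRecordTimestampExtractor()), or globally via the default.timestamp.extractor config.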

"two records have same flight number. We need to pick only latest one"

This is the default behavior for ordered records with the same key arriving in the source topic. You'll still want logic to handle late-arriving data, though, and skip any incoming record whose timestamp is older than the one already stored. That is easier to do with the Processor API than with the Streams DSL, and you'll need the Processor API anyway to check the incoming record against the table contents.
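
As a rough sketch of that idea (not the only way to wire it up), you can mix the Processor API into the DSL with transformValues() and a key-value store. inputTopic, outputTopic and flightDataSerde are the names from the question, and the string comparison assumes RecordDateTime sorts chronologically:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.Stores;

    StreamsBuilder builder = new StreamsBuilder();

    // State store holding the newest FlightData seen so far per flight number
    builder.addStateStore(Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("latest-flight-store"),
            Serdes.Integer(), flightDataSerde));

    builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
            .transformValues(() -> new ValueTransformerWithKey<String, FlightData, FlightData>() {
                private KeyValueStore<Integer, FlightData> store;

                @Override
                @SuppressWarnings("unchecked")
                public void init(ProcessorContext context) {
                    store = (KeyValueStore<Integer, FlightData>) context.getStateStore("latest-flight-store");
                }

                @Override
                public FlightData transform(String key, FlightData flight) {
                    FlightData previous = store.get(flight.getFlightNumber());
                    // Assumption: RecordDateTime strings compare chronologically (e.g. ISO-8601)
                    if (previous == null
                            || flight.getRecordDateTime().toString()
                                     .compareTo(previous.getRecordDateTime().toString()) >= 0) {
                        store.put(flight.getFlightNumber(), flight);
                        return flight;   // newest record so far: forward it
                    }
                    return null;         // late arrival with an older timestamp: drop it
                }

                @Override
                public void close() { }
            }, "latest-flight-store")
            .filter((key, flight) -> flight != null)
            .to(outputTopic, Produced.with(Serdes.String(), flightDataSerde));

Note that this only works if all records for a given flight number land on the same partition (and therefore hit the same store instance); if the input topic is not keyed by flight number, you would need to repartition first.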

Answer 2:

The main problem you need to address is: how long do you want to wait before you emit a result record? When you get the first record, you don't know whether you can emit it right away, or if there might be a duplicate later (with a larger or smaller timestamp).

Thus, you need to define some window and use an aggregation that keeps only one record per key and per window. In this aggregation, you can compare the timestamps and only keep the desired record.

After the aggregation, you can use suppress() to only emit a single final result record when the window closes.
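
A sketch of what that could look like, assuming a 1-minute window with a 30-second grace period (placeholder values you would tune), re-keying by flight number so duplicates fall into the same group, and the flightDataSerde from the question:

    import java.time.Duration;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();

    builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
            // Key by flight number so duplicates of the same flight end up in the same group
            .selectKey((key, flight) -> flight.getFlightNumber())
            .groupByKey(Grouped.with(Serdes.Integer(), flightDataSerde))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofSeconds(30)))
            // Keep only the record with the newer RecordDateTime within each window
            .reduce((current, next) ->
                        next.getRecordDateTime().toString()
                            .compareTo(current.getRecordDateTime().toString()) >= 0 ? next : current,
                    Materialized.with(Serdes.Integer(), flightDataSerde))
            // Emit exactly one final record per flight number once the window closes
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            .map((windowedKey, flight) -> KeyValue.pair(windowedKey.key(), flight))
            .to(outputTopic, Produced.with(Serdes.Integer(), flightDataSerde));

The trade-off is latency: nothing is emitted for a flight until its window (plus grace period) has closed, which is exactly the waiting decision described above.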