Can I upsert data, which is avro schema in Kafka?
I want to pick record from topic and then, filter the flights (eg: consider two records have same flight number. We need to pick only latest one by considering time stamp as mentioned in Avro schema
How can I do this I want to remove duplicates of same flight number
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "oldsomething random" }
Output stream should be like,
{ "FlightNumber" : 1, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "latest one" }
{ "FlightNumber" : 2, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Delayed", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 3, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 4, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Scheduled", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
{ "FlightNumber" : 5, "OriginAirport" : "BOM", "DestinationAirport" : "DEL", "OriginDate" : "2020-07-26", "OriginTime" : "11:00", "DestinationDate" : "2020-07-26", "DestinationTime" : "11:00:00", "FlightStatus" : "Ontime", "GateIn" : "IN", "GateOut" : "Out", "RecordDateTime" : "qwer" }
builder.stream(inputTopic, Consumed.with(Serdes.String(), flightDataSerde))
.map((k, v) -> new KeyValue<>((String) v.getFlightStatus(), (Integer) v.getFlightNumber()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
// Apply COUNT method
.count()
// Write to stream specified by outputTopic
.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
Avro:
"namespace": "io.confluent.developer.avro",
"type": "record",
"name": "FlightData",
"fields": [
{"name": "FlightNumber", "type": "int"},
{"name": "OriginAirport", "type": "string"},
{"name": "DestinationAirport", "type": "string"},
{"name": "OriginDate", "type": "string"},
{"name": "OriginTime", "type": "string"},
{"name": "DestinationDate", "type": "string"},
{"name": "DestinationTime", "type": "string"},
{"name": "FlightStatus", "type": "string"},
{"name": "GateOut", "type": "string"},
{"name": "GateIn", "type": "string"},
{"name": "RecordDateTime", "type": "string"}
]
}
This is what the TimestampExtractor interface is for. Otherwise, you could adjust your upstream producer to make that timestamp the actual record timestamp
This is the default behavior for ordered records of the same key arriving into the source topic. You'll want to consider logic to handle late arriving data, though, and skip any data with later timestamps. This can be done with the Processor API easier than the Streams DSL, which you'll need to use anyway to get access to check against the table contents