Spring WebFlux: I want to send data to Kafka after saving to the database


I'm trying to send data to Kafka after my database operation succeeds.

I have a POST endpoint which stores the data in MongoDB and returns the whole object along with the MongoDB UUID.

Now I want to perform an additional task: if the data is successfully saved in MongoDB, I should call my Kafka producer method and send the data.

Not sure how to do it.

Current Codebase

public Mono<?> createStock(StockDTO stockDTONBody) {
    // logger.info("Received StockDTO body: {}", stockDTONBody);

    Mono<StockDTO> stockDTO = mongoTemplate.save(stockDTONBody);

    // HERE I WANT TO SEND TO KAFKA IF DATA IS SAVED TO MONGO.

    return stockDTO;
}

There are 2 answers below.

Answer 1

Thanks @Alex for the help. Adding my answer for others.

public Mono<?> createStock(StockDTO stockDTONBody) {
    // logger.info("Received StockDTO body: {}", stockDTONBody);

    Mono<StockDTO> stockDTO = mongoTemplate.save(stockDTONBody);

    // =============== Kafka code added ======================
    // flatMap runs only after the Mongo save has emitted a value,
    // so nothing is sent to Kafka if the save fails.
    return stockDTO.flatMap(data -> sendToKafka(data, "create"));
}

public Mono<?> sendToKafka(StockDTO stockDTO, String eventName) {
    Map<String, Object> data = new HashMap<>();
    data.put("event", eventName);
    data.put("campaign", stockDTO);

    // Fire-and-forget: subscribe() triggers the send, but the send's
    // outcome is not part of the Mono returned below.
    template.send(kafkaTopicName, data.toString()).log().subscribe();

    System.out.println("sending to Kafka " + eventName + data);

    return Mono.just(stockDTO);
}
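One caveat with the snippet above: subscribe() detaches the Kafka send from the request pipeline, so a failed send is only visible in the log, never to the caller. A sketch of a chained variant, assuming template is a ReactiveKafkaProducerTemplate (the original post doesn't show its type), would look like this:

public Mono<StockDTO> sendToKafka(StockDTO stockDTO, String eventName) {
    Map<String, Object> data = new HashMap<>();
    data.put("event", eventName);
    data.put("campaign", stockDTO);

    // Chain the send into the returned Mono instead of subscribing here:
    // the pipeline completes only after Kafka acknowledges the record,
    // and a send error propagates to the caller of createStock.
    return template.send(kafkaTopicName, data.toString())
            .log()
            .thenReturn(stockDTO);
}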
Answer 2

This can result in dual writes if your data is saved in mongo and something goes wrong while publishing to kafka. Data will be missing in kafka. Instead you should use change data capture for this. Mongo provides mongo change streams which can be used here or there are other open source kafka connectors available where you can configure the connectors to listen to changelogs of Mongo and stream those to kafka.