I am working on Spring - Kafka using Java DSL and I see that the messages are not produced/sent to the kafka topic.
The code I have been using is:
@Bean
public IntegrationFlow sendToKafkaFlow() {
return IntegrationFlows.from(kafkaPublishChannel)
.handle(kafkaMessageHandler())
.get();
}
private KafkaProducerMessageHandlerSpec<String, Object, ?> kafkaMessageHandler() {
return Kafka
.outboundChannelAdapter(_kafkaProducerFactory.getKafkaTemplate().getProducerFactory())
.messageKey(m -> m
.getHeaders()
.getId())
//.headerMapper(mapper())
.topic(_topicConfiguration.getCheProgressUpdateTopic())
.configureKafkaTemplate(t -> t.getTemplate());
}
@Bean
public DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
The producer configurations I am using are:
private ProducerFactory<String, Object> producerFactory() {
final Map<String, Object> producerProps = new HashMap<>();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfiguration.getKafkaServerProducerHost());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerConfiguration.isKafkaProducerIdempotentEnabled());
producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfiguration.getKafkaProducerAcks());
producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfiguration.getKafkaProducerRetries());
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfiguration.getKafkaProducerBatchSize());
producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfiguration.getKafkaProducerLingerMs());
return new DefaultKafkaProducerFactory<>(producerProps);
}
Not sure why I am not seeing the messages in the kafka topic. Can you please help me out here?
Try to use
sync(true)
on thatKafka.outboundChannelAdapter()
. I believe there should be some errors if you don't see any progress during sending. You may also consider to use a DEBUG logging level for theorg.springframework.integration
category to see how your messages are traveling through your integration flow.