Spring Integration with Kafka not sending messages

564 Views Asked by At

I am working on Spring - Kafka using Java DSL and I see that the messages are not produced/sent to the kafka topic.

The code I have been using is:

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return IntegrationFlows.from(kafkaPublishChannel)
                           .handle(kafkaMessageHandler())
                           .get();
}

private KafkaProducerMessageHandlerSpec<String, Object, ?> kafkaMessageHandler() {
    return Kafka
            .outboundChannelAdapter(_kafkaProducerFactory.getKafkaTemplate().getProducerFactory())
            .messageKey(m -> m
                    .getHeaders()
                    .getId())
            //.headerMapper(mapper())
            .topic(_topicConfiguration.getCheProgressUpdateTopic())
            .configureKafkaTemplate(t -> t.getTemplate());
}

@Bean
public DefaultKafkaHeaderMapper mapper() {
    return new DefaultKafkaHeaderMapper();
}

The producer configurations I am using are:

private ProducerFactory<String, Object> producerFactory() {
    final Map<String, Object> producerProps = new HashMap<>();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProducerConfiguration.getKafkaServerProducerHost());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerConfiguration.isKafkaProducerIdempotentEnabled());
    producerProps.put(ProducerConfig.ACKS_CONFIG, kafkaProducerConfiguration.getKafkaProducerAcks());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerConfiguration.getKafkaProducerRetries());
    producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, kafkaProducerConfiguration.getKafkaProducerBatchSize());
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerConfiguration.getKafkaProducerLingerMs());
    return new DefaultKafkaProducerFactory<>(producerProps);
}

Not sure why I am not seeing the messages in the kafka topic. Can you please help me out here?

1

There are 1 best solutions below

0
On

Try to use sync(true) on that Kafka.outboundChannelAdapter(). I believe there should be some errors if you don't see any progress during sending. You may also consider to use a DEBUG logging level for the org.springframework.integration category to see how your messages are traveling through your integration flow.