Spring Kafka single producer for multiple topics

5.9k Views Asked by At

I want to use a single producer for writing JSON objects to multiple topics.

The following code is doing what I want but it feels wrong to use the setDefaultTopic() method to tell the KafkaTemplate to which topic it should send the message.

If I use the send(String topic, ? payload) method than the StringJsonMessageConverter won't work.

My Producer:

public class MyProducer {

    @Autowired
    private KafkaTemplate<String, ?> kafka;

    public void send(String topic, Message<?> message) {
        kafka.setDefaultTopic(topic);
        kafka.send(message);
    }
}

And my configuration:

@Configuration
public class MyProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        ...

        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
        kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());

        return kafkaTemplate;
    }
}

Any suggestions on how to do this properly?

UPDATE

I changed the code to this ...

Producer:

public void send(Message<?> message) {
    kafka.send(message);
}

In my controller (where I create the message objects);

MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);

The MessageHeaders object will still contain the id and timestamp.

2

There are 2 best solutions below

3
On BEST ANSWER

When using the send(Message<?>) variant, the message converter expects the topic to be in a message header...

String topic = headers.get(KafkaHeaders.TOPIC, String.class);

If you can determine the topic some other way, you need to create a custom converter.

Changing the default topic is not thread-safe.

1
On

You have to use StringJsonMessageConverter manually before calling template:

ProducerRecord<?, ?> producerRecord = kafka.getMessageConverter().fromMessage(message, topic);
kafka.send(producerRecord);