I want to use a single producer for writing JSON objects to multiple topics.
The following code is doing what I want but it feels wrong to use the setDefaultTopic()
method to tell the KafkaTemplate
to which topic it should send the message.
If I use the send(String topic, ? payload)
method than the StringJsonMessageConverter
won't work.
My Producer:
public class MyProducer {
@Autowired
private KafkaTemplate<String, ?> kafka;
public void send(String topic, Message<?> message) {
kafka.setDefaultTopic(topic);
kafka.send(message);
}
}
And my configuration:
@Configuration
public class MyProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
...
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
kafkaTemplate.setMessageConverter(new StringJsonMessageConverter());
return kafkaTemplate;
}
}
Any suggestions on how to do this properly?
UPDATE
I changed the code to this ...
Producer:
public void send(Message<?> message) {
kafka.send(message);
}
In my controller (where I create the message objects);
MessageHeaders headers = new MessageHeaders(Collections.singletonMap(KafkaHeaders.TOPIC, "topicName"));
GenericMessage<NewsRequest> genericMessage = new GenericMessage<>(payload, headers);
producer.send(genericMessage);
The MessageHeaders object will still contain the id and timestamp.
When using the
send(Message<?>)
variant, the message converter expects the topic to be in a message header...If you can determine the topic some other way, you need to create a custom converter.
Changing the default topic is not thread-safe.