From within a Quarkus application, I need to publish tombstone messages to a compacted Apache Kafka topic. As my use case is imperative, I use an Emitter for sending messages to the topic (as suggested in the Quarkus blog). The code for non-tombstone messages (with payload) is:
    @Dependent
    public class Publisher {

        @Inject
        @Channel("theChannelName")
        Emitter<MyDataStructure> emitter;

        public CompletionStage<Void> publish(final MyDataStructure myData) {
            OutgoingKafkaRecordMetadata<String> metadata =
                    OutgoingKafkaRecordMetadata.<String>builder()
                            .withKey(myData.getTopicKey())
                            .build();
            return CompletableFuture.runAsync(
                    () -> emitter.send(Message.of(myData).addMetadata(metadata)));
        }
    }
The Emitter also implements <M extends Message<? extends T>> void send(M msg), which I hoped would allow me to craft a Message with a payload of null as a tombstone message. Unfortunately, all overloads of the Message.of(..) factory method that accept metadata (which is needed to provide the message key) specify that the payload must not be {@code null}.
What is the proper way (following Quarkus / SmallRye Reactive Messaging concepts) to publish tombstone messages to a Kafka topic using an Emitter?
I would recommend using the Record class (see documentation). A Record is a key/value pair that represents the key and value of the Kafka record to write. Both can be null, but in your case, only the value part should be null: Record.of(key, null).
So, you need to change the payload type of the Emitter to Record<Key, Value>, that is, Emitter<Record<Key, Value>>.
While runAsync is convenient, emitters are already asynchronous, so there is no need for it. In addition, the behavior can be dramatic in containers: if the parallelism level of the common pool is less than 2, runAsync creates a new thread for each task.
Instead, return the result of the send method, which is a CompletionStage. That stage is completed when the record is written to Kafka (and acknowledged by the broker).
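The publisher from the question, rewritten along those lines, might look like the sketch below. It assumes the key is a String (as the question's OutgoingKafkaRecordMetadata<String> suggests) and a Quarkus 3 application using the jakarta namespace (older versions use javax instead). MyDataStructure and getTopicKey() come from the question; publishTombstone is a hypothetical method name introduced only for illustration.

```java
import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

import io.smallrye.reactive.messaging.kafka.Record;

@Dependent
public class Publisher {

    // The emitter now carries key/value pairs instead of bare payloads.
    // Assumption: the topic key is a String.
    @Inject
    @Channel("theChannelName")
    Emitter<Record<String, MyDataStructure>> emitter;

    // Regular message: key and value are both set.
    public CompletionStage<Void> publish(final MyDataStructure myData) {
        return emitter.send(Record.of(myData.getTopicKey(), myData));
    }

    // Tombstone: same key, null value (hypothetical helper method).
    public CompletionStage<Void> publishTombstone(final String key) {
        return emitter.send(Record.of(key, null));
    }
}
```

Both methods return the CompletionStage from send directly, so the caller can react once the broker has acknowledged the record, without wrapping the call in runAsync.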