Currently I use the following event producer:
@Component
class GenericEventProducer : EventProducer<BaseEvent<*>> {
    override val sink: Sinks.Many<Message<BaseEvent<*>>> = Sinks.many().replay().latest()

    @Bean
    fun projekte() = Supplier<Flux<Message<BaseEvent<*>>>> {
        sink.asFlux().filter { it.payload is ProjektEvent }
    }
}
The EventProducer is a wrapper with a default method around sink.tryEmitNext(message). The advantage is that I can use the Spring Cloud Stream bindings and producer configuration.
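For context, the EventProducer wrapper could look roughly like the sketch below. Only the sink property and the call to sink.tryEmitNext(message) come from the description above; the method name produce, the use of MessageBuilder, and the String payload in main are assumptions made for illustration. It needs reactor-core and spring-messaging on the classpath.

```kotlin
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import reactor.core.publisher.Sinks

// Hypothetical reconstruction of the wrapper interface, not the original code.
interface EventProducer<T : Any> {
    // Implementations provide the sink that backs the Supplier binding.
    val sink: Sinks.Many<Message<T>>

    // Default method: wraps the event in a Message and hands it to the sink.
    // The EmitResult is returned so callers can detect a failed emission.
    fun produce(event: T): Sinks.EmitResult {
        val message: Message<T> = MessageBuilder.withPayload(event).build()
        return sink.tryEmitNext(message)
    }
}

fun main() {
    // Stand-in implementation with a String payload instead of BaseEvent<*>.
    val producer = object : EventProducer<String> {
        override val sink: Sinks.Many<Message<String>> = Sinks.many().replay().latest()
    }
    val result = producer.produce("projekt-created")
    println("emit succeeded: ${result.isSuccess}")
}
```

Returning the EmitResult instead of discarding it is a deliberate choice in this sketch: tryEmitNext does not throw when an emission fails, so the caller has to inspect the result to notice it.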
Now that the reactive Kafka binder has matured into 1.x, the question naturally arises for me whether it could and should be used instead of this sink-based solution. I couldn't find proper examples of its usage, and documentation on ReactiveKafkaProducerTemplate is generally lacking.