Stream output of one flux to another


I am receiving data from messageHandler (Kafka) and processing it with serviceHandler to produce a result:

serviceHandler.process(message).getMessage().toString()

Now I need to output this stream of results so that whenever a new record comes through messageHandler, it gets processed and the output is pushed to the front end (Angular).

@Autowired
MessageHandler messageHandler;
@Autowired
ServiceHandler serviceHandler;

public void runHandler() {
    Flux<Message> messages = messageHandler.flux();             
    messages.subscribeOn(Schedulers.parallel()) 
    .doOnNext(message -> serviceHandler.process(message).getMessage().toString())
    .subscribe();
}
public Flux pushresult(){ ???? }

Does anyone know of a way to do what I need?


There is 1 best solution below


Take a look at .map() and .flatMap(). In your case map() looks like the right fit, since process() appears to return a plain value. If the next service you call returns a Mono or Flux instead, you want .flatMap(); with map() you would end up with something like:

Flux<Flux<String>>

instead of

Flux<String>
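
To make the difference concrete, here is a minimal sketch, assuming Project Reactor (reactor-core) is on the classpath. The class name and the processSync/processAsync helpers are hypothetical stand-ins, not part of the code in the question.

```java
import reactor.core.publisher.Flux;

public class MapVsFlatMap {
    // Hypothetical synchronous step: returns a plain value, so map() fits.
    static String processSync(String message) {
        return message.toUpperCase();
    }

    // Hypothetical reactive step: returns a publisher, so flatMap() is needed.
    static Flux<String> processAsync(String message) {
        return Flux.just(message + "-a", message + "-b");
    }

    // map() applies a plain function to each element.
    static Flux<String> viaMap(Flux<String> messages) {
        return messages.map(MapVsFlatMap::processSync);
    }

    // flatMap() subscribes to each inner publisher and merges its elements.
    static Flux<String> viaFlatMap(Flux<String> messages) {
        return messages.flatMap(MapVsFlatMap::processAsync);
    }

    public static void main(String[] args) {
        Flux<String> messages = Flux.just("x", "y");

        // Using map() with a function that returns a Flux nests the types.
        Flux<Flux<String>> nested = messages.map(MapVsFlatMap::processAsync);

        System.out.println(viaMap(messages).collectList().block());
        System.out.println(viaFlatMap(messages).collectList().block());
    }
}
```

The nested variable is there only to show the type the compiler infers when map() is given a function that returns a publisher.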

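.doOnNext() is meant for side effects like logging. It takes a consumer, so whatever your lambda computes is discarded and the elements flow on unchanged; that is why the processed result never reaches a subscriber in your code.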
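
One possible shape for pushresult() is sketched below, again assuming reactor-core is available. The ResultStream class and its process() method are hypothetical; process() stands in for serviceHandler.process(message).getMessage().toString(), and plain strings stand in for Message.

```java
import reactor.core.publisher.Flux;
import java.util.List;

public class ResultStream {
    // Hypothetical stand-in for the service call in the question.
    static String process(String message) {
        return "processed:" + message;
    }

    // Returns the transformed stream instead of subscribing internally,
    // so the caller (for example a web endpoint) decides when to subscribe.
    static Flux<String> pushResult(Flux<String> messages) {
        return messages
                .doOnNext(m -> System.out.println("received " + m)) // side effect only
                .map(ResultStream::process);                        // actual transformation
    }

    public static void main(String[] args) {
        List<String> results = pushResult(Flux.just("a", "b")).collectList().block();
        System.out.println(results); // prints [processed:a, processed:b]
    }
}
```

If the application uses Spring WebFlux (an assumption, the question does not say), a controller method could return this Flux with a text/event-stream content type so that each element is sent to the browser as a server-sent event.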