I am trying to receive data in ignite continuous query and send streaming data via rsocket request-stream to the client. the block of code for continuous query is like:
ContinuousQuery<String, BinaryObject> query = new ContinuousQuery<>();
query.setLocalListener(new CacheEntryUpdatedListener<String, BinaryObject>() {
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends BinaryObject>> events)
throws CacheEntryListenerException {
// react to the update events here
events.forEach(cacheEntryEvent -> {
SampleResponse sampleResponse= createResponse(cacheEntryEvent.getValue());
});
}
});
igniteCache.query(query);
this code is within a spring boot service. I wonder to send the sampleResponse to the rsocket which is in the controller class like this:
@MessageMapping("marketData.stream")
public Flux<SampleResponse> responseStream(@Header("jwt") String jwt, SampleRequest request) {
// the method which contains the continuous query
return Flux.push(service.processStream(request));
}
How could I notify the rsocket to send the message created within the onUpdate method of ignite continuous query?
The general pattern is
Flux.create
So likely for you something like
But ideally you'd have some ignite-reactor or ignite-jdk9flowable binding and use that instead. Or something based on Kotlin Flow?