Send continuous query data from Apache Ignite to RSocket with request-stream

I am trying to receive data from an Ignite continuous query and stream it to the client via RSocket request-stream. The continuous query code looks like this:

    ContinuousQuery<String, BinaryObject> query = new ContinuousQuery<>();

    query.setLocalListener(new CacheEntryUpdatedListener<String, BinaryObject>() {
        @Override
        public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends BinaryObject>> events)
                throws CacheEntryListenerException {
            // react to the update events here
            events.forEach(cacheEntryEvent -> {
                SampleResponse sampleResponse = createResponse(cacheEntryEvent.getValue());
            });
        }
    });
    igniteCache.query(query);

This code is inside a Spring Boot service. I want to send the sampleResponse over RSocket from the controller class, which looks like this:

    @MessageMapping("marketData.stream")
    public Flux<SampleResponse> responseStream(@Header("jwt") String jwt, SampleRequest request) {
        // the method which contains the continuous query
        return Flux.push(service.processStream(request));
    }

How can I notify RSocket to send the message created inside the onUpdated method of the Ignite continuous query?

There is 1 answer below.

The general pattern is Flux.create, which hands you a sink that you can push items into from any callback:

    val f: Flux<String> = Flux.create { 
      it.onCancel { 
        // cancel subscription
      }
      
      it.next("a")
      it.next("b")
      it.next("c")
      it.complete()
    }

So for your case, likely something like this, with the service method returning the Flux so the controller can return it directly:

    val f: Flux<SampleResponse> = Flux.create { sink ->
      val query = ContinuousQuery<String, BinaryObject>()

      // push every cache update into the Flux
      query.setLocalListener { events ->
        events.forEach { event ->
          sink.next(createResponse(event.value))
        }
      }

      // query() returns a cursor; closing it stops the continuous query
      val cursor = igniteCache.query(query)

      sink.onCancel {
        // the client cancelled the stream, so stop listening
        cursor.close()
      }
    }

Ideally, though, you'd use an existing binding between Ignite and Reactor or the JDK 9 Flow API, if one exists, instead of writing the bridge by hand. Something based on Kotlin Flow would work too.
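
To give an idea of what a JDK 9 Flow binding could look like, here is a rough sketch that uses only the JDK. It is not an Ignite API: `UpdateSource`, `Bridge`, `ListenerToFlow` and `demo` are made-up names, and `UpdateSource` is only a stand-in for the continuous query's local listener plus the cursor you close to stop it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ListenerToFlow {

    /** Hypothetical stand-in for a callback source such as a continuous query. */
    interface UpdateSource<T> {
        /** Registers a batch listener; closing the returned handle stops delivery. */
        AutoCloseable listen(Consumer<List<T>> listener);
    }

    /** Forwards every item of every batch to the Flow subscribers. */
    static final class Bridge<T> implements AutoCloseable {
        private final SubmissionPublisher<T> publisher = new SubmissionPublisher<>();
        private final AutoCloseable registration;

        Bridge(UpdateSource<T> source) {
            registration = source.listen(batch -> batch.forEach(publisher::submit));
        }

        Flow.Publisher<T> publisher() {
            return publisher;
        }

        @Override
        public void close() throws Exception {
            registration.close(); // stop the source first
            publisher.close();    // then signal onComplete to subscribers
        }
    }

    /** Fires two fake update batches through the bridge and returns what a subscriber saw. */
    static List<String> demo() throws Exception {
        // Fake source: remembers the listener so the demo can fire updates by hand.
        List<Consumer<List<String>>> listeners = new CopyOnWriteArrayList<>();
        UpdateSource<String> source = listener -> {
            listeners.add(listener);
            return () -> listeners.remove(listener);
        };

        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (Bridge<String> bridge = new Bridge<>(source)) {
            bridge.publisher().subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { received.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            // Simulate two update batches arriving from the source.
            listeners.forEach(l -> l.accept(List.of("a", "b")));
            listeners.forEach(l -> l.accept(List.of("c")));
        }
        // Delivery is asynchronous, so wait for onComplete before reading the result.
        done.await(5, TimeUnit.SECONDS);
        return received;
    }
}
```

The shape is the same as the Flux.create version: register the listener, forward each event, and release the source when the stream ends. A real binding would also have to decide what happens when subscribers are slower than the cache updates, which this sketch leaves to SubmissionPublisher's default buffering.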