RxJava Grouped Flowable with Conflation


I'm trying to create a flow for a fast producer and a slow consumer of FX (foreign exchange) prices. The basic idea is that prices coming from the source should be sent to the consumer as fast as possible.

The following is important for the working of this flow:

  • While the consumer is busy submitting prices, new prices must be received from the price source (in other words we don't want to slow down the producer at any stage).
  • When the consumer has finished processing its current rate, it is then available to process another rate from the source.
  • The consumer is only ever interested in the latest published price for a ccy pair - in other words, we want prices to be conflated if the consumer does not keep up.
  • The consumer can (and should) process submissions in parallel, as long as these submissions are of a different ccy pair. For example, EUR/USD can be submitted in parallel with GBP/USD, but while EUR/USD is busy, other EUR/USD rates must be conflated.
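Taken together, these four requirements describe a per-pair "latest value" slot plus a rule that at most one consumer call per pair runs at any time. As a point of reference outside RxJava, here is a minimal plain-JDK sketch of that behaviour. `ConflatingDispatcher`, `submit` and the work-in-progress drain loop are hypothetical names and a swapped-in technique (not an RxJava operator), and the sketch assumes a synchronous `Consumer<V>` running on a caller-supplied `Executor`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
 * Hypothetical helper (not an RxJava API): keeps only the latest value per key and
 * runs at most one consumer call per key at a time, while different keys run in parallel.
 */
final class ConflatingDispatcher<K, V> {

    /** Per-key state: a one-element "buffer" plus a work-in-progress counter. */
    private static final class Slot<T> {
        final AtomicReference<T> latest = new AtomicReference<>(); // newer values overwrite older ones
        final AtomicInteger wip = new AtomicInteger();              // > 0 while a drain is scheduled or running
    }

    private final ConcurrentHashMap<K, Slot<V>> slots = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Consumer<V> consumer;

    ConflatingDispatcher(Executor executor, Consumer<V> consumer) {
        this.executor = executor;
        this.consumer = consumer;
    }

    /** Called by the producer; it never waits for the consumer. */
    void submit(K key, V value) {
        Slot<V> slot = slots.computeIfAbsent(key, k -> new Slot<>());
        slot.latest.set(value);                 // conflate: replace any value still waiting for this key
        if (slot.wip.getAndIncrement() == 0) {  // nobody is draining this key yet: start a drain
            executor.execute(() -> drain(slot));
        }
    }

    /** Runs on the executor; the wip counter guarantees one active drain per key. */
    private void drain(Slot<V> slot) {
        int missed = 1;
        do {
            V value = slot.latest.getAndSet(null); // take the newest pending value, if any
            if (value != null) {
                consumer.accept(value);            // the slow work happens here (assumed not to throw)
            }
            missed = slot.wip.addAndGet(-missed);  // non-zero if submit() ran while we were busy
        } while (missed != 0);
    }
}
```

In the question's terms, the producer would call `submit(rate.ccyPair(), rate)` and the 500 ms processing step would be the consumer: while EUR/USD is busy, further EUR/USD rates just overwrite its slot, and a GBP/USD rate starts its own drain on another pool thread. The sketch does not handle a consumer that throws; that would leave the key's counter non-zero and stop further deliveries for that key.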

Here is my current attempt:

// Assumes RxJava 3, SLF4J and JUnit 5 on the classpath.
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.flowables.ConnectableFlowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RxTest {

    private static final Logger LOG = LoggerFactory.getLogger(RxTest.class);

    CcyPair EUR_USD = new CcyPair("EUR/USD");
    CcyPair GBP_USD = new CcyPair("GBP/USD");
    CcyPair USD_JPY = new CcyPair("USD/JPY");

    List<CcyPair> ALL_CCY_PAIRS = List.of(EUR_USD, GBP_USD, USD_JPY);

    @Test
    void testMyFlow() throws Exception {
        AtomicInteger rateGenerator = new AtomicInteger(0);
        Flowable.<Rate>generate(emitter -> {
                    // Fast producer: a new random rate every 0-99 ms
                    MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100));
                    final CcyPair ccyPair = ALL_CCY_PAIRS.get(ThreadLocalRandom.current().nextInt(3));
                    final String rate = String.valueOf(rateGenerator.incrementAndGet());
                    emitter.onNext(new Rate(ccyPair, rate));
                })
                .subscribeOn(Schedulers.newThread())
                .doOnNext(rate -> LOG.info("Process: {}", rate))
                .groupBy(Rate::ccyPair)
                .map(Flowable::publish)
                .doOnNext(ConnectableFlowable::connect)
                .flatMap(grp -> grp.map(rate -> rate))
                .onBackpressureLatest()
                .observeOn(Schedulers.io())
                .subscribe(onNext -> {
                    // Slow consumer: 500 ms per rate
                    LOG.info("Long running process: {}", onNext);
                    MILLISECONDS.sleep(500);
                    LOG.info("Long running process complete: {}", onNext);
                });
        MILLISECONDS.sleep(5000);
    }

    record CcyPair(String name) {
        public String toString() {
            return name;
        }
    }

    record Rate(CcyPair ccyPair, String rate) {
        public String toString() {
            return ccyPair + "->" + rate;
        }
    }

}

Which gives me this output:

09:27:05,743 INFO [RxTest] [RxNewThreadScheduler-1] - Process: EUR/USD->1
09:27:05,764 INFO [RxTest] [RxCachedThreadScheduler-1] - Long running process: EUR/USD->1
09:27:05,805 INFO [RxTest] [RxNewThreadScheduler-1] - Process: USD/JPY->2
...
09:27:06,127 INFO [RxTest] [RxNewThreadScheduler-1] - Process: GBP/USD->9
09:27:06,165 INFO [RxTest] [RxNewThreadScheduler-1] - Process: EUR/USD->10
09:27:06,214 INFO [RxTest] [RxNewThreadScheduler-1] - Process: USD/JPY->11
09:27:06,265 INFO [RxTest] [RxCachedThreadScheduler-1] - Long running process complete: EUR/USD->1
09:27:06,265 INFO [RxTest] [RxCachedThreadScheduler-1] - Long running process: USD/JPY->2
09:27:06,302 INFO [RxTest] [RxNewThreadScheduler-1] - Process: USD/JPY->12
09:27:06,315 INFO [RxTest] [RxNewThreadScheduler-1] - Process: EUR/USD->13
...
09:27:06,672 INFO [RxTest] [RxNewThreadScheduler-1] - Process: EUR/USD->23
09:27:06,695 INFO [RxTest] [RxNewThreadScheduler-1] - Process: GBP/USD->24
09:27:06,758 INFO [RxTest] [RxNewThreadScheduler-1] - Process: EUR/USD->25
09:27:06,773 INFO [RxTest] [RxCachedThreadScheduler-1] - Long running process complete: USD/JPY->2
09:27:06,773 INFO [RxTest] [RxCachedThreadScheduler-1] - Long running process: EUR/USD->3

There are a number of problems here:

  1. The consumer is not getting the "latest" rate per ccy pair, but the next one. This is obviously because there is an internal buffer and the backpressure has not kicked in yet. I don't want to wait for this backpressure to kick in; I simply want whatever is the latest emission to go to the consumer next.
  2. The consumer is not processing in parallel - while EUR/USD is processing, for example, the other ccy pairs are being buffered.
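Problem 1 comes down to where the pending rates are stored. In the pipeline above, observeOn(Schedulers.io()) requests Flowable.bufferSize() items (128 by default) from onBackpressureLatest up front, and onBackpressureLatest only keeps the latest value once that outstanding demand is used up; until then every rate is queued in arrival order. The toy model below is plain JDK code with a hypothetical class name; it ignores ccy pairs and uses the sequence numbers 2..11 from the log (the rates that arrived while EUR/USD->1 was being processed) to contrast what a FIFO buffer and a single conflating slot hand to the consumer next.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical toy model: what the consumer receives next after rates arrived while it was busy. */
final class BufferVsLatest {

    /** A bounded FIFO buffer (like an operator's prefetch queue) hands out the OLDEST pending rate. */
    static int nextFromFifoBuffer(int firstPending, int lastPending, int capacity) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int rate = firstPending; rate <= lastPending; rate++) {
            if (buffer.size() < capacity) {
                buffer.offer(rate); // queued behind all older rates
            }
            // a full buffer would signal backpressure instead; this toy just ignores the overflow
        }
        return buffer.poll();
    }

    /** A single conflating slot keeps only the NEWEST pending rate. */
    static int nextFromLatestSlot(int firstPending, int lastPending) {
        AtomicReference<Integer> slot = new AtomicReference<>();
        for (int rate = firstPending; rate <= lastPending; rate++) {
            slot.set(rate); // each arrival overwrites the previous one
        }
        return slot.getAndSet(null);
    }
}
```

With a capacity of 128, `nextFromFifoBuffer(2, 11, 128)` returns 2 (the consumer got USD/JPY->2 in the log), whereas `nextFromLatestSlot(2, 11)` returns 11.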

Some notes/thoughts:

  • I'm very new to RxJava and am struggling to find common patterns/idioms for tackling these kinds of problems, so please be patient :)
  • I'm not at all sure that backpressure is the right way to achieve this. RxJava has many interesting operators like window(), cache(), takeLast() etc., but none of them seem to work exactly like I want. I would have liked an operator like "conflate" or similar - I'm sure something can achieve this, I'm just not sure how.
  • I struggle to understand how a slow consumer can tell the flow that "I'm busy, just conflate everything until I'm done". Can this only be done via scheduling on threads? That seems worrisome because what if the consumer is asynchronous - how will it tell the producer to hold on while it's busy?
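On the last point: in Reactive Streams, which RxJava's Flowable implements, the consumer states how many items it can take via Subscription.request(n). It may make that call from another thread once its work is done (the calls just must not overlap), so an asynchronous consumer does not have to block a thread to say "I'm busy". An operator such as onBackpressureLatest placed upstream then holds only the newest value while demand is zero. The sketch below shows the demand side with the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones, and SubmissionPublisher as a stand-in source; `OneAtATimeSubscriber` is a hypothetical name, and the consumer is assumed to return a CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical subscriber: takes one item, processes it asynchronously, and only then asks for the next. */
final class OneAtATimeSubscriber<T> implements Flow.Subscriber<T> {
    private final Function<T, CompletableFuture<Void>> asyncWork; // assumed asynchronous, slow consumer
    private final AtomicInteger inFlight = new AtomicInteger();
    final AtomicInteger maxInFlight = new AtomicInteger();        // lets a caller check "one at a time"
    private volatile Flow.Subscription subscription;

    OneAtATimeSubscriber(Function<T, CompletableFuture<Void>> asyncWork) {
        this.asyncWork = asyncWork;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // "I can take exactly one item"
    }

    @Override
    public void onNext(T item) {
        maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
        asyncWork.apply(item).whenComplete((ignored, error) -> {
            inFlight.decrementAndGet();
            subscription.request(1); // "done, send the next one", from whichever thread finished the work
        });
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        // nothing to clean up in this sketch
    }
}
```

No thread sits blocked while the work runs: the publisher simply has no outstanding demand from this subscriber until the future completes.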
Answer:

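You can use groupBy to create a subflow for each currency pair, but then you need to put each of those subflows on its own thread. RxJava has many internal buffers (observeOn and flatMap, for example, prefetch items from upstream), so skipping items while some part of the code is busy can be difficult. The idea here is to conflate inside each group with onBackpressureLatest and make sure nothing after it asks for more than one rate at a time: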

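source // the Flowable<Rate> built with Flowable.generate(...) in the question
        .groupBy(Rate::ccyPair)
        .flatMap(group -> group
                        // keep only the newest rate of this ccy pair while doOnNext below is busy
                        .onBackpressureLatest()
                        // move each group onto its own computation-scheduler worker so pairs run in parallel;
                        // unlike observeOn with its default 128-item prefetch, delay does not queue ahead
                        .delay(0, TimeUnit.MILLISECONDS)
                        .doOnNext(rate -> {
                            LOG.info("Long running process: {}", rate);
                            TimeUnit.MILLISECONDS.sleep(500);
                            LOG.info("Long running process complete: {}", rate);
                        }),
                false, // delayErrors
                128,   // maxConcurrency: up to 128 ccy-pair groups in flight at once
                1)     // bufferSize: each group is asked for only one rate at a time
        .subscribe();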