RxJava 2 - Reduce queue in concatMap

I have a subject that emits some data. I want to process each item sequentially in a queue (each item is a long operation). But when another subject emits values, I want to drop the matching items from that queue.

    PublishSubject<Integer> myBaseSubject = PublishSubject.create();
    PublishSubject<Integer> otherSubject = PublishSubject.create();

    myBaseSubject
            .concatMap(integer -> Observable.timer(5, TimeUnit.SECONDS) // long operation
                    .map(aLong -> integer)
            )
            .subscribe(
                    integer -> Log.i(TAG, "result: " + integer),
                    Functions.emptyConsumer()
            );

    myBaseSubject.onNext(1);
    myBaseSubject.onNext(2);
    myBaseSubject.onNext(3);
    myBaseSubject.onNext(4); 
    myBaseSubject.onNext(5);

    otherSubject.onNext(3);
    otherSubject.onNext(4);

    myBaseSubject.onNext(6);
    myBaseSubject.onNext(7);

Now I want to modify my stream so that it skips the elements emitted by otherSubject (3 and 4) if concatMap has not consumed them yet (that is, they are still in the queue).

Is it possible to achieve that?

PS: PublishSubject is not required - it's used just for simplicity

1 Answer

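You could use a ConcurrentHashMap to track which elements should still be processed by concatMap. Each item is registered in the map as it is emitted, and the concatMap mapper only starts the long operation if the item is still in the map when its turn comes; otherwise it returns an empty Observable. I'm assuming the decision to skip an item happens after that item's entry has been created.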

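    // Items that are queued and still allowed to be processed
    ConcurrentHashMap<Integer, Integer> map = new ConcurrentHashMap<>();

    myBaseSubject
        .doOnNext(v -> map.put(v, v)) // register the item before it is queued
        .concatMap(integer -> {
            // remove() returns null if the item was cancelled while waiting
            if (map.remove(integer) != null) {
                return Observable.timer(5, TimeUnit.SECONDS) // long operation
                    .map(aLong -> integer);
            }
            return Observable.empty(); // skip cancelled items
        })
        .subscribe(
            integer -> Log.i(TAG, "result: " + integer),
            Functions.emptyConsumer()
        );

    myBaseSubject.onNext(1);
    myBaseSubject.onNext(2);
    myBaseSubject.onNext(3);
    myBaseSubject.onNext(4);
    myBaseSubject.onNext(5);

    // Cancel 3 and 4 while they are still queued; with the second subject
    // this could be otherSubject.subscribe(v -> map.remove(v))
    map.remove(3);
    map.remove(4);

    myBaseSubject.onNext(6);
    myBaseSubject.onNext(7);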
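
To make the "register on emit, check on dequeue" logic easier to see, here is a minimal plain-Java sketch of the same idea without RxJava or timers. `SkippableQueue` and its methods are hypothetical names made up for this illustration, not an RxJava API: `enqueue` plays the role of `doOnNext` plus concatMap's internal queue, `skip` plays the role of `map.remove`, and `poll` plays the role of the concatMap mapper being invoked for the next item.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper for illustration only; it models the answer's logic
// synchronously and is not part of RxJava.
final class SkippableQueue {
    private final Queue<Integer> queue = new ArrayDeque<>();
    // Values that are queued and have not been cancelled
    private final Set<Integer> pending = ConcurrentHashMap.newKeySet();

    // Register the value, then queue it
    void enqueue(int value) {
        pending.add(value);
        queue.add(value);
    }

    // Cancel a value; it only has an effect while the value is still queued
    void skip(int value) {
        pending.remove(value);
    }

    // Return the next value that was not cancelled, or null if none is left
    Integer poll() {
        Integer next;
        while ((next = queue.poll()) != null) {
            if (pending.remove(next)) {
                return next;
            }
        }
        return null;
    }

    // Process everything that is still queued and not cancelled
    List<Integer> drain() {
        List<Integer> processed = new ArrayList<>();
        Integer next;
        while ((next = poll()) != null) {
            processed.add(next);
        }
        return processed;
    }

    public static void main(String[] args) {
        SkippableQueue q = new SkippableQueue();
        for (int i = 1; i <= 5; i++) {
            q.enqueue(i);
        }
        List<Integer> results = new ArrayList<>();
        results.add(q.poll()); // 1 is taken off the queue first
        q.skip(3);             // 3 and 4 are cancelled while still queued
        q.skip(4);
        q.enqueue(6);
        q.enqueue(7);
        results.addAll(q.drain());
        System.out.println(results); // prints [1, 2, 5, 6, 7]
    }
}
```

One thing this sketch shares with the map-based version above: values are used as keys, so if the same value is queued twice before the first copy is processed, the second copy is treated as cancelled. If duplicates matter in your case, you would need a different key (for example a per-item id), which is an assumption beyond what the question states.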