I have a subject that emits some data. Each emitted item is then processed in a queue (a long-running operation). When another subject emits values, I want to remove those values from my queue.
PublishSubject<Integer> myBaseSubject = PublishSubject.create();
PublishSubject<Integer> otherSubject = PublishSubject.create();
myBaseSubject
        .concatMap(integer -> Observable.timer(5, TimeUnit.SECONDS) // long operation
                .map(aLong -> integer)
        )
        .subscribe(
                integer -> Log.i(TAG, "result: " + integer),
                Functions.emptyConsumer()
        );
myBaseSubject.onNext(1);
myBaseSubject.onNext(2);
myBaseSubject.onNext(3);
myBaseSubject.onNext(4);
myBaseSubject.onNext(5);
otherSubject.onNext(3);
otherSubject.onNext(4);
myBaseSubject.onNext(6);
myBaseSubject.onNext(7);
Now I want to modify my chain so that it skips the elements emitted by otherSubject (3 and 4), provided those elements have not already been consumed by concatMap (that is, they are still in the queue).
Is it possible to achieve that?
PS: PublishSubject is not required - it's used just for simplicity
You could use a ConcurrentHashMap to track which elements should not be processed by concatMap. I'm assuming the decision to not process an item happens after that item's index has been created.
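
Here is a minimal sketch of that idea in plain Java, without RxJava, so it can be run on its own. A single-threaded executor stands in for concatMap (items are handled strictly one at a time), and a CountDownLatch stands in for the long operation. The class SkipQueueSketch and the methods emit, skip, finish and demo are hypothetical names made up for this illustration. It also assumes the values are unique, so the value itself can serve as the map key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
class SkipQueueSketch {
    // Values that should be dropped if they are still waiting in the queue.
    private final ConcurrentHashMap<Integer, Boolean> skipped = new ConcurrentHashMap<>();
    // One worker thread processes items one after another, similar to concatMap.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final List<Integer> results = Collections.synchronizedList(new ArrayList<>());
    // Stand-in for the long operation: processing blocks until the gate opens.
    private final CountDownLatch gate = new CountDownLatch(1);

    // Plays the role of myBaseSubject.onNext(value).
    void emit(int value) {
        worker.execute(() -> {
            // The mark is checked only when the item reaches the front of the queue.
            if (skipped.remove(value) != null) {
                return; // marked while queued, so it is never processed
            }
            try {
                gate.await(); // simulated long operation
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            results.add(value);
        });
    }

    // Plays the role of otherSubject.onNext(value).
    void skip(int value) {
        skipped.put(value, Boolean.TRUE);
    }

    // Lets the simulated long operations complete and returns the processed values.
    List<Integer> finish() {
        gate.countDown();
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(results);
    }

    // Replays the emission order from the question.
    static List<Integer> demo() {
        SkipQueueSketch sketch = new SkipQueueSketch();
        for (int i = 1; i <= 5; i++) {
            sketch.emit(i);
        }
        sketch.skip(3); // 1 is in progress, 2..5 are still queued
        sketch.skip(4);
        sketch.emit(6);
        sketch.emit(7);
        return sketch.finish();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 5, 6, 7]
    }
}
```

In the RxJava chain from the question, the same check would presumably go at the start of the concatMap lambda, returning Observable.empty() for a marked value instead of starting the timer. Note that a mark for a value that has already been processed stays in the map; keying by a per-item index rather than by value would avoid accidentally skipping a later item with the same value.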