I need to check that an infinite observable (events from a device) emits one specific event, let's call it "Started", which is then followed by another one, "Finished". In between these two events, any number of other events can be received, and they must be ignored. The result should be a Completable that completes successfully when the "Started" event is followed by the "Finished" event before a set timeout.
I have a working solution for this problem, but it looks ugly and overly complex, and I think there is probably a more elegant or simpler one. My current code is below. I have generalized it so it is easier to understand: in this example I check that, after the Flowable emits the number 5, the number 8 is received before a timeout of 10 seconds:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);
return events
        .filter(number -> number == 5)
        .firstElement()
        .concatMapCompletable(number -> {
            if (number == 5) {
                return events
                        .filter(number2 -> number2 == 8)
                        .firstElement()
                        .concatMapCompletable(number2 -> {
                            if (number2 == 8) {
                                return Completable.complete();
                            } else {
                                return Completable.error(new Exception("Number 8 expected, got " + number2));
                            }
                        });
            } else {
                return Completable.error(new Exception("Number 5 expected, got " + number));
            }
        })
        .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")));
EDIT: I have found a cleaner version. It still seems odd, though, since I use the .filter operator only to complete on the first element received. I post it below for reference:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);
TestObserver testObserver = events
        .filter(number -> number == 5)
        .firstElement()
        .concatMapCompletable(number ->
                events
                        .filter(number2 -> number2 == 8)
                        .firstElement()
                        .concatMapCompletable(number2 ->
                                Completable.complete()))
        .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")))
        .test();
UPDATE 2: A version I am much happier with:
Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);
TestObserver testObserver = events
        .skipWhile(number -> number != 5)
        .firstElement()
        .flatMapCompletable(number -> Completable.fromObservable(events
                .takeUntil(number2 -> number2 == 8)
                .toObservable()
        ))
        .test();
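The last version drops the timeout that the earlier attempts had. A minimal self-contained sketch that adds it back might look like the following. It assumes RxJava 3 package names (io.reactivex.rxjava3); the class name StartedThenFinished and the helper awaitSequence are made up for illustration and are not part of any library.

```java
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StartedThenFinished {

    // Hypothetical helper: completes once `start` has been seen and `finish`
    // arrives afterwards; signals TimeoutException if that does not happen
    // within `timeoutSeconds`, measured on the given scheduler.
    static Completable awaitSequence(Flowable<Long> events, long start, long finish,
                                     long timeoutSeconds, Scheduler scheduler) {
        return events
                .skipWhile(n -> n != start)          // ignore everything before "start"
                .firstElement()                      // stop listening once "start" is seen
                .flatMapCompletable(n -> events
                        .takeUntil(m -> m == finish) // ends the stream when "finish" arrives
                        .ignoreElements())           // only the terminal event matters
                .timeout(timeoutSeconds, TimeUnit.SECONDS, scheduler,
                        Completable.error(new TimeoutException("Timeout!")));
    }

    public static void main(String[] args) {
        TestScheduler scheduler = new TestScheduler();
        // interval emits 0 at t=1s, so 5 arrives at t=6s and 8 at t=9s
        Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, scheduler)
                .publish().autoConnect(1);

        TestObserver<Void> observer = awaitSequence(events, 5, 8, 10, scheduler).test();
        scheduler.advanceTimeBy(9, TimeUnit.SECONDS);
        observer.assertComplete();
    }
}
```

Passing the scheduler to timeout matters in tests: without it the timeout runs on the default computation scheduler and does not follow the TestScheduler's virtual time.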
I'm not sure I understand exactly what you want to do, but you can use the buffer or window operators like the following:
UPDATE