Check one specific event is followed by another one and emit success using RxJava

437 Views Asked by At

I need to check that an infinite observable (events from a device) emits ne specific event lets call it "Started" which is then followed by another one, "Finished". However in between these two events, any number of different events can be received and they must be ignored. The result of this should be a Completable.complete() which is successful when the "Started" event was followed by the "Finished" event before a set timeout.

I have a working solution for this problem, however it looks ugly and too complex and I think there probably is a more elegant/simple solution. My current code looks like this, I have generalized my code so it is easier to understand, basically in this example I check that after the Flowable emits the number "5" before a timeout of 10 seconds the number "8" is received.:

    Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    return events
            .filter(number -> number == 5)
            .firstElement()
            .concatMapCompletable(number -> {
                if (number == 5) {
                    return events
                            .filter(number2 -> number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 -> {
                                if (number2 == 8) {
                                    return Completable.complete();
                                } else {
                                    return Completable.error(new Exception("Number 3 expected, got " + number2));
                                }
                            });
                } else {
                    return Completable.error(new Exception("Number 2 expected, got " + number));
                }
            })
            .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")));

EDIT: I have found a cleaner version, however it seems weird since I am using the .filter operator to then Complete on the first element received, I post it here below for reference:

    Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .filter(number -> number == 5)
            .firstElement()
            .concatMapCompletable(number ->
                    events
                            .filter(number2 -> number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 ->
                                    Completable.complete()))
            .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")))
            .test();

UPDATE2: Version which I am much more happy about:

    Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .skipWhile(number -> number != 5)
            .firstElement()
            .flatMapCompletable(number -> Completable.fromObservable(events
                    .takeUntil(number2 -> number2 == 8)
                    .toObservable()
            ));
1

There are 1 best solutions below

10
On BEST ANSWER

I'm not sure to understand what you want to do exactelly but you can use buffer or window operators like the following:

Flowable.just(1, 2, 3, 4, 5)
        .buffer(2, 1)
        .filter(e -> e.size() > 1)
        .flatMapCompletable(e -> {
            int first = e.get(0);
            int second = e.get(1);
            if (first == 2) {
                if (second == 3) {
                    return Completable.complete();
                } else {
                    return Completable.error(new Exception("..."));
                }
            }

            return Completable.fromObservable(Observable.just(e));
        })

UPDATE

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
        .share();

source
        .skipWhile(e -> e != 5)
        .flatMapCompletable(e -> Completable.fromObservable(source
                .takeUntil(x -> x == 8)
                .timeout(10, TimeUnit.SECONDS)))
        .subscribe();