rxjava - items not emitted by sequentially

79 Views Asked by At

in below code lines

@Test
fun rx() {
    val items = Observable.just(1, 2, 3, 4, 5)
            .observeOn(Schedulers.io()) //<---- if remove this line each item is emitted by sequentially (i.e 12345)

    items
            .filter { it == 1 }
            .doOnNext { print("1") }
            .subscribe()

    items
            .filter { it == 2 }
            .doOnNext { print("2") }
            .subscribe()

    items
            .filter { it == 3 }
            .doOnNext { print("3") }
            .subscribe()

    items
            .filter { it == 4 }
            .doOnNext { print("4") }
            .subscribe()

    items
            .filter { it == 5 }
            .doOnNext { print("5") }
            .subscribe()

    Thread.sleep(1000)
}

I expect log should say "12345", but it's say 23415, 31245, 53124 etc.. (i.e. Order looks like random)

The each item 1, 2, 3, 4, 5 seem that it's not emitted sequentially.

Is there any reason about that? and How can I fix it with using ".observeOn(Schedulers.io())" ?

1

There are 1 best solutions below

0
On

What you are doing, is that you are creating 5 different subscribers with no guarantee of execution in order. If you want to keep the order of execution, you should use Observable.concat and combine list of observables into one stream

    val firstObservable = items
            .filter { it == 1 }
            .doOnNext { print("1") }
    val secondObservable = items
            .filter { it == 2 }
            .doOnNext { print("2") }

    val listOfObservables = arrayListOf(firstObservable, secondObservable...)
    
    Observable
        .concat(listOfObservables)
        .subscribe()
    Thread.sleep(1000);

EDIT:

Also please look at the first comment about TestScheduler, because if you want to make your tests more predictable you should read more about TestScheduler and testing in RxJava.