In the code below:
@Test
fun rx() {
    val items = Observable.just(1, 2, 3, 4, 5)
        .observeOn(Schedulers.io()) // <---- if this line is removed, the items are printed sequentially (i.e. 12345)
    items
        .filter { it == 1 }
        .doOnNext { print("1") }
        .subscribe()
    items
        .filter { it == 2 }
        .doOnNext { print("2") }
        .subscribe()
    items
        .filter { it == 3 }
        .doOnNext { print("3") }
        .subscribe()
    items
        .filter { it == 4 }
        .doOnNext { print("4") }
        .subscribe()
    items
        .filter { it == 5 }
        .doOnNext { print("5") }
        .subscribe()
    Thread.sleep(1000)
}
I expect the log to read "12345", but it prints 23415, 31245, 53124, etc. (i.e. the order looks random).
The items 1, 2, 3, 4, 5 do not seem to be emitted sequentially.
Why does this happen, and how can I fix it while still using ".observeOn(Schedulers.io())"?
You are creating five separate subscribers. With observeOn(Schedulers.io()), each subscription receives its items on its own worker from the io() scheduler, so there is no guarantee about the order in which the subscribers run. If you want to keep the order of execution, you should use
Observable.concat
and combine the list of observables into one stream.
EDIT:
Also, please look at the first comment about TestScheduler: if you want to make your tests more predictable, you should read more about TestScheduler and testing in RxJava.
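To make the two suggestions concrete, here is a minimal sketch rather than a definitive fix. It assumes RxJava 3 package names (on RxJava 2 the imports would be io.reactivex.Observable and io.reactivex.schedulers.*). The helper names branch, runWithConcat and runWithTestScheduler are made up for illustration, and a shared log replaces print so the result can be checked.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.schedulers.TestScheduler

// Hypothetical helper: one filtered branch that records the item it lets through.
fun branch(items: Observable<Int>, target: Int, log: Appendable): Observable<Int> =
    items
        .filter { it == target }
        .doOnNext { log.append(it.toString()) }

// Option 1: keep observeOn(Schedulers.io()), but chain the branches with concat.
// concat subscribes to the next branch only after the previous one has completed,
// so the branches can no longer race each other.
fun runWithConcat(): String {
    val log = StringBuffer() // thread-safe, because items arrive on io() threads
    val items = Observable.just(1, 2, 3, 4, 5)
        .observeOn(Schedulers.io())

    Observable.concat((1..5).map { branch(items, it, log) })
        .blockingSubscribe() // waits for completion instead of Thread.sleep(1000)
    return log.toString()
}

// Option 2 (tests only): swap io() for a TestScheduler. Queued work runs on the
// calling thread, and only when triggerActions() is called.
fun runWithTestScheduler(): String {
    val scheduler = TestScheduler()
    val log = StringBuilder()
    val items = Observable.just(1, 2, 3, 4, 5)
        .observeOn(scheduler)

    for (target in 1..5) {
        branch(items, target, log).subscribe()
    }
    scheduler.triggerActions() // runs the queued work in the order it was scheduled
    return log.toString()
}

fun main() {
    println(runWithConcat())
    println(runWithTestScheduler())
}
```

The concat version keeps the io() scheduler from the question and only changes how the five branches are subscribed. The TestScheduler version keeps the five separate subscriptions but is meant for unit tests, where the scheduler would normally be injected rather than hard-coded.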