rxJava how to make flatMap run on multi threads

685 Views Asked by At

I want every item emitted from flatMap to run on its own thread
This is a simplified example of a real usage where each item will be a url request.
Adding subscribeOn(Schedulers.io()) on each single still run on a single thread
What's the rule here?


Integer[] array= new Integer[100];
for (int i = 0; i < 100; i++){
    array[i] = i+1;
}

Observable.fromArray(array)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer i) throws Throwable {
                Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
                return Single.just(i).subscribeOn(Schedulers.io());
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()) 
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Integer i) {
               // Log.i(TAG, "onNext " + Thread.currentThread().getName() + i);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

Result:

2020-12-16 22:54:47.010 10649-10700/com.example.rxjava I/MYTAG: apply 1 RxCachedThreadScheduler-1
2020-12-16 22:54:47.037 10649-10700/com.example.rxjava I/MYTAG: apply 2 RxCachedThreadScheduler-1
2020-12-16 22:54:47.038 10649-10700/com.example.rxjava I/MYTAG: apply 3 RxCachedThreadScheduler-1
2020-12-16 22:54:47.039 10649-10700/com.example.rxjava I/MYTAG: apply 4 RxCachedThreadScheduler-1
2020-12-16 22:54:47.040 10649-10700/com.example.rxjava I/MYTAG: apply 5 RxCachedThreadScheduler-1
2020-12-16 22:54:47.043 10649-10700/com.example.rxjava I/MYTAG: apply 6 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 7 RxCachedThreadScheduler-1
2020-12-16 22:54:47.051 10649-10700/com.example.rxjava I/MYTAG: apply 8 RxCachedThreadScheduler-1
1

There are 1 best solutions below

1
On BEST ANSWER

You were on the right track except the use of just, which takes an existing object thus whatever created and computed that object happened before. In this case, it was the lambda of the flatMapSingle which is called from the same thread.

You have to make the computation itself part of the flow to be run in parallel via fromCallable for example:

Observable.fromArray(array)
.flatMapSingle(i -> {
    return Single.fromCallable(() -> {
        Log.i(TAG, "apply " +  i + " " + Thread.currentThread().getName());
        return i + 1000;
    })
    .subscribeOn(Schedulers.io());
})
.observeOn(AndroidSchedulers.mainThread())
// ...
;