Schdulers.elastic not creating new threads in Reactor

1.1k Views Asked by At

I am trying to create a flow where a flux emits 10 items, each in parallel, with each item sleeping for 1s. Since each item is being published on a separate thread, I expect the entire process to take 1s. But the logs show that it's taking 10s instead.

I tried changing subscribeOn to publishOn, map to doOnNext. But none of them seem to work.

I am new to Reactor and am trying to understand where I am going wrong. Any help would be most appreciated. Thanks

    public void whenIsANewThreadCreated() {
                .map(count -> {
           + " - Sleeping for 1s" + " with count: " + count);
                    try {
                    } catch (InterruptedException e) {
                    return count;
2020-03-30 16:17:29.799  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 10


There are 3 best solutions below


You have to first create a parallel flux by calling parallel method and you have to use runOn to achieve parallelism.

    .map(count -> {
        System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
        try {
        } catch (InterruptedException e) {
        return count;

  • Use Schedulers.boundedElastic() as using Scheduler.elastic() is discouraged
  • parallel by default will create threads based on your CPU core. If you want more threads use parallel(10) - I think this is what you want to see.

The specification mandates that onNext events are invoked serially. Your map is effectively turning input onNext events into onNext events that block for 1 second. Per the spec, 10 incoming onNext lead to a series of 10 outgoing onNext that each block for 1s => 10s of blocking.

You absolutely 100% HAVE to use parallel(10).runOn(Scheduler.elastic()) if you want to distribute that blocking workload on 10 parallel rails. (the Scheduler for runOn can also be Schedulers.boundedElastic(), or Schedulers.newParallel(10)).



You can start those processes in background and obtain multithreading. This is not Parallelism. You should use parallel scheduler when you are doing CPU intensive tasks, use elastic when I/O or blocking operations.

public void whenIsANewThreadCreated() {
            .subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
            .flatMap(count -> {
       + " - Sleeping for 1s" + " with count: " + count);
                return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());

Mono<Integer> method5(int count) {
    try {
    } catch (InterruptedException e) {
    return Mono.just(count);

and you get something like this

 23:42:33.289 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10