Schedulers.elastic not creating new threads in Reactor

I am trying to create a flow where a flux emits 10 items, each in parallel, with each item sleeping for 1s. Since each item is being published on a separate thread, I expect the entire process to take 1s. But the logs show that it's taking 10s instead.

I tried switching between subscribeOn and publishOn, and between map and doOnNext, but none of these combinations seems to work.

I am new to Reactor and am trying to understand where I am going wrong. Any help would be most appreciated. Thanks

    public void whenIsANewThreadCreated() {
        Flux.range(1,10)
                .publishOn(Schedulers.elastic())
                .map(count -> {
                    logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return count;
                })
        .blockLast();
    }

2020-03-30 16:17:29.799  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 10

There are 3 best solutions below

2
On BEST ANSWER

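You first have to create a parallel flux by calling the parallel method, and then use runOn so that each rail actually runs on its own thread. Without runOn, parallel only splits the sequence into rails and does not move the work to other threads.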

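Flux.range(1,10)
    .parallel()                   // split the sequence into rails (one per CPU core by default)
    .runOn(Schedulers.elastic())  // run each rail on its own scheduler worker
    .map(count -> {
        System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return count;
    }).subscribe(); // returns immediately; in a test, block (e.g. sequential().blockLast()) to wait for the output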

  • Prefer Schedulers.boundedElastic(); using Schedulers.elastic() is discouraged because it can create an unbounded number of threads.
  • parallel() by default creates as many rails as you have CPU cores. If you want more, pass the number explicitly, e.g. parallel(10). I think this is what you want to see.
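
To check the effect of the number of rails yourself, a minimal timing sketch could look like the following. It assumes reactor-core is on the classpath. The class name ParallelRailsTiming, the helpers blockingWork and elapsedMillis, and the shortened 200 ms sleep are made up for illustration and are not part of the question's code.

```java
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

final class ParallelRailsTiming {

    // Hypothetical blocking stand-in for the question's Thread.sleep(1_000) workload.
    static int blockingWork(int count, long sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        return count;
    }

    // Runs `items` blocking calls spread over `rails` rails and
    // returns the wall-clock time in milliseconds.
    static long elapsedMillis(int items, int rails, long sleepMillis) {
        long start = System.nanoTime();
        Flux.range(1, items)
            .parallel(rails)                    // split into the requested number of rails
            .runOn(Schedulers.boundedElastic()) // each rail gets its own worker
            .map(count -> blockingWork(count, sleepMillis))
            .sequential()                       // merge the rails back into one Flux
            .blockLast();                       // wait until every rail has finished
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("1 rail:   " + elapsedMillis(10, 1, 200) + " ms");
        System.out.println("10 rails: " + elapsedMillis(10, 10, 200) + " ms");
    }
}
```

With 10 items at 200 ms each, one rail should take about 2 s, while ten rails should finish in roughly 200 ms plus scheduler start-up. The exact numbers vary by machine.
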
0
On

The Reactive Streams specification mandates that onNext events are signaled serially. Your map effectively turns each incoming onNext into an outgoing onNext that blocks for 1 second. Per the spec, 10 incoming onNext signals therefore lead to a series of 10 outgoing onNext signals that each block for 1s, for a total of 10s of blocking.

You absolutely 100% HAVE to use parallel(10).runOn(Schedulers.elastic()) if you want to distribute that blocking workload over 10 parallel rails. (The Scheduler for runOn can also be Schedulers.boundedElastic(), or a dedicated one such as Schedulers.newParallel("blocking-work", 10); newParallel needs a thread name prefix, and the one here is arbitrary.)
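
The serial-onNext rule can be observed without Reactor, using the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow). The following is only a sketch of that rule, not Reactor code: SubmissionPublisher is backed by a 4-thread pool here, yet a single subscriber should still never see two onNext calls running at once. The class and method names are invented for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

final class SerialOnNextDemo {

    // Publishes `items` values to one blocking subscriber and returns the
    // highest number of onNext calls that were ever running at the same time.
    static int maxConcurrentOnNext(int items, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE); // unbounded demand
                }
                @Override public void onNext(Integer item) {
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(sleepMillis); // blocking work, as in the question
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    inFlight.decrementAndGet();
                }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= items; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the buffered items are delivered

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent onNext calls: " + maxConcurrentOnNext(5, 50));
    }
}
```

Because the signals are serialized per subscriber, the extra pool threads do not shorten the total time. This is the same reason publishOn alone does not help in the question, and why the work has to be split into rails (or into inner publishers) first.
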

0
On

Reference: https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

You can start those blocking calls in the background and get multithreading that way. This is concurrency, not parallelism. Use the parallel scheduler for CPU-intensive tasks, and an elastic scheduler for I/O or other blocking operations.

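public void whenIsANewThreadCreated() {
    Flux.range(1,10)
            .subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
            .flatMap(count -> {
                // Logged on the outer thread, before the blocking call is handed off
                log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                // fromCallable wraps the blocking call; the inner subscribeOn runs it on its own worker
                return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
            })
            .blockLast();
}


// Plain blocking method: it returns the value itself, because fromCallable already wraps the result in a Mono
Integer method5(int count) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return count;
}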

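and you get something like this (all ten items are logged within about 50 ms on boundedElastic-1, because the sleeps themselves run concurrently on separate boundedElastic workers):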

 23:42:33.289 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10