I have two tests that use sleep() to simulate an API call that takes time to process, and that check whether Mono.just() makes it non-blocking.
In my first test, I emit a String directly and block inside map().
    @Test
    public void testMono() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Mono.just("Hello World")
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())
                    .subscribe(System.out::println);
            System.out.println("Loop: " + i + " " + "after MONO");
        }
        sleep(10000);
    }
The result is non-blocking, as expected: all of the System.out.println("Loop: " + i + " " + "after MONO"); outputs show up together at the same time.
However, in the second test, I replaced the emitted element "Hello World" with a call to a blocking getString() method. The purpose of putting sleep() in getString() is to simulate a scenario where it takes time to get the emitted element. The result is now blocking: the outputs show up one by one, each after the emitted element has been received.
    @Test
    public void testMono2() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            Mono.just(getString())
                    .map(s -> {
                        System.out.println("Thread:" + Thread.currentThread().getName() + " " + s);
                        try {
                            sleep(2000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return s.length();
                    })
                    .map(integer -> {
                        try {
                            sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        return integer.toString();
                    })
                    .subscribeOn(Schedulers.parallel())
                    .subscribe(System.out::println);
            System.out.println("Loop: " + i + " " + "after MONO");
        }
        sleep(10000);
    }

    public String getString() {
        try {
            sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello world";
    }
In the third test, the behavior is also non-blocking.
    int count = 0;

    @Test
    public void testFluxSink() throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            System.out.println("Start Loop: " + i);
            int curLoop = count++;
            Flux.create(fluxSink -> {
                try {
                    System.out.println("Loop: " + curLoop + " " + " start sleeping");
                    sleep(5000);
                    System.out.println("Loop: " + curLoop + " " + " end sleeping");
                    fluxSink.next(onNext());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).subscribeOn(Schedulers.parallel()).subscribe(s -> {
                System.out.println(Thread.currentThread());
                System.out.println("Loop: " + curLoop + " " + "completed receiving" + " " + s);
            });
        }
        sleep(100000);
    }

    public String onNext() throws InterruptedException {
        sleep(5000);
        return "onNext";
    }
I am wondering whether I misunderstand the concept of Reactor, and would like to know what I did wrong in the second test.
First of all, it is crucial to understand that Reactor does not perform any magical conversion of blocking calls to non-blocking ones. To make an end-to-end non-blocking application, you need to use non-blocking drivers, NIO, etc. If you wrap your blocking code in a Mono or Flux, the thread will still be blocked.

As for your second example, with sleep in the getString method, there are two points to pay attention to:

The first one: you use Mono.just. The important thing is that it is usually used to provide a value that is already known. This means that the getString calculation happens not within the reactor chain, but on the main thread during the chain's assembly phase. That's why you see "sequential" behavior. If you replace it with Mono.fromCallable, the calculation will happen inside the reactor chain (on the parallel scheduler threads), and you will see the same behavior as in the first and third examples.

The second one: it's important to note that wrapping your getString method in Mono.fromCallable does not make your code non-blocking. Your sleep still halts the threads of the parallel scheduler. In your production code, your getString method will likely be some database call or other service call, which should be done via a non-blocking driver or an NIO-based library like Netty. To emulate that, use delayElement instead of sleep; it works in a non-blocking way.
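The first point comes down to ordinary Java argument evaluation, so it can be reproduced without Reactor at all. The sketch below is an analogy using only JDK classes, not Reactor code: CompletableFuture.completedFuture(slowCall()) loosely plays the role of Mono.just(getString()), and CompletableFuture.supplyAsync(...) loosely plays the role of Mono.fromCallable(...) combined with subscribeOn. The class name EagerVsDeferred, the method slowCall, and the 200 ms sleep are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EagerVsDeferred {

    // Hypothetical stand-in for the question's getString():
    // blocks for a while, then reports which thread it ran on.
    static String slowCall() {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Eager: Java evaluates the argument first, so slowCall() runs on the
    // calling thread before the future even exists.
    static CompletableFuture<String> eager() {
        return CompletableFuture.completedFuture(slowCall());
    }

    // Deferred: only a method reference is handed over, so slowCall() runs
    // later on a pool thread. The sleep still blocks that pool thread.
    static CompletableFuture<String> deferred(ExecutorService pool) {
        return CompletableFuture.supplyAsync(EagerVsDeferred::slowCall, pool);
    }

    static long elapsedMs(long startNanos) {
        return (System.nanoTime() - startNanos) / 1_000_000;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            CompletableFuture<String> eager = eager();
            System.out.println("eager built after " + elapsedMs(start)
                    + " ms, slowCall ran on " + eager.join());

            start = System.nanoTime();
            CompletableFuture<String> deferred = deferred(pool);
            System.out.println("deferred built after " + elapsedMs(start) + " ms");
            System.out.println("slowCall ran on " + deferred.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the second test from the question, the corresponding change would be to replace Mono.just(getString()) with Mono.fromCallable(() -> getString()). As the second point says, this only moves the blocking sleep onto another thread; it does not remove it.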