I'm new to Spring Reactor and WebFlux and a little confused regarding the event flow within Spring functional web.
Example: I have a handler function returning a Mono<ServerResponse>
. Within it, a findAll()
repository method is executed returning a Flux<T>
. In compliance to the reactive manifesto, in order to be async, non-blocking and allow backpressure I would like to see an onNext()
for every element returned from the repository. However, looking at the server logs during request processing I see only one onNext()
event, which makes sense as my return type is a Mono
containing the response:
Router Function
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
Handler Function
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
Events log
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
In contrast, implementing a classic Spring annotated controller method with a Flux<T>
as return type, I'll see an onNext()
for every instance of T
(i.e. every item of the result set), which looks more "correct" to me (client now has control over event flow etc.):
Controller
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
Log
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
This is confusing. Let me elaborate:
- Using
Mono<ServerResponse>
seems evil in the sense that it encapsulates the entire result set in a single event, which for me feels like breaking the reactive priciples of asynchronous, non-blocking, backpressure-enabled event flow. Doesn't this take the control away from the client? This looks like traditional, blocking, client/server communications to me. - Returning the
Flux<T>
directly feels a lot nicer because it enables per-result event handling and backpressure control.
My questions are:
- What are the implications of creating a
Mono<ServerResponse>
? Will this cause a blocking, synchrous interaction, emitting theonNext()
only when all items have been read from the repo? Will I lose backpressure functionality etc.? - How can I get the functional style backend to send an
onNext()
for every item in the result set? - What would be the best practice in terms of the return type of a functional style handler function that is fully reactive, i.e. non-blocking, asynchronous, and backpressure compatible? I'm unsure whether
Mono<ServerResponse>
doesn't break these reactive principles.
I might be completely wrong or missing something important. Thanks for your help!
It all depends on the client consuming the
ServerResponse
. According to the WebFlux docs (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html#spring-webflux) setting up handler functions to returnMono<ServerResponse>
regardless of the number of returned items is the standard way and absolutely fine - as long as the client correctly handles the underlyingFlux<T>
all is well. My problem arose because I tested the the endpoints usingcurl
, which isn't able to detect the underlyingFlux
. Using a functional-style enabled client (likeorg.springframework.web.reactive.function.client.WebClient
), theMono<ServerResponse>
can be de-serialized into aFlux<T>
first, enabling all the nice reactive functionality, and making ouronNext()
events show up.Client code
Calling the backend like so, de-serializing the ServerResponse into a Flux:
Will result in seeing all
onNext()
events, enabling client-side event handling:So all is well and fully reactive as long as proper client handling of the response takes place.