Reactive Feign with Webflux

I'm using the Reactive Feign implementation from here (https://github.com/Playtika/feign-reactive), but I have the same problem even when I use Spring WebClient.

When I call a URL with a POST request, passing a JSON body, the stream never completes. The thread stays frozen waiting for it to finish, and that never happens.

fun post(request: PostRequest): Mono<PostResponse> {   
    val response = myClient.post(request)
        .onErrorResume {
            log.error("Error sending request", it)
            Mono.error<PostResponse>(it)
        }
        .doOnSuccess {
            log.info("Success with response $it")
            it
        }
    response.block() 
    return response
}

When I call subscribe(), nothing happens and the request stays frozen. When I call the block() method, it throws the following exception:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

Running with debug logging enabled, I got this log:

2021-01-06 11:57:42.047 DEBUG 13624 --- [ctor-http-nio-4] r.client.log.DefaultReactiveLogger       : [MyClient#post]--->POST http://localhost:8081/post HTTP/1.1
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Channel acquired, now 1 active connections and 0 inactive connections
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [ReadTimeoutHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, ReadTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Added decoder [WriteTimeoutHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, ReadTimeoutHandler, WriteTimeoutHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-01-06 11:57:42.048 DEBUG 13624 --- [ctor-http-nio-4] r.netty.http.client.HttpClientConnect    : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] Handler is being applied: {uri=http://localhost:8081/post, method=POST}
2021-01-06 11:57:42.049 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] onStateChange(POST{uri=/post, connection=PooledConnection{channel=[id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80]}}, [request_prepared])
2021-01-06 11:57:42.074 DEBUG 13624 --- [ctor-http-nio-4] r.n.resources.PooledConnectionProvider   : [id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80] onStateChange(POST{uri=/post, connection=PooledConnection{channel=[id: 0x77f83c34, L:/10.7.8.88:62005 - R:localhost/127.0.0.1:80]}}, [request_sent])

Is there a way to make this request?

There is 1 best solution below

Replace the .block() call with .map(), like this:

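fun post(request: PostRequest): Mono<PostResponse> =
    myClient.post(request)
        .onErrorResume {
            // Log the failure and pass the same error downstream
            log.error("Error sending request", it)
            Mono.error<PostResponse>(it)
        }
        .map {
            log.info("Success with response $it")
            // Return the response itself so the result stays Mono<PostResponse>
            it
        }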

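.map() is a synchronous operator: it transforms the value when it arrives and returns a new Mono, without blocking the thread. It does not subscribe by itself. On a non-blocking pipeline, subscribe() (or block()) should not be called from the server-side code; return the Mono and let the caller subscribe. In a WebFlux application, the framework subscribes to the Mono returned by the handler. With .map() the pipeline is only assembled, and it won't run until something subscribes to it.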
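
To see that a pipeline built with .map() does nothing until it is subscribed, here is a minimal sketch using only reactor-core. The names fakePost, calls and this simplified PostResponse are hypothetical stand-ins for myClient.post(request) and the real response type; they are not part of Reactive Feign or of the code in the question.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the response type used in the question.
data class PostResponse(val id: Int)

// Counts how many times the simulated remote call has actually run.
val calls = AtomicInteger(0)

// Hypothetical stand-in for myClient.post(request).
// Mono.fromCallable is lazy: the lambda runs only when something subscribes.
fun fakePost(): Mono<PostResponse> =
    Mono.fromCallable { PostResponse(calls.incrementAndGet()) }

// Same shape as the answer: assemble the pipeline and return it, no block().
fun post(): Mono<PostResponse> =
    fakePost()
        .map {
            println("Success with response $it")
            it
        }

fun main() {
    val pipeline = post()
    // Only assembled so far, so the simulated call has not run.
    println("calls after assembling: ${calls.get()}")

    // block() is acceptable here because main is not a Reactor event-loop
    // thread; in a WebFlux handler you would return the Mono instead.
    val result = pipeline.block()
    println("calls after subscribing: ${calls.get()}, result: $result")
}
```

In a WebFlux controller the last step is not needed: returning the Mono from the handler method is enough, because the framework subscribes to it when it writes the HTTP response.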