RSocket and Spring do not handle multiple requests

I am experimenting with RSocket together with Spring Boot. I want to build a simple request-response example. As a starting point I took the code from this link:

https://www.baeldung.com/spring-boot-rsocket#request-response

Source code:

https://github.com/eugenp/tutorials/tree/master/spring-5-webflux/src/main/java/com/baeldung/spring/rsocket

When I run the example code unchanged, the request fails with an exception. This error is not the point of this question; I only show it to explain how my code differs from the original Baeldung source.

[reactor-tcp-nio-1] org.springframework.core.log.CompositeLog: [5927a44d-9] 500 Server Error for HTTP GET "/current/pko" io.rsocket.exceptions.ApplicationErrorException: No handler for destination '' at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ Handler com.baeldung.spring.rsocket.client.MarketDataRestController#current(String) [DispatcherHandler] |_ checkpoint ⇢ HTTP GET "/current/pko" [ExceptionHandlingWebHandler] Stack trace: at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76) at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:706) at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:640) at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:670) at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:205) at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:112) at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:260) at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:366) at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:834)

So I changed the client code from

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        return RSocketFactory.connect()
                             .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
                             .frameDecoder(PayloadDecoder.ZERO_COPY)
                             .transport(TcpClientTransport.create(7000))
                             .start()
                             .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

to

@Configuration
public class ClientConfiguration {
    

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

This small change helps: the exception no longer occurs. The other issue, which is the point of this question, is that requests from the client (requester) are processed one by one by the server (responder). I created a SoapUI REST project and ran the GET request in 2 threads. It looks like the server uses a single thread. This is not what I expected.

To make it easy, I will show the whole solution.

Server :

Simple controller

@Controller
public class MarketDataRSocketController {

    Logger logger = LoggerFactory.getLogger(MarketDataRSocketController.class);

    private final MarketDataRepository marketDataRepository;

    public MarketDataRSocketController(MarketDataRepository marketDataRepository) {
        this.marketDataRepository = marketDataRepository;
    }

    @MessageMapping("currentMarketData")
    public Mono<MarketData> currentMarketData(MarketDataRequest marketDataRequest) {
        logger.info("Getting data for: "+marketDataRequest);
        Mono<MarketData> result = marketDataRepository.getOne(marketDataRequest.getStock());
        logger.info("Controller thread move forward: "+marketDataRequest);
        return result;
    }

    @MessageExceptionHandler
    public Mono<MarketData> handleException(Exception e) {
        return Mono.just(MarketData.fromException(e));
    }
}

In the repository I added Thread.sleep(10000); just to simulate a long-running operation.

@Component
public class MarketDataRepository {
    Logger logger = LoggerFactory.getLogger(MarketDataRepository.class);
    private static final int BOUND = 100;
    private Random random = new Random();

    public Mono<MarketData> getOne(String stock) {
        // return Mono.just(getMarketDataResponse(stock)); // original code from Baeldung
        return Mono.just(stock).map(s -> getMarketDataResponse(s));
    }

    private MarketData getMarketDataResponse(String stock) {
        logger.info("Repository thread go speel ZzzZZ");
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
        logger.info("Repository thread move forward");
        return new MarketData(stock, random.nextInt(BOUND));
    }
}

Client

Simple client configuration:

@Configuration
public class ClientConfiguration {


    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.builder()
                .rsocketStrategies(rSocketStrategies)
                .connectTcp("localhost", 7000)
                .block();
    }
}

And a simple REST controller that I call from SoapUI:

@RestController
public class MarketDataRestController {
    Logger logger = LoggerFactory.getLogger(MarketDataRestController.class);

    private final Random random = new Random();
    private final RSocketRequester rSocketRequester;

    public MarketDataRestController(RSocketRequester rSocketRequester) {
        this.rSocketRequester = rSocketRequester;
    }

    @GetMapping(value = "/current/{stock}")
    public Publisher<MarketData> current(@PathVariable("stock") String stock) {
        logger.info("Get REST call for stock : "+stock);
        return rSocketRequester.route("currentMarketData")
                               .data(new MarketDataRequest(stock))
                               .retrieveMono(MarketData.class);
    }
}

When I run the server and the client, I get behavior that I cannot explain. From SoapUI I send the same request in 2 threads.

In the client log I get:

2021-09-01 11:30:14,614 INFO [reactor-http-nio-2] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko
2021-09-01 11:30:14,691 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.client.MarketDataRestController: Get REST call for stock : pko

On the server I get logs like this:

Log for the first request:

// Request received from the client
2021-09-01 11:30:14,843 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
// The controller thread moves on after calling the repository
2021-09-01 11:30:14,844 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
// The repository puts the thread to sleep
2021-09-01 11:30:14,862 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
// The repository finishes its work
2021-09-01 11:30:24,863 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

The server processes only a single call and just waits until the repository finishes its job. Then it processes the next request in the same way:

2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Getting data for: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRSocketController: Controller thread move forward: MarketDataRequest(stock=pko)
2021-09-01 11:30:24,874 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread go speel ZzzZZ
2021-09-01 11:30:34,876 INFO [reactor-http-nio-3] com.baeldung.spring.rsocket.server.MarketDataRepository: Repository thread move forward

I don't understand why the server processes calls one by one. Maybe there is a problem in the code, or maybe I am misunderstanding something. Thank you in advance.

Best answer

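By default, Reactor does not switch threads: a pipeline runs on whichever thread subscribes to it, which here is the Netty event-loop thread that serves the RSocket connection. Calling Thread.sleep blocks that thread, so the other requests arriving on the same connection wait until it returns. If you would like to simulate a long-running operation without blocking, you could use the delayElement operator on the Mono (delayElements is the Flux equivalent):

.delayElement(Duration.ofSeconds(10));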
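
To see why a timer-based delay behaves differently from Thread.sleep, here is a minimal JDK-only sketch. It is an analogy, not Reactor code: the class NonBlockingDelayDemo and its methods are hypothetical names, and a single-threaded executor merely stands in for the event-loop thread. The wait is handed to CompletableFuture.delayedExecutor, so the "event loop" is free to accept the second request while the first one is still pending.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; plain JDK (Java 9+), not Reactor.
class NonBlockingDelayDemo {

    // Handles two requests on one "event loop" thread and returns the recorded events in order.
    static List<String> run(long delayMillis) {
        List<String> events = new CopyOnWriteArrayList<>();
        // A single thread stands in for the event loop serving one connection.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // Tasks given to this executor are submitted to the default pool only after the delay.
        Executor timer = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        try {
            CompletableFuture<Void> first = handle("request-1", eventLoop, timer, events);
            CompletableFuture<Void> second = handle("request-2", eventLoop, timer, events);
            CompletableFuture.allOf(first, second).join();
        } finally {
            eventLoop.shutdown();
        }
        return events;
    }

    // Accepts the request on the event loop, then completes it later without sleeping there.
    static CompletableFuture<Void> handle(String name, Executor eventLoop, Executor timer,
                                          List<String> events) {
        return CompletableFuture
                .runAsync(() -> events.add("accepted " + name), eventLoop)
                .thenRunAsync(() -> events.add("completed " + name), timer);
    }

    public static void main(String[] args) {
        run(1000).forEach(System.out::println);
    }
}
```

With a delay of a second or so, both "accepted" events should be recorded before either "completed" event, because nothing occupies the single thread while the delay runs.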

Note: Reactor BlockHound detects and reports such blocking calls.
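
If the long-running call really has to block (for example a legacy driver), the usual remedy is to move it off the event-loop thread. The sketch below illustrates that idea with plain java.util.concurrent rather than Reactor. The class OffloadBlockingDemo, the thread names, and the second stock symbol "abc" are assumptions made for illustration. Requests arrive on one "event-loop" thread, and the blocking lookup runs on a small worker pool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; plain JDK (Java 9+), not Reactor.
class OffloadBlockingDemo {

    // Stand-in for a blocking repository call; reports which thread ran it.
    static String blockingLookup(String stock, long sleepMillis) {
        try {
            Thread.sleep(sleepMillis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // keep the interrupt flag set
        }
        return stock + " handled on " + Thread.currentThread().getName();
    }

    // Sends two requests through the event loop and returns both results.
    static List<String> run(long sleepMillis) {
        AtomicInteger workerId = new AtomicInteger();
        ExecutorService eventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService workers =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "worker-" + workerId.incrementAndGet()));
        try {
            CompletableFuture<String> first = handle("pko", sleepMillis, eventLoop, workers);
            CompletableFuture<String> second = handle("abc", sleepMillis, eventLoop, workers);
            return List.of(first.join(), second.join());
        } finally {
            eventLoop.shutdown();
            workers.shutdown();
        }
    }

    // The request is accepted on the event loop; the blocking part runs on the worker pool.
    static CompletableFuture<String> handle(String stock, long sleepMillis,
                                            ExecutorService eventLoop, ExecutorService workers) {
        return CompletableFuture
                .supplyAsync(() -> stock, eventLoop)
                .thenApplyAsync(s -> blockingLookup(s, sleepMillis), workers);
    }

    public static void main(String[] args) {
        run(1000).forEach(System.out::println);
    }
}
```

In Reactor itself, the corresponding move is typically to wrap the blocking call in Mono.fromCallable(...) and add subscribeOn(Schedulers.boundedElastic()), so that the sleep happens on a scheduler meant for blocking work instead of the event loop.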