RSocket : RejectedResumeException (0x4): unknown resume token

359 Views Asked by At

I am using rsocket starter in Spring Boot (2.5.3). I have a requester and responder set up and its working quite well (for all kind of rsocket communication). The issue is when I am trying to implement resumability option on the requester side.

Here is the code for requester

@Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        Resume resume = new Resume()
                .sessionDuration(Duration.ofMinutes(15))
                .streamTimeout(Duration.ofMinutes(15))
                .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                        .doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
                        .doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
                );

        return builder
                .dataMimeType(MimeType.valueOf("application/json"))
                .rsocketConnector(rSocketConnector -> {
                    rSocketConnector.reconnect(Retry.fixedDelay(10, Duration.ofSeconds(20)).doBeforeRetry(s -> {
                        System.out.println("Reconnect : Disconnected. Trying to reconnect..."+s);
                    }));
                    rSocketConnector.resume(resume);
                })
                .websocket(URI.create("ws://localhost:9190/api/workflow/realtime"));
    }

On the responder side:

@Bean
    RSocketServerCustomizer rSocketResume() {
        Resume resume = new Resume()
                .sessionDuration(Duration.ofMinutes(15))
                .streamTimeout(Duration.ofMinutes(15))
                .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                        .doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
                        .doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
                );
        return rSocketServer -> rSocketServer.resume(resume );
    }

But when responder goes down and comes back alive, I get below error on requester console:

**[----> Responder dead at this point]**
Resume: Before Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}

**[<---- Responder back to live at this point]**

2021-08-08 04:48:39.404 ERROR 24748 --- [actor-tcp-nio-5] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
    at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:271) ~[rsocket-core-1.1.1.jar:na]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:330) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) ~[reactor-core-3.4.8.jar:3.4.8]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:114) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.1.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:706) ~[reactor-netty-http-1.0.9.jar:1.0.9]
    at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:159) ~[reactor-netty-http-1.0.9.jar:1.0.9]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at java.lang.Thread.run(Unknown Source) ~[na:1.8.0_261]

2021-08-08 04:48:39.407 ERROR 24748 --- [actor-tcp-nio-5] reactor.Flux.Map.1                       : onError(RejectedResumeException (0x4): unknown resume token)

Can anyone suggest why this behavior? Also I want to understand what is this token on resume?

Thanks.

0

There are 0 best solutions below