SpringBoot RSockets Kotlin - Missing 'rsocketResponse'

213 Views Asked by At

I'm trying to make example chat application and I'm getting strange error when using fire-and-forget from my ReactJs rsocket-websocket-client. The only thing I found GitHub - Fix RSocket Fire and forget handling with Kotlin Could there be a problem with client config?

Error message:

java.lang.IllegalArgumentException: Missing 'rsocketResponse'
    at org.springframework.util.Assert.notNull(Assert.java:201) ~[spring-core-5.3.6.jar:5.3.6]
    at org.springframework.messaging.rsocket.annotation.support.RSocketPayloadReturnValueHandler.handleEncodedContent(RSocketPayloadReturnValueHandler.java:64) ~[spring-messaging-5.3.6.jar:5.3.6]
    at org.springframework.messaging.handler.invocation.reactive.AbstractEncoderMethodReturnValueHandler.lambda$handleReturnValue$0(AbstractEncoderMethodReturnValueHandler.java:121) ~[spring-messaging-5.3.6.jar:5.3.6]
    at org.springframework.messaging.handler.invocation.reactive.ChannelSendOperator$WriteBarrier.onComplete(ChannelSendOperator.java:251) ~[spring-messaging-5.3.6.jar:5.3.6]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:133) ~[reactor-core-3.4.5.jar:3.4.5]
    at kotlinx.coroutines.reactor.MonoCoroutine.onCompleted(Mono.kt:66) ~[kotlinx-coroutines-reactor-1.4.3.jar:na]
    at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:104) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:294) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:856) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:828) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:111) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.debug.internal.DebugProbesImpl$CoroutineOwner.resumeWith(DebugProbesImpl.kt:461) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46) ~[kotlin-stdlib-1.4.32.jar:1.4.32-release-371 (1.4.32)]
    at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.EventLoop.processUnconfinedEvent(EventLoop.common.kt:69) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:244) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:369) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:403) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:395) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:300) ~[kotlinx-coroutines-core-jvm-1.4.3.jar:na]
    at kotlinx.coroutines.reactive.AwaitKt$awaitOne$$inlined$suspendCancellableCoroutine$lambda$1.onComplete(Await.kt:173) ~[kotlinx-coroutines-reactive-1.4.3.jar:na]
    at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:280) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:540) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators.complete(Operators.java:136) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:397) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoTakeLastOne$TakeLastOneSubscriber.onComplete(MonoTakeLastOne.java:119) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:405) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:540) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:690) ~[reactor-pool-0.2.4.jar:0.2.4]
    at reactor.core.publisher.Operators.complete(Operators.java:136) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:802) ~[reactor-pool-0.2.4.jar:0.2.4]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:259) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators.complete(Operators.java:136) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:201) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:83) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:259) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:397) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:229) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.5.jar:3.4.5]
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:99) ~[r2dbc-postgresql-0.8.7.RELEASE.jar:0.8.7.RELEASE]
    at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:439) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:784) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:732) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:240) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:197) ~[reactor-core-3.4.5.jar:3.4.5]
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:719) ~[r2dbc-postgresql-0.8.7.RELEASE.jar:0.8.7.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:984) ~[r2dbc-postgresql-0.8.7.RELEASE.jar:0.8.7.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860) ~[r2dbc-postgresql-0.8.7.RELEASE.jar:0.8.7.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767) ~[r2dbc-postgresql-0.8.7.RELEASE.jar:0.8.7.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.5.jar:3.4.5]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.6.jar:1.0.6]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.6.jar:1.0.6]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.6.jar:1.0.6]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.6.jar:1.0.6]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.63.Final.jar:4.1.63.Final]
    at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]

RSocket Controller (post function causes the error):

@Controller
class MessageController(private val _chatService: IChatService) {


    @MessageMapping("send")
    suspend fun post(@Payload p: String) {
        _chatService.post(MessageDTO(null, p))
    }

    @MessageMapping("messages")
    suspend fun messageStream() =
        _chatService.stream().onStart {
        emitAll(_chatService.latest())
    }
}
0

There are 0 best solutions below