Server not receiving data in RSocket Fire and Forget case


I have the following RSocket server

@Log4j2
public class NativeRsocketServerFnF {
  public static void main(String[] args) {
    RSocketFactory.receive()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .acceptor((setup, clientHandleRsocket) -> {
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                  CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                  payload.release();
                  log.info("> from client: {}", message);
                  return Mono.empty();
                }
              }
          );
        })
        .transport(TcpServerTransport.create(8000))
        .start()
        .block()
        .onClose()
        .block();
  }
}

and the following RSocket client

@Log4j2
public class NativeRsocketClientFnF {
  public static void main(String[] args) {
    RSocketFactory.connect()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .transport(TcpClientTransport.create(8000))
        .start()
        .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
        .block();
  }
}

As you can see, I am trying to send "ping" as payload data from the client to the server.

When I start the server and then start the client for the first time, I see > from client: ping

If I run the client again, I don't see any messages on the server. The breakpoint on the server is not even hit.

My understanding is that Fire and Forget simply sends the data and does not wait to see whether the server processed it successfully. In my case, however, the server does not receive the data at all on subsequent runs of the client (each run is effectively a new client).

Is there something I am missing?

I am using version 1.0.0-RC5 of rsocket-core & rsocket-transport-netty

OS: Ubuntu 16.04

Best answer

Your understanding of the Fire and Forget pattern is correct.

The problem is that io.rsocket.RSocketRequester#fireAndForget completes successfully before the data has been sent to the network. When you block on it and then exit your client application, the connection closes, possibly before the frame has been written.

See io.rsocket.RSocketRequester#handleFireAndForget

    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();

                sendProcessor.onNext(requestFrame);
              }
            });

sendProcessor is an intermediate Flux processor. The actual connection consumes frames from it, so sending the request frame to the network may be delayed.

For testing purposes, you could add .then(Mono.delay(Duration.ofMillis(100))) before the .block() call on the client side. I think most real applications won't exit immediately after calling fireAndForget.
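To make the ordering issue concrete, here is a minimal sketch in plain Java that models the behaviour described above. It is not RSocket code: ModelConnection, flush() and the other names are hypothetical stand-ins, with a simple queue playing the role of sendProcessor and an explicit flush() standing in for the connection getting time to write (which is what the delay buys you in the real client).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model, NOT the RSocket API: "enqueue now, write later".
final class FireAndForgetModel {

    static final class ModelConnection {
        // Plays the role of sendProcessor: frames wait here until written.
        private final Queue<String> sendQueue = new ArrayDeque<>();
        private final List<String> receivedByServer = new ArrayList<>();
        private boolean closed = false;

        // Returns as soon as the frame is queued; nothing is on the wire yet.
        void fireAndForget(String data) {
            if (!closed) {
                sendQueue.add(data);
            }
        }

        // One turn of the simulated I/O loop: write queued frames out.
        void flush() {
            while (!closed && !sendQueue.isEmpty()) {
                receivedByServer.add(sendQueue.poll());
            }
        }

        // Closing drops whatever has not been written, as a process exit would.
        void close() {
            closed = true;
            sendQueue.clear();
        }

        List<String> receivedByServer() {
            return receivedByServer;
        }
    }

    // Client exits right after fireAndForget returns.
    static List<String> exitImmediately() {
        ModelConnection connection = new ModelConnection();
        connection.fireAndForget("ping");
        connection.close();
        return connection.receivedByServer();
    }

    // Client lets the connection write before exiting.
    static List<String> waitThenExit() {
        ModelConnection connection = new ModelConnection();
        connection.fireAndForget("ping");
        connection.flush(); // stands in for the time bought by the delay
        connection.close();
        return connection.receivedByServer();
    }

    public static void main(String[] args) {
        System.out.println("exit immediately: " + exitImmediately());
        System.out.println("wait, then exit:  " + waitThenExit());
    }
}
```

In this model the first scenario delivers nothing and the second delivers "ping". In the real client the outcome of exiting immediately depends on timing, which would explain why a message can sometimes get through and sometimes not.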