How to call @MessageMapping endpoint with RSocketRequester?

71 Views Asked by At

Hi everyone, I am trying to call a @MessageMapping endpoint using RSocketRequester. The controller class that contains the endpoint I want to call is:

/**
 * RSocket responder-side controller.
 *
 * <p>Exposes two connection-setup routes ({@code "handshake"} and
 * {@code "genericConnection"}) and one request route ({@code "test"}).
 *
 * <p>NOTE(review): {@code instanceConfig}, {@code itrsUpdater},
 * {@code validateConnection(..)} and {@code setDisconnectionHandler(..)} are used
 * below but are not declared in this excerpt; they are assumed to be provided
 * elsewhere in the class.
 */
@RestController
public class SomeController {
    private static final Logger LOG = LoggerFactory.getLogger(SomeController.class);

    private TronCache<String, RSocketConsumer> consumersMapWarpper;
    // NOTE(review): never assigned in the visible code. If it is still null when a
    // handshake reply arrives, the callback in connect() fails with a
    // NullPointerException - confirm it is initialised elsewhere.
    private Map<String, RSocketConsumer> consumersMap;
    private Map<String, CacheMetaInfo> cacheMetaInfoMap = new HashMap<>();
    private static final String GET_METADATA = "getMetadata";

    public SomeController() {
    }

    /** Logs that the controller has been constructed and post-processed. */
    @PostConstruct
    public void init() {
        LOG.info("init in SomeController");
    }

    /**
     * Handles a connection whose setup frame is routed to {@code "handshake"}.
     *
     * <p>Validates the setup payload, then sends a {@code "handshake"} request back
     * to the connecting client. When the client answers, the consumer is registered
     * under its caller name (an existing entry is kept), a disconnection handler is
     * installed and the new consumer is published.
     *
     * @param requester requester bound to the connecting client
     * @param payload   setup payload sent by the client
     */
    @ConnectMapping({"handshake"})
    public void connect(RSocketRequester requester, @Payload RefDataRequest payload) {
        LOG.info("Client Connection Handshake: [{}]", payload.getCallerName());
        this.validateConnection(payload);
        requester.route("handshake")
                .data(RefDataResponse.HANDSHAKE.callerName(this.instanceConfig.getPodName()))
                .retrieveMono(ConsumerNotification.class)
                .subscribe(
                        c -> {
                            LOG.info("SOD Status[{}] Consumer[{}]", c.isSodCompleted(), c.getCallerName());
                            RSocketConsumer consumer = new RSocketConsumer(payload.getCallerName(), c, requester);
                            this.consumersMap.putIfAbsent(payload.getCallerName(), consumer);
                            this.setDisconnectionHandler(consumer);
                            this.itrsUpdater.publishAddConsumer(payload.getCallerName());
                        },
                        // Without an error consumer a failed handshake (or an exception
                        // thrown by the callback above) is dropped and never logged.
                        error -> LOG.error("Handshake with consumer [{}] failed",
                                payload.getCallerName(), error));
    }

    /**
     * Handles a connection whose setup frame is routed to {@code "genericConnection"}.
     * Only logs and validates the setup payload; no consumer is registered.
     *
     * @param requester requester bound to the connecting client (unused here)
     * @param payload   setup payload sent by the client
     */
    @ConnectMapping({"genericConnection"})
    public void connectForProperty(RSocketRequester requester, @Payload RefDataRequest payload) {
        LOG.info("Client Connection : [{}] [{}] ", "Generic [One time] Connection", payload.getCallerName());
        this.validateConnection(payload);
    }

    /**
     * Request-response handler for the {@code "test"} route.
     *
     * @param s request payload (ignored)
     * @return a {@link Mono} that always emits {@code "string"}
     */
    @MessageMapping({"test"})
    public Mono<String> getString(String s) {
        LOG.info("test in RefDataController");
        return Mono.just("string");
    }

}

The class in another application that calls the endpoint is:

public class RSocketConnector {

    private static final Logger LOG = LoggerFactory.getLogger(RSocketRefDataConnector.class);

    private RSocketRequester requester;
    private boolean initialized;
    private final String host;
    private final int port;

    private final String podName;
    private final String environment;
    private final String setupRoute;

 
    private final String encoders;

    private final String decoders;


    public String getHost() {
        return host;
    }


    /**
     * @param host
     * @param port
     * @param podName
     * @param environment
     * @param encoders
     * @param decoders
     */
    private RSocketRefDataConnector(String host,
                                    int port,
                                    String podName,
                                    String environment,
                                    String setupRoute,
                                    String encoders,
                                    String decoders) {
        this.host = host;
        this.port = port;
        this.podName = podName;
        this,setupRoute = setupRoute;
        this.environment = environment;
        this.encoders=StringUtils.defaultIfBlank(encoders,Jackson2CborEncoder.class.getName());
        this.decoders=StringUtils.defaultIfBlank(decoders,Jackson2CborDecoder.class.getName());
    }

  
    public void initialize() {

        RSocketStrategies strategies;
        try {
            strategies = new RSocketConfig().rsocketCustomEncoderDecoder(encoders,decoders);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

        ClientTransport transport = this.createTransport(host, port);

        requester = RSocketRequester.builder()
                .rsocketConnector(connector -> {
                    connector.reconnect(getRetrySpec())
                            .acceptor(RSocketMessageHandler.responder(strategies, this))
                            .payloadDecoder(PayloadDecoder.ZERO_COPY);
                })
                .dataMimeType(MediaType.APPLICATION_CBOR)
                .setupRoute(this.setupRoute)
                .setupData(RefDataRequest.SIGNAL.handShake(podName, environment, TronRuntimeConfig.hostInfo()))
                .rsocketStrategies(strategies)
                .transport(transport);

        if (!isOneTimeConnection) {
            setDisconnectionHandler();
            //Heartbeat check
            Flux.interval(Duration.ofMinutes(1))
                    .doOnNext(l -> heartbeatCheck())
                    .subscribe();

            initialized = true;
        }
        //calling endpoint
         requester.route("test").data("hello")
                .retrieveMono(String.class)
                .doOnNext(o->LOG.info("inbound message"))
                .onErrorContinue((e,t)->LOG.info("contine on erro"))
                .doOnSuccess(r -> LOG.info("message from test doOnSuccess is " + r))
                .subscribe();
    }

      /**
     * @param host
     * @param port
     * @return
     */
    private ClientTransport createTransport(String host, int port) {
        if (isProtocolWebsocket()) {
            LOG.info("SSL : {}", this.ssl);
            if (this.ssl) {
                LOG.info("Initializing refdata websocket secure connection [{}]", String.format(WSS_URL_PATTERN, host, port));
                return WebsocketClientTransport.create(
                        HttpClient.create()
                                .baseUrl(String.format(WSS_URL_PATTERN, host, port))
                                .secure((s) -> s.sslContext(buildSslContext()))
                        , WS_PATH);
            } else {
                LOG.info("Initializing refdata websocket connection [{}]", String.format(WS_URL_PATTERN, host, port));
                return WebsocketClientTransport.create(URI.create(String.format(WS_URL_PATTERN, host, port)));
            }
        } else {
            LOG.info("Initializing refdata TCP connection [{}:{}]", host, port);
            return TcpClientTransport.create(host, port);
        }
    }

    /**
     * @return
     */
    private SslContext buildSslContext() {

        try {
            KeyManager[] keyManagers = SSLUtils.getKeyManagers(ResourceUtils.getURL(trustStore).openStream(), trustStoreSafeNet, trustStoreSafeNet);
            TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();//SSLUtils.getTrustManagers(ResourceUtils.getURL(trustStore).openStream(), trustStoreSafeNet);

            return SslContextBuilder.forClient()
                    .clientAuth(ClientAuth.REQUIRE)
                    .keyManager(keyManagers[0])
                    .trustManager(trustManagers[0])
                    .build();

        } catch (Exception e) {
            LOG.error("Exception occurred while building SSL Context [{}]", ExceptionUtils.getStackTrace(e));
        }
        return null;
    }

    /**
     *
     */
    public void heartbeatCheck() {
        requester.route(ROUTE_HEARTBEAT)
                .data(heartbeatRequest)
                .retrieveMono(RefDataResponse.class)
                .subscribe(r -> connectionFailed.set(false));
    }

    /**
     * @return
     */
    private Retry getRetrySpec() {
        if (retryLimit == 0;
            return Retry
                    .fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(retryInterval))
                    .scheduler(Schedulers.single())
                    //.doBeforeRetry(e -> connectionFailed.set(true))
                    .doAfterRetry(e -> {
                        LOG.warn("doAfterRetry===>{}", e);
                        this.connectionFailed.set(true);
                    });
        else
            return Retry
                    .fixedDelay(retryLimit, Duration.ofSeconds(retryInterval))
                    .scheduler(Schedulers.single())
                    //.doBeforeRetry(e -> connectionFailed.set(true))
                    .doAfterRetry(e -> {
                        LOG.warn("doAfterRetry===>{}", e);
                        this.connectionFailed.set(true);
                    })
                    .onRetryExhaustedThrow((rb, rs) -> new RefDataException("Unable to establish connection with RefData server. # of retry attempted [" + retryLimit + "]"));
    }

}

I expect to receive "string" after calling the "test" endpoint. Instead, the connection is closed without returning anything.

0

There are 0 best solutions below