I have two rsocket-client and one rsocket-server. Both rsocket-client initially subscribe to the broadcast route listenCommand to receive commands from rsocket-server. Then one of the clients sends a command on route executeCommand to send it to other rsocketRequsters besides itself via the previously established route listenCommand connection. That is, I need to send the message to all rsocketRequester excluding myself through the already established connection between client and server. Now I get the error "reactor.core.Execptions$ErrorCallbackNotImplemented: ApplicationErrorExecption (0x201): Requset-Stream not implemented" and the second client does not receive a command from the listenCommand subscription. I've seen that it is suggested to make a client-side handler, but is it possible to solve this problem on the server? Maybe I'm missing something important?

Expected result: the client that sends the command will not receive it over the broadcast connection, but the other client will.

Rscoket-server controller:

private Sinks.Many<String> executedCommandSink = Sinks.many().multicast()
    .directBestEffort();

@MessageMapping("project.{projectId}.command")
public Flux<String> listenCommand(@DestinationVariable String projectId, RSocketRequester requester) {

    return executedCommandSink.asFlux();
}

@MessageMapping("project.{projectId}.command.execute")
public Mono<?> executeCommand(
        @DestinationVariable String projectId, Mono<String> commandMono, RSocketRequester requester) {

    Set<RSocketRequester> requesters = getRequestersForSendCommand(projectId, requester);

    return commandMono.flatMap(command -> {
        sendGraphCommand(requesters, command);
        return Mono.just(command);
    });
}

private void sendCommand(Set<RSocketRequester> requesters, String command) {
    requesters.forEach(requesterToSend -> requesterToSend
        .route("project.{projectId}.command", command)
        .data(Flux.just(command))
        .retrieveFlux(String.class)
        .subscribe());
}

public Set<RSocketRequester> getRequestersForSendGraphCommand(String projectId, RSocketRequester requester) {
    Set<RSocketRequester> sessionsByProject = ConcurrentHashMap.newKeySet();
    allRequesters.forEachEntry(1L, entry -> {
        if (entry.getValue().equals(projectId) && entry.getKey() != requester) {
            sessionsByProject.add(entry.getKey());
        }
    });
    return sessionsByProject;
}

Rsocket-client:

@Autowired
    public RSocketManagerClient(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies) {

        clientUUID = UUID.randomUUID().toString();
        log.info("Connecting using client ID: {}", clientUUID);

        this.rsocketRequester = rsocketRequesterBuilder
                .setupData(clientUUID)
                .rsocketStrategies(strategies)
    .connectWebSocket(URI.create("ws://127.0.0.1:8307/rsocket"))
                .subscribeOn(Schedulers.parallel())
                .block();

        this.rsocketRequester.rsocket()
                .onClose()
                .doFirst(() -> log.info("Client: {} CONNECTED.", clientUUID))
                .doOnError(error -> log.error("Connection to client {} CLOSED", clientUUID))
                .doFinally(consumer -> log.info("Client {} DISCONNECTED", clientUUID))
                .subscribe();
    
public void executeCommand() {
    log.info("\nClient with id-{} subscribe on execute command", clientUUID);

    String block = this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command.execute")
            .data("Hello")
            .retrieveMono(String.class)
            .doOnNext(System.out::println)
            .block();
    log.info("Response: {}", block);
}

public void subscribeOnCommand() {
    log.info("\nClient with id-{} subscribe on command", clientUUID);

    this.rsocketRequester
            .route("project.db28981b-9ad4-4867-bc2d-11b4522f865c.command")
            .retrieveFlux(String.class)
            .doOnNext(System.out::println)
            .subscribe();
}

0

There are 0 best solutions below