Bridging messages from eventbus to subscribed websocket clients using VertX


I'm trying to use websockets to create a publish-subscribe model that can be reached through both SockJS and plain websocket (for Javascript clients on one side, and for Java clients on the other).

Currently I'm able to subscribe using SockJS and Javascript clients. However, when trying to use websocket to receive messages in my Java client, I am able to connect to the websocket server, but don't receive any messages from the server.

My current client code:

public class WebsocketReceiverIntegrationTest {
    public static void main(String[] args) {
        System.out.println("Started demo WS client");
        HttpClient client = Vertx.vertx().createHttpClient();

        System.out.println("Will now connect to WS server");
        client.websocket(8091, "localhost", "/", webSocket -> {
            webSocket.handler(data -> {
                System.out.print("Received data " + data.toString("ISO-8859-1"));
            });
        });

        System.out.println("Ready to receive data");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext() && ! scanner.next().equals("q")) {
        }

        System.out.println("Stopped");
        client.close();
    }
}

Relevant bit in my current server code:

@Override
public void start(Future<Void> fut) {
    Router router = Router.router(vertx);

    router.route("/eventbus/*").handler(sockJSEventBusHandler());

    router.route().failureHandler(errorHandler());

    vertx.createHttpServer().websocketHandler(serverWebSocket -> {
        System.out.print("Websocket client connected!");
        vertx.eventBus().consumer(serverWebSocket.textHandlerID());
    }).listen(8091);

    vertx.setPeriodic(1000, id -> {
        vertx.eventBus().send("gameupdates", Json.encodePrettily(tickGameState()));
    });
}

private SockJSHandler sockJSEventBusHandler() {
    BridgeOptions options = new BridgeOptions().addOutboundPermitted(new PermittedOptions().setAddress(GAMEUPDATE_CHANNEL_NAME));
    return SockJSHandler.create(vertx).bridge(options, event -> {
        if (event.type() == BridgeEventType.SOCKET_CREATED) {
            logger.info("A socket was created");
        }
        event.complete(true);
    });
}

There are 2 best solutions below

There are two ways you can handle the request:

  1. Use a Router to route the incoming request and reply with a response.
     public void start(Future<Void> fut) {
         Router router = Router.router(vertx);

         BridgeOptions opts = new BridgeOptions()
             .addInboundPermitted(new PermittedOptions().setAddressRegex(".*"))
             .addOutboundPermitted(new PermittedOptions().setAddressRegex(".*"));

         SockJSHandler ebHandler = SockJSHandler.create(vertx).bridge(opts);
         router.route("/eventbus/*").handler(ebHandler);
         router.route("/eventbus/").handler(this::getApi);
         ...
         router.route().handler(StaticHandler.create());
     }

     private void getApi(RoutingContext routingContext) {
         routingContext.response()
             .setStatusCode(201)
             .putHeader("content-type", "application/json; charset=utf-8")
             .end("ok");
     }

You can implement GET, POST, PUT and DELETE using the Router; refer to this doc for more info: REST API with Vert.x Web

  2. Another way is to create a tunnel on the event bus when deploying the verticle.
     private EventBus eb;

     public void start(Future<Void> fut) {
         // vertx is only set once the verticle has been initialised,
         // so the event bus is looked up here rather than in a field initialiser.
         eb = vertx.eventBus();

         Router router = Router.router(vertx);

         BridgeOptions opts = new BridgeOptions()
             .addInboundPermitted(new PermittedOptions().setAddressRegex(".*"))
             .addOutboundPermitted(new PermittedOptions().setAddressRegex(".*"));

         SockJSHandler ebHandler = SockJSHandler.create(vertx).bridge(opts);
         router.route("/eventbus/*").handler(ebHandler);
         router.route("/eventbus/").handler(this::getApi);
         ...
         router.route().handler(StaticHandler.create());

         createListenerTunnel(addressToServer, addressToClient);
     }

     private void createListenerTunnel(String addressToServer, String addressToClient) {
         // Listen on the address clients send to, then republish each message with a timestamp.
         eb.consumer(addressToServer).handler(message -> {
             String timestamp = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM)
                 .format(Date.from(Instant.now()));

             eb.publish("local-message-receiver", timestamp + ":" + message.body());
         });
     }

Here eb is the EventBus of the Vert.x instance in which the verticle is deployed.

You can refer to this example.

Note that the SockJS protocol supports connections from regular websocket clients. For example, if you have the SockJSHandler registered at /api/realtime, then you should be able to connect to /api/realtime/websocket with your websocket client.
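
As a rough illustration of that point, the sketch below connects a plain Java websocket client to the bridge's raw websocket endpoint and registers for an address. It is not the asker's Vert.x client: it uses the JDK's java.net.http.WebSocket (Java 11+) so that it has no external dependencies. The port 8091, the /eventbus mount point, and the assumption that GAMEUPDATE_CHANNEL_NAME equals "gameupdates" are guesses based on the question's snippets, and the helper names registerFrame and rawWebSocketUri are made up for this example. The register frame follows the JSON format that, as far as I know, the Vert.x event bus bridge expects from its clients.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

public class RawWebSocketBridgeClient {

    // Builds the text frame asking the event bus bridge to forward
    // messages for the given address to this socket.
    public static String registerFrame(String address) {
        return "{\"type\":\"register\",\"address\":\"" + address + "\"}";
    }

    // Appends "/websocket" to the SockJS mount point to get the raw websocket endpoint.
    public static URI rawWebSocketUri(String host, int port, String mountPoint) {
        String base = mountPoint.endsWith("/")
                ? mountPoint.substring(0, mountPoint.length() - 1)
                : mountPoint;
        return URI.create("ws://" + host + ":" + port + base + "/websocket");
    }

    public static void main(String[] args) throws Exception {
        WebSocket.Listener listener = new WebSocket.Listener() {
            private final StringBuilder buffer = new StringBuilder();

            @Override
            public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
                buffer.append(data);          // a message may arrive in several parts
                if (last) {
                    System.out.println("Received " + buffer);
                    buffer.setLength(0);
                }
                ws.request(1);                // ask for the next message
                return null;
            }
        };

        // Assumed endpoint: SockJS handler mounted at /eventbus on localhost:8091.
        URI uri = rawWebSocketUri("localhost", 8091, "/eventbus");
        WebSocket ws;
        try {
            ws = HttpClient.newHttpClient()
                    .newWebSocketBuilder()
                    .buildAsync(uri, listener)
                    .join();
        } catch (Exception e) {
            System.out.println("Could not connect to " + uri + ": " + e);
            return;
        }

        // Assumed address: the one the server publishes game updates on.
        ws.sendText(registerFrame("gameupdates"), true).join();

        Thread.sleep(10_000);                 // keep the JVM alive to receive a few messages
        ws.sendClose(WebSocket.NORMAL_CLOSURE, "done").join();
    }
}
```

Connecting alone is not enough: the bridge only forwards messages for addresses the client has registered, and only if the address is permitted outbound in BridgeOptions. Forwarded messages should arrive as JSON text frames with the payload in a body field. The bridge also expects periodic {"type":"ping"} frames, so a long-lived client probably needs to send those as well.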