Vertx event bus reply handler not getting invoked for a specific call

1.1k Views Asked by At

I'm running a local instance of vertx. Router redirects my requests to a worker verticle which has the following handler:

protected Handler<Message<JsonObject>> handler1() {
    return msg -> {
        final RequestBody req = mapper.readValue(msg.body().encode(), RequestBody.class);
        processRequest(req, msg);
    }
}

The processRequest function accepts the request body, makes calls to two external services, aggregates the responses, and returns back to the client.

private processRequest(RequestBody req, Message<JsonObject> msg) {
    CompletableFuture<String> service1Response = getService1ResponseAsync(req); // Make async call to service 1
    String service2Response = getService2ResponseSync(req); // Make sync call to service 2

    ResponseBody response = aggregateResult(service1Response.join(), service2Response);  // Tag1
    msg.reply(mapper.writeValueAsString(response));
}

private CompletableFuture<String> getService1ResponseAsync(RequestBody req) {
    CompletableFuture<String> result = new CompletableFuture();
    // Below handler call makes GET call to service 1 using apache HTTP client and returns the response
    vertx.eventBus().request("serviceVerticleAddr1", mapper.writeValueAsString(req), new DeliveryOptions(), reply -> { // Tag2
        if (reply.succeeded())
            result.complete(reply.result().body().toString());
        else
            result.completeExceptionally(result.cause());
    }
}

When I hit the above API, my request times out. The thread from worker pool assigned for the execution of my request gets blocked at Tag1 forever. On debugging a little further, I found that the reply handler for the call in Tag2 is not getting invoked.

The handlers in service verticle (serviceVerticleAddr1) [i.e. Tag2] returns proper response for other APIs making use of it, but for me it's getting blocked. Can someone please help me identify the cause? Is there some kind of deadlock being formed when the thread calling vertx.eventBus().request [Tag2] starts to wait for future completion at service1Response.join() [Tag1]?

1

There are 1 best solutions below

0
On

I think is blocked because of that sender is not being notified by the consumer that message has been processed. I would recommend you to check inside the handler block registered for the consumer of serviceVerticleAddr1 address and ensure that is replying (notifying) to the sender that the requested message has successfully handled (or not). Consumer might look like

vertx.eventBus().consumer("serviceVerticleAddr1", message -> {
  try{
     doSomething();
     message.reply("done");
  } catch(){
     message.fail(0, "fails");
  }
});

This way, sender's async handler would be notified that consumer could process requested message