Uni wait for Vertx eventBus message


I have two endpoints:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<Object> waitForEvent() {
        return Uni.createFrom().emitter(em -> {
            //wait for event from eventBus
    //            eventBus.consumer("test", msg -> {
    //                System.out.printf("receive event: %s\n", msg.body());
    //                em.complete(msg);
    //            });
        }).ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    
    @GET
    @Path("/send")
    public void test() {
        System.out.println("send event");
        eventBus.send("test", "send test event");
    }

waitForEvent() should complete only when it receives the event from the event bus. How can I achieve this using Vert.x and Mutiny?

Best answer:

In general, we avoid that kind of pattern and use the request/reply mechanism from the event bus:

    @GET
    @Path("/send")
    public Uni<String> test() {
        // Request/reply: a consumer registered on "test" must reply to the message.
        return bus.<String>request("test", "send test event")
            .onItem().transform(Message::body)
            .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }

With two endpoints, as in the question, it gets a bit more complicated: if there are several concurrent calls to the /waitForEvent endpoint, you need to make sure that every "consumer" gets the message.

It is still possible, but you would need something like this:

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    @Path("/waitForEvent")
    public Uni<String> waitForEvent() {
        // The <String> type witness is needed because the call is chained.
        return Uni.createFrom().<String>emitter(emitter -> {
            MessageConsumer<String> consumer = bus.consumer("test");
            consumer.handler(m -> {
                // Complete the Uni with the first message, then stop listening.
                emitter.complete(m.body());
                consumer.unregisterAndForget();
            });
        })
        .ifNoItem().after(Duration.ofSeconds(5)).failWith(new RuntimeException("timeout"));
    }
    
    @GET
    @Path("/send")
    public void test() {
        // publish (not send) so that every registered consumer receives the message.
        bus.publish("test", "send test event");
    }

Be sure to use the io.vertx.mutiny.core.eventbus.EventBus variant of the event bus.
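
To see why the second variant uses publish and a one-shot consumer, here is a minimal plain-JDK model of the pattern that runs without Vert.x. It is only a sketch under stated assumptions: ToyBus, waitForEvent, publish and pendingWaiters are hypothetical names, CompletableFuture stands in for Uni, and none of this is the Vert.x or Mutiny API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

/** Plain-JDK model of the one-shot consumer pattern; not the Vert.x API. */
public class ToyBus {
    // Futures of callers currently waiting for the next event.
    private final List<CompletableFuture<String>> waiters = new CopyOnWriteArrayList<>();

    /** Registers a one-shot waiter; it fails with TimeoutException if no event arrives in time. */
    public CompletableFuture<String> waitForEvent(long timeoutMillis) {
        CompletableFuture<String> waiter = new CompletableFuture<>();
        waiters.add(waiter);
        // Remove the waiter once it completes or times out (the "unregister" step).
        waiter.whenComplete((body, failure) -> waiters.remove(waiter));
        return waiter.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Delivers the body to every current waiter and returns how many received it. */
    public int publish(String body) {
        int delivered = 0;
        for (CompletableFuture<String> waiter : waiters) {
            if (waiter.complete(body)) {
                delivered++;
            }
        }
        return delivered;
    }

    /** Number of waiters that are still registered. */
    public int pendingWaiters() {
        return waiters.size();
    }
}
```

In this model, two pending waitForEvent calls both complete on a single publish, which is the behaviour the endpoint version relies on. With a point-to-point send, only one of the registered consumers would receive the message.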