How to run the Kafka consumer and router handlers in parallel in a Vertx Java application?


I have a Java application that uses the Vertx framework and Kafka.

There is a Kafka consumer with a handler containing my custom code and a Vertx router that simply returns an "OK" to the requests.

While messages are being consumed from the topic, the router sometimes does not return "OK" immediately; the request stays pending for several seconds (even minutes) before the response arrives. This happens specifically when there are many messages on the topic (e.g., 30K messages). Note that each individual execution of the consumer handler takes less than one second.

How can I consume the messages without blocking the Vertx route so that I can immediately receive the response from the route while the Kafka consumer is running?

It is important that the consumer executions remain sequential with respect to each other (i.e., an execution must not start before the previous one has finished). I only want the consumer execution and the router execution to be able to run in parallel.
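To make the requirement concrete, here is a minimal sketch in plain JDK code, not Vertx code: records are handed to a single dedicated thread, so they are processed strictly one after another, while the calling thread stays free to answer a health check. The class `SequentialConsumerSketch` and the methods `handleRecord`, `healthcheck`, and `run` are hypothetical stand-ins for my real handler and route. In Vertx I assume the closest equivalents would be deploying the consumer as a worker verticle (`DeploymentOptions.setWorker(true)`) or wrapping the handler body in an ordered `executeBlocking` call, but I have not verified either against this application.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SequentialConsumerSketch {

    // Hypothetical stand-in for the database-backed record handler.
    static void handleRecord(int record, List<Integer> processed) {
        try {
            Thread.sleep(1); // simulate a short blocking call
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        processed.add(record);
    }

    // Hypothetical stand-in for the /healthcheck route.
    static String healthcheck() {
        return "OK";
    }

    // Queues recordCount records on one dedicated thread and returns them
    // in the order in which they were actually processed.
    static List<Integer> run(int recordCount) {
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        // A single-thread executor runs its tasks one at a time, in submission order.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        for (int i = 0; i < recordCount; i++) {
            final int record = i;
            consumerThread.submit(() -> handleRecord(record, processed));
        }
        // The calling thread is not blocked by the backlog, so this returns at once.
        healthcheck();
        consumerThread.shutdown();
        try {
            consumerThread.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Integer> processed = run(100);
        System.out.println(healthcheck() + ", processed " + processed.size() + " records in order");
    }
}
```

The single-thread executor is what keeps the consumer executions sequential; the parallelism is only between that thread and the thread answering requests.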

Here is the Vertx router:

  private Router createRouter() {
    final Router router = Router.router(vertx);
    router.get("/healthcheck").handler(rc -> rc.response().end("OK"));
    return router;
  }

Here is the Kafka consumer:

  private void createKafkaConsumer() {
    KafkaConfiguration kafkaConfig = new KafkaConfiguration(config());
    consumer = KafkaConsumer.create(vertx, kafkaConfig.kafkaConsumerConfig());
    consumer.exceptionHandler(event -> log.error(event.getMessage()));

    consumer.handler(record -> {
      try {

        // Here is my consumer function that interacts with a database

      } catch (Exception e) {
        log.warn("Something is wrong: ", e);
      }
      // Commit the offset once the record has been handled
      consumer.commit();
    });
  }

EDIT:

Consumer Class:

public class ConsumerVerticle extends CustomAbstractVerticle {
  @Override
  public void start(Promise<Void> promise) {
    createKafkaConsumer(); // function above
  }
}

Web Service class:

public class WebServiceVerticle extends CustomAbstractVerticle {
    @Override
    public void start(Promise<Void> promise) {
      createAnHttpServer(vertx, createRouter(), config(), promise); // createRouter() is the function above
    }
}

Main Class:

public class MainVerticle extends CustomVerticle {

  private Future<Void> deployAllVerticles(Vertx vertx, JsonObject config, Class[] classes) {
    List<Future> futureList = new ArrayList<>();
    for (Class verticleClass : classes) {
      futureList.add(deployVerticle(vertx, verticleClass));
    }
    // Completes once every verticle is deployed; fails if any deployment fails
    return CompositeFuture.all(futureList).mapEmpty();
  }

  private Future<String> deployVerticle(Vertx vertx, Class clazz) {
    return Future.future(promise -> vertx.deployVerticle(clazz.getName(), promise));
  }

  @Override
  public void start() {

    vertx = Vertx.vertx(new VertxOptions()
           .setMaxEventLoopExecuteTime(TimeUnit.SECONDS.toNanos(5)));
    // config() is assumed to be available from the base class, as in the other verticles
    deployAllVerticles(vertx, config(), new Class[] {ConsumerVerticle.class, WebServiceVerticle.class});
  }
}