Vert.x "Cannot assign requested address" errors caused by high throughput

I'm stress testing a Vert.x application at ~10K RPS. The application sends asynchronous HTTP requests from a dedicated verticle using the Vert.x HTTP client. For roughly the first 20 seconds the requests are sent successfully. After that, I start getting a lot of "Cannot assign requested address" errors.

I tried deploying more verticles and setting different values for the HTTP client thread pool, but nothing solved the issue. I suspect it is related to the high throughput over a short period (around 1 minute).
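
As a rough way to reason about that guess: on Linux, `connect()` reports "Cannot assign requested address" (EADDRNOTAVAIL) when no free ephemeral source port is left for a new outbound connection. The sketch below is only back-of-the-envelope arithmetic, not a diagnosis of this application. It assumes the default Linux ephemeral port range (32768 to 60999), a 60-second TIME_WAIT hold on closed connections, and a single destination address and port. The class and method names are hypothetical.

```java
public class EphemeralPortBudget {

    // Number of usable source ports in an inclusive range.
    public static int portsInRange(int low, int high) {
        return high - low + 1;
    }

    // Seconds until every port is taken, assuming each new connection
    // consumes one port and none is released during that window.
    public static double secondsUntilExhaustion(int ports, double newConnectionsPerSecond) {
        return ports / newConnectionsPerSecond;
    }

    // Highest steady rate of NEW connections to one destination if every
    // closed connection keeps its port busy for holdSeconds (e.g. TIME_WAIT).
    public static double sustainableNewConnectionsPerSecond(int ports, double holdSeconds) {
        return ports / holdSeconds;
    }

    public static void main(String[] args) {
        // Assumption: default net.ipv4.ip_local_port_range on Linux.
        int ports = portsInRange(32768, 60999);

        // Assumption: worst case, every request opens a fresh connection.
        double worstCaseSeconds = secondsUntilExhaustion(ports, 10_000.0);

        // Assumption: 60-second TIME_WAIT on the client side.
        double sustainableRate = sustainableNewConnectionsPerSecond(ports, 60.0);

        System.out.println("Ephemeral ports available: " + ports);
        System.out.println("Seconds to exhaustion at 10K new conn/s: " + worstCaseSeconds);
        System.out.println("Sustainable new conn/s with 60s hold: " + sustainableRate);
    }
}
```

If these assumptions hold, any workload that opens new connections faster than the sustainable rate will eventually run out of ports, however many verticles or threads are deployed. Reusing pooled keep-alive connections lowers the rate of new connections, while adding workers does not.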

Main Class:

public static void main(String[] args) {
    final VertxOptions vertxOptions = new VertxOptions()
            .setMaxEventLoopExecuteTime(1)
            .setMaxEventLoopExecuteTimeUnit(TimeUnit.MILLISECONDS);

    final Vertx vertx = Vertx.vertx(vertxOptions);

    final Injector injector = Guice.createInjector(new Dependencies(vertx));

    CustomCodecRegister.register(vertx.eventBus());

    final Stream<Future<String>> deploymentFutures = Stream.of(
            deployWorker(vertx, injector, StatsHttpVerticle.class, 10)
    ).flatMap(stream -> stream);

    CompositeFuture.all(deploymentFutures.collect(Collectors.toList()))
            .onSuccess(successfulCompositeFuture -> { });
}

private static <T> Stream<Future<String>> deployWorker(Vertx vertx, Injector injector, Class<T> workerVerticleClass, int numVerticles) {
    final String poolName = workerVerticleClass.getSimpleName()
            .toLowerCase()
            .replace("verticle", "-worker-pool");
    final int numberOfThreads = 50;

    final DeploymentOptions options = new DeploymentOptions()
            .setWorker(true)
            .setWorkerPoolName(poolName)
            .setWorkerPoolSize(numberOfThreads);

    return IntStream.range(0, numVerticles)
            .mapToObj(ignore -> Future.future((Promise<String> promise) ->
                    vertx.deployVerticle((Verticle) injector.getInstance(workerVerticleClass), options, promise)));
}

EventBusAdapter:

  public void send(Map<String, Object> queryParams, HashMap<String, String> headers, boolean followRedirect, Event eventToFire) {
    StatsRequest statsRequest = new StatsRequest(queryParams, headers, eventToFire, followRedirect);
    eventBus.request(FIRE_GET_METRIC_TO_STATS, statsRequest);
  }

WorkerVerticle:

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    vertx.eventBus().consumer(FIRE_GET_METRIC_TO_STATS, this::fire);
    startPromise.complete();
  }

  private void fire(Message<StatsRequest> message) {
    StatsRequest body = message.body();
    MultiMap multimapHeader = MultiMap.caseInsensitiveMultiMap();

    WebClientOptions webClientOptions = new WebClientOptions();
    webClientOptions.setMaxPoolSize(1000);
    WebClient httpClient = WebClient.create(vertx, webClientOptions);

    httpClient.request(HttpMethod.GET, port, "example.com", "/1x1.gif" + "?" + "queryParamsString")
            .followRedirects(false)
            .putHeaders(multimapHeader)
            .timeout(120000)
            .send()
            .onSuccess(response -> {
              logger.info("All good");
            })
            .onFailure(err -> {
              logger.error("Exception: " + err.getMessage());
            });
  }

How can I solve this issue?
