How to configure rate limiting on a Kafka topic while publishing messages?


We are using Kafka to process events. Two situations produce bursts of extra events:

  1. Sometimes we get a lot of bot traffic.
  2. Some of our customers frequently generate much more traffic than others.

This delays the processing of other customers' events.

So I want to add rate limiting per customer. As far as I can tell from my research, there is no way to apply rate limiting while publishing messages to a topic.

Our application is built on the Micronaut framework.

Is there a way to add limits at the consumer level?

Are there any Java frameworks on top of Kafka that implement rate limiting?


There are 3 best solutions below

4
On

You are asking about rate limiting, but what you are really trying to achieve is prioritized processing of data, aren't you? This is not easy, because it depends on your flow, but I'd suggest considering a couple of things:

  1. What is the bottleneck? Is it a consumer? Where is the back pressure coming from?
  2. Do you produce everything to the same topic? Maybe you want to split the traffic across multiple topics.
  3. I've worked on a somewhat similar product and used Flink, which lets you configure parallelism per source (or route). That provides QoS for the main data flow: if the lower-priority flow gets a spike of incoming data, the back pressure slows only its own processing and does not affect the higher-priority flow.
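
Point 2 above (splitting traffic across topics) could be sketched roughly as follows. This is only an illustration under assumptions: the class name `TopicRouter`, the topic names `events` and `events-bulk`, and the idea of keeping a set of known noisy customer IDs are all hypothetical and not from the original post. The producer would call `topicFor(customerId)` to pick the topic before sending, and the bulk topic would get its own consumer group so a spike there does not delay everyone else.

```java
import java.util.Set;

/** Hypothetical router: picks a topic per customer so noisy traffic is isolated. */
final class TopicRouter {
    // Assumed topic names, chosen only for this example.
    static final String DEFAULT_TOPIC = "events";
    static final String BULK_TOPIC = "events-bulk";

    private final Set<String> noisyCustomers;

    TopicRouter(Set<String> noisyCustomers) {
        // Defensive, immutable copy of the IDs treated as high volume (bots, heavy customers).
        this.noisyCustomers = Set.copyOf(noisyCustomers);
    }

    /** Returns the bulk topic for known high-volume customers, otherwise the default topic. */
    String topicFor(String customerId) {
        return noisyCustomers.contains(customerId) ? BULK_TOPIC : DEFAULT_TOPIC;
    }
}
```

How the set of noisy customers is maintained (static config, or fed by a rate limiter that flags customers exceeding a threshold) is left open here.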
0
On

Request throttling can be controlled with quotas on the broker. The clients honor these internally, so no code changes are needed (other than adding authentication to identify client groups).

0
On

Per-customer limits would be challenging to implement with Kafka quotas. Instead, you might want to check out resilience4j-micronaut and do the limiting on the producer side.

You can use a RateLimiterRegistry to create and retrieve RateLimiter instances:

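import java.time.Duration;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.ratelimiter.RateLimiterRegistry;

// 10 permits per 1 ms refresh period; a caller waits up to 25 ms for a permit
RateLimiterConfig config = RateLimiterConfig.custom()
  .limitRefreshPeriod(Duration.ofMillis(1))
  .limitForPeriod(10)
  .timeoutDuration(Duration.ofMillis(25))
  .build();

// Create registry
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);

// Use registry
RateLimiter rateLimiterWithDefaultConfig = rateLimiterRegistry
  .rateLimiter("name1");

RateLimiter rateLimiterWithCustomConfig = rateLimiterRegistry
  .rateLimiter("name2", config);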

You can decorate any Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer or CompletionStage with a RateLimiter:

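// Decorate your call to BackendService.doSomething().
// rateLimiter is a RateLimiter instance, e.g. one obtained from the registry above.
// Assumes Resilience4j 1.x, where CheckedRunnable and Try both come from Vavr.
CheckedRunnable restrictedCall = RateLimiter
    .decorateCheckedRunnable(rateLimiter, backendService::doSomething);

// A call that cannot get a permit in time fails with RequestNotPermitted
Try.run(restrictedCall)
    .andThenTry(restrictedCall)
    .onFailure(throwable -> LOG.info("Wait before calling it again :)"));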

For per-customer or per-user throttling, you would need an instance of RateLimiter per entity to be limited:

LimiterManager limiterManager = new LimiterManager();

String customerNameUnique = "Acme123"; // Get from the current client request
RateLimiter rateLimiter = limiterManager.getLimiter(customerNameUnique);

Runnable runnable = RateLimiter.decorateRunnable(rateLimiter, () -> {
    // TODO: Your code here, publishing events to the Kafka topic
});

// If no permit becomes available within the timeout, the call fails with RequestNotPermitted
Try.runRunnable(runnable).onFailure(
        error -> System.out.println(error)
);

// Use a LimiterManager utility class to create / retrieve per-customer instances of RateLimiter.
// It is declared static because it is meant to be nested inside another class.
public static class LimiterManager {

    private final ConcurrentMap<String, RateLimiter> keyRateLimiters = new ConcurrentHashMap<>();

    private final RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
            .timeoutDuration(Duration.ofMillis(100)) // Wait up to 100 ms for a permit
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(25) // Max 25 calls per 1 second
            .build();

    public RateLimiter getLimiter(String entity) {
        // computeIfAbsent atomically creates the limiter the first time an entity is seen
        return keyRateLimiters.computeIfAbsent(entity, key -> RateLimiter.of(key, rateLimiterConfig));
    }
}
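
If pulling in a library is not an option, the same per-customer idea as the LimiterManager above can be sketched with the JDK alone. This is a simple fixed-window counter, not the algorithm Resilience4j uses, and the class name `PerCustomerLimiter` and its parameters are hypothetical. The caller passes the current time explicitly, which keeps the sketch easy to test; in real code that would typically be `System.currentTimeMillis()`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical fixed-window limiter: at most `limit` permits per customer per window. */
final class PerCustomerLimiter {
    private static final class Window {
        long start; // start time of the current window, in milliseconds
        int used;   // permits handed out in the current window
    }

    private final int limit;
    private final long windowMillis;
    private final ConcurrentMap<String, Window> windows = new ConcurrentHashMap<>();

    PerCustomerLimiter(int limit, long windowMillis) {
        this.limit = limit;
        this.windowMillis = windowMillis;
    }

    /** Returns true if the customer may publish now; never blocks. */
    boolean tryAcquire(String customer, long nowMillis) {
        boolean[] granted = {false};
        // compute() runs atomically per key, so the window is only mutated by one thread at a time
        windows.compute(customer, (key, w) -> {
            if (w == null || nowMillis - w.start >= windowMillis) {
                w = new Window(); // first request, or the previous window has expired
                w.start = nowMillis;
            }
            if (w.used < limit) {
                w.used++;
                granted[0] = true;
            }
            return w;
        });
        return granted[0];
    }
}
```

When `tryAcquire` returns false, the producer can decide what to do with the event: drop it, delay it, or publish it to a separate lower-priority topic. Note that a fixed window allows short bursts at window boundaries, and the map grows with the number of distinct customers, so a production version would need eviction.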