I have created a token-bucket rate limiter in Java and tested it with a single thread and with multiple threads using an ExecutorService. With a single thread, the total number of requests consumed is 200 when the rate is set to 20 requests per second. With the same configuration in a multi-threaded run using executors, the total is 220, even though I have used the synchronized keyword. Each run lasts 10 seconds.
The two classes are below:
BucketToken.java
package algo.interviewquestions;

public class BucketToken {
    private Integer maxBucketSize;
    private Integer tokensAvailable;
    private Long refillBucketRate;      // refill interval in milliseconds
    private Long nextRefillTime;        // wall-clock time (ms) of the next allowed refill
    private Long totalRequestsReceived;
    private Long totalRequestsConsumed;
    private Integer bucketRefillCount;

    public BucketToken(Integer maxBucketSize, Long refillBucketRate) {
        this.maxBucketSize = maxBucketSize;
        this.refillBucketRate = refillBucketRate;
        this.nextRefillTime = System.currentTimeMillis() + refillBucketRate;
        this.tokensAvailable = maxBucketSize;
        this.totalRequestsReceived = 0L;
        this.totalRequestsConsumed = 0L;
        this.bucketRefillCount = 0;
        refill();
    }

    public Long getTotalRequestsConsumed() {
        return totalRequestsConsumed;
    }

    public Integer getTokensAvailable() {
        return tokensAvailable;
    }

    public Integer getBucketRefillCount() {
        return bucketRefillCount;
    }

    public Long getTotalRequestsReceived() {
        return totalRequestsReceived;
    }

    // Counts the request, refills if the interval has elapsed, then takes one token if any is left.
    public synchronized boolean tryConsume() {
        totalRequestsReceived++;
        refill();
        if (this.tokensAvailable > 0) {
            this.tokensAvailable--;
            this.totalRequestsConsumed++;
            return true;
        }
        return false;
    }

    // Resets the bucket to full once nextRefillTime has passed; the next refill is scheduled relative to "now".
    private synchronized void refill() {
        if (System.currentTimeMillis() < this.nextRefillTime) {
            return;
        }
        this.nextRefillTime = System.currentTimeMillis() + this.refillBucketRate;
        // tokensAvailable never exceeds maxBucketSize, so this sets the bucket back to full.
        this.tokensAvailable = Math.max(this.maxBucketSize, this.tokensAvailable);
        this.bucketRefillCount++;
    }
}
BucketTokenTest.java
package algo.interviewquestions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BucketTokenTest {

    public static void main(String[] args) throws InterruptedException {
        BucketToken bucketToken = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed1 = runInMultiThreadedApproach(bucketToken);
        BucketToken bucketToken2 = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed2 = runWithSingleThread(bucketToken2);
        printRequestsStats(bucketToken, totalRequestsConsumed1);
        printRequestsStats(bucketToken2, totalRequestsConsumed2);
    }

    // Calls tryConsume() in a tight loop on the calling thread for 10 seconds.
    private static AtomicInteger runWithSingleThread(BucketToken bucketToken) {
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < 10000L) {
            if (bucketToken.tryConsume()) {
                totalRequestsConsumed.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                continue;
            }
            System.out.println(Thread.currentThread().getName() + " - Request Declined");
        }
        return totalRequestsConsumed;
    }

    private static void printRequestsStats(BucketToken bucketToken, AtomicInteger totalRequestsConsumed) {
        System.out.println("Total Requests Received = " + bucketToken.getTotalRequestsReceived());
        // Accepted count as tracked by the test loop.
        System.out.println("Total Requests Accepted = " + totalRequestsConsumed.get());
        // Accepted count as tracked by the bucket itself.
        System.out.println("Total Requests Accepted = " + bucketToken.getTotalRequestsConsumed());
        System.out.println("Bucket refill count = " + bucketToken.getBucketRefillCount());
    }

    // Runs the same 10-second loop on 5 pool threads that share one bucket.
    private static AtomicInteger runInMultiThreadedApproach(BucketToken bucketToken) throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(threadCount);
        IntStream.rangeClosed(1, threadCount).boxed().forEach(t -> executorService.submit(() -> {
            while (System.currentTimeMillis() - startTime < 10000L) {
                if (bucketToken.tryConsume()) {
                    totalRequestsConsumed.incrementAndGet();
                    System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                    continue;
                }
                System.out.println(Thread.currentThread().getName() + " - Request Declined");
            }
            latch.countDown();
        }));
        long startTime2 = System.currentTimeMillis();
        latch.await();
        System.out.println("Completed in " + (System.currentTimeMillis() - startTime2) + "ms");
        executorService.shutdown();
        return totalRequestsConsumed;
    }
}
Can someone tell me what I need to change to handle the concurrent modification, or is there something I am missing?
I expect both approaches, single-threaded and multi-threaded, to give the same consumed-request count over 10 seconds, which is 200.
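To spell out the arithmetic behind that expectation, here is a small sketch. It is an idealisation, not a measurement: it assumes refills land exactly on multiples of the refill interval after the bucket is created, that demand always exceeds supply, and that the test loop may start a little later than the bucket was constructed. The class name RefillWindowMath and the startOffsetMs parameter are hypothetical and not part of the code above. Under those assumptions, a start offset of zero gives 200, while any small positive offset lets one extra refill fall inside the 10-second window and gives 220. I have not confirmed that this is what actually happens in the multi-threaded run.

```java
class RefillWindowMath {

    /**
     * Upper bound on accepted requests for a bucket that starts full and is
     * refilled to capacity at refillMs, 2*refillMs, ... after construction.
     * Assumes startOffsetMs < refillMs (hypothetical idealisation).
     */
    static long maxAccepted(int capacity, long refillMs, long windowMs, long startOffsetMs) {
        // The test loop stops once this instant (relative to bucket creation) is reached.
        long endExclusive = startOffsetMs + windowMs;
        // Number of refill boundaries strictly before the end of the window.
        long refills = (endExclusive - 1) / refillMs;
        // One initial full bucket plus one full bucket per refill.
        return (refills + 1) * capacity;
    }

    public static void main(String[] args) {
        System.out.println(maxAccepted(20, 1000L, 10_000L, 0L));  // prints 200
        System.out.println(maxAccepted(20, 1000L, 10_000L, 50L)); // prints 220
    }
}
```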
I tried using the synchronized keyword on the methods that modify state, but I still see different values.
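One way to separate a locking problem from a timing problem is to take the wall clock out of the test. The sketch below is a simplified stand-in for my bucket, not the code above: ClockedBucket, ClockedBucketDemo and the injected LongSupplier clock are names I made up for illustration. The test advances a fake clock in 100 ms steps from 0 to 9900 and lets every worker attempt 50 requests per step, so the accepted count depends only on the synchronized logic and not on when threads happen to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

class ClockedBucketDemo {

    /** Hypothetical simplified bucket that reads time from an injected clock. */
    static final class ClockedBucket {
        private final int capacity;
        private final long refillMs;
        private final LongSupplier clock;
        private int tokens;
        private long nextRefill;

        ClockedBucket(int capacity, long refillMs, LongSupplier clock) {
            this.capacity = capacity;
            this.refillMs = refillMs;
            this.clock = clock;
            this.tokens = capacity;
            this.nextRefill = clock.getAsLong() + refillMs;
        }

        synchronized boolean tryConsume() {
            long now = clock.getAsLong();
            if (now >= nextRefill) {
                // Refill to full and schedule the next refill relative to now.
                tokens = capacity;
                nextRefill = now + refillMs;
            }
            if (tokens > 0) {
                tokens--;
                return true;
            }
            return false;
        }
    }

    /** Simulates a 10 s window on a fake clock and returns the accepted count. */
    static int run(int threads) {
        AtomicLong fakeNow = new AtomicLong(0L);
        ClockedBucket bucket = new ClockedBucket(20, 1000L, fakeNow::get);
        AtomicInteger accepted = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (long t = 0; t < 10_000L; t += 100) {
                fakeNow.set(t); // advance simulated time
                CountDownLatch done = new CountDownLatch(threads);
                for (int i = 0; i < threads; i++) {
                    pool.submit(() -> {
                        for (int k = 0; k < 50; k++) {
                            if (bucket.tryConsume()) {
                                accepted.incrementAndGet();
                            }
                        }
                        done.countDown();
                    });
                }
                done.await(); // all workers finish before time moves on
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return accepted.get();
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + run(1)); // prints "1 thread:  200"
        System.out.println("5 threads: " + run(5)); // prints "5 threads: 200"
    }
}
```

If a version like this gives the same count for 1 and 5 threads, that would suggest the synchronized blocks are doing their job and the extra 20 requests in my real test come from how the 10-second window lines up with the refill times.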