I have created a token-bucket rate limiter in Java and tested it with both a single thread and multiple threads using an ExecutorService. With the rate set to 20 requests per second, the single-threaded run consumes 200 requests in total. With the same configuration, the multi-threaded run using executors consumes 220 requests in total, even though I have used the synchronized keyword. Each run lasts 10 s.

Please find the two classes below:

BucketToken.java

package algo.interviewquestions;

public class BucketToken {
    private Integer maxBucketSize;
    private Integer tokensAvailable;
    private Long refillBucketRate;
    private Long nextRefillTime;
    private Long totalRequestsReceived;
    private Long totalRequestsConsumed;
    private Integer bucketRefillCount;

    public BucketToken(Integer maxBucketSize, Long refillBucketRate) {
        this.maxBucketSize = maxBucketSize;
        this.refillBucketRate = refillBucketRate;
        this.nextRefillTime = System.currentTimeMillis() + refillBucketRate;
        this.tokensAvailable = maxBucketSize;
        this.totalRequestsReceived = 0L;
        this.totalRequestsConsumed = 0L;
        this.bucketRefillCount = 0;
        refill();
    }

    public Long getTotalRequestsConsumed() {
        return totalRequestsConsumed;
    }

    public Integer getTokensAvailable() {
        return tokensAvailable;
    }

    public Integer getBucketRefillCount() {
        return bucketRefillCount;
    }

    public Long getTotalRequestsReceived() {
        return totalRequestsReceived;
    }

    public synchronized boolean tryConsume() {
        totalRequestsReceived++;
        refill();
        if (this.tokensAvailable > 0) {
            this.tokensAvailable--;
            this.totalRequestsConsumed++;
            return true;
        }
        return false;
    }

    private synchronized void refill() {
        if (System.currentTimeMillis() < this.nextRefillTime) {
            return;
        }
        this.nextRefillTime = System.currentTimeMillis() + this.refillBucketRate;
        // Refill to capacity; tokensAvailable never exceeds maxBucketSize, so this matches the old Math.max call.
        this.tokensAvailable = this.maxBucketSize;
        this.bucketRefillCount++;
    }

}

BucketTokenTest.java

package algo.interviewquestions;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class BucketTokenTest {
    public static void main(String[] args) throws InterruptedException {
        BucketToken bucketToken = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed1 = runInMultiThreadedApproach(bucketToken);

        BucketToken bucketToken2 = new BucketToken(20, 1000L);
        AtomicInteger totalRequestsConsumed2 = runWithSingleThread(bucketToken2);

        printRequestsStats(bucketToken, totalRequestsConsumed1);
        printRequestsStats(bucketToken2, totalRequestsConsumed2);
    }

    private static AtomicInteger runWithSingleThread(BucketToken bucketToken) {
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        while (System.currentTimeMillis() - startTime < 10000L) {
            if (bucketToken.tryConsume()) {
                totalRequestsConsumed.incrementAndGet();
                System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                continue;
            }
            System.out.println(Thread.currentThread().getName() + " - Request Declined");
        }
        return totalRequestsConsumed;
    }

    private static void printRequestsStats(BucketToken bucketToken, AtomicInteger totalRequestsConsumed) {
        System.out.println("Total Requests Received = " + bucketToken.getTotalRequestsReceived());
        System.out.println("Total Requests Accepted (caller counter) = " + totalRequestsConsumed.get());
        System.out.println("Total Requests Accepted (bucket counter) = " + bucketToken.getTotalRequestsConsumed());
        System.out.println("Bucket refill count = " + bucketToken.getBucketRefillCount());
    }

    private static AtomicInteger runInMultiThreadedApproach(BucketToken bucketToken) throws InterruptedException {
        int threadCount = 5;
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        AtomicInteger totalRequestsConsumed = new AtomicInteger(0);
        long startTime = System.currentTimeMillis();
        CountDownLatch latch = new CountDownLatch(threadCount);
        IntStream.rangeClosed(1,threadCount).boxed().forEach(t -> executorService.submit(() -> {
            while (System.currentTimeMillis() - startTime < 10000L) {
                if (bucketToken.tryConsume()) {
                    totalRequestsConsumed.incrementAndGet();
                    System.out.println(Thread.currentThread().getName() + " - Request Accepted");
                    continue;
                }
                System.out.println(Thread.currentThread().getName() + " - Request Declined");
            }
            latch.countDown();
        }));
        long startTime2 = System.currentTimeMillis();
        latch.await();
        System.out.println("Completed in " + (System.currentTimeMillis() - startTime2) + "ms");
        executorService.shutdown();
        return totalRequestsConsumed;
    }
}

Can someone tell me what change I need to make to handle concurrent modification, or whether I am missing something else?

I expect both approaches, single-threaded and multi-threaded, to give the same consumed-request count over 10 s, which is 200.

I tried using the synchronized keyword on the methods that modify state, but I still see different values.
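
One way to check whether the difference comes from timing rather than from a race is to replay the same refill rule against a fake clock. The sketch below is not the code from my runs: FakeClockBucketDemo, ClockedBucket, countAccepted and the headStartMs parameter are hypothetical names, and the clock that issues exactly one request per millisecond is an assumption made to keep the result deterministic. It only suggests, without proving it for the real runs, that the total depends on how many refill boundaries fall inside the 10 s window, which in turn depends on how long before the loop starts the bucket was constructed.

```java
import java.util.function.LongSupplier;

public class FakeClockBucketDemo {

    /** Same refill rule as BucketToken, but time comes from an injected clock (assumption). */
    static final class ClockedBucket {
        private final int maxBucketSize;
        private final long refillIntervalMs;
        private final LongSupplier clock;
        private int tokensAvailable;
        private long nextRefillTime;

        ClockedBucket(int maxBucketSize, long refillIntervalMs, LongSupplier clock) {
            this.maxBucketSize = maxBucketSize;
            this.refillIntervalMs = refillIntervalMs;
            this.clock = clock;
            this.tokensAvailable = maxBucketSize; // the bucket starts full
            this.nextRefillTime = clock.getAsLong() + refillIntervalMs;
        }

        synchronized boolean tryConsume() {
            long now = clock.getAsLong();
            if (now >= nextRefillTime) {
                // Refill to capacity and schedule the next refill one interval later.
                nextRefillTime = now + refillIntervalMs;
                tokensAvailable = maxBucketSize;
            }
            if (tokensAvailable > 0) {
                tokensAvailable--;
                return true;
            }
            return false;
        }
    }

    /**
     * Simulates a 10 s window with one request per fake millisecond.
     * headStartMs is how long before the window opens the bucket was constructed.
     */
    public static int countAccepted(long headStartMs) {
        long[] now = {-headStartMs}; // fake clock, in milliseconds
        ClockedBucket bucket = new ClockedBucket(20, 1000L, () -> now[0]);
        int accepted = 0;
        for (now[0] = 0; now[0] < 10_000L; now[0]++) {
            if (bucket.tryConsume()) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(countAccepted(0L));  // prints 200: initial 20 + 9 refills
        System.out.println(countAccepted(50L)); // prints 220: initial 20 + 10 refills
    }
}
```

In this simulation a single thread produces both totals, so synchronized is not what separates 200 from 220 here. In BucketTokenTest the multi-threaded bucket is constructed before the executor is created and before startTime is taken, which may give it a similar head start.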
