How to wait for all Disruptor messages to be consumed in a test case

I'm trying to unit test an application that publishes and consumes messages using the Disruptor ring buffer. In the test I would like to:

  1. Configure the Disruptor and its EventHandlers
  2. Publish messages onto the ring buffer
  3. Wait for all messages to be consumed by all EventHandlers
  4. Perform some assertions

Is there a simple way that I can wait for the messages to be consumed (step 3 above)?

Answer by lance-java

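I ended up solving this with a polling loop that exits once RingBuffer.remainingCapacity() equals Disruptor.getBufferSize(). At that point every slot in the ring buffer is free again, which means all EventHandlers have processed every published event.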

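import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class MyTest {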
    private Disruptor<MyEvent> disruptor;
  
    @BeforeEach
    public void beforeEach() {
        disruptor = new Disruptor<>(...);
        disruptor.handleEventsWith(...);
        disruptor.start();
    }

    @AfterEach
    public void afterEach() {
        disruptor.shutdown();
    }

    @Test
    public void myTest() throws Exception {
        disruptor.publishEvent(...);
        disruptor.publishEvent(...);

        awaitRingBuffer();
        
        assertThat(...);
    }
 
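    // Blocks until the ring buffer is empty again, i.e. the handlers have
    // caught up with everything that was published.
    private void awaitRingBuffer() throws InterruptedException {
        RingBuffer<MyEvent> ringBuffer = disruptor.getRingBuffer();
        while (ringBuffer.remainingCapacity() != disruptor.getBufferSize()) {
            // No timeout here: the loop never exits if a handler stalls.
            Thread.sleep(10);
        }
    }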
}
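
One caveat with the loop above is that it never gives up, so a stalled handler hangs the test instead of failing it. Below is a minimal sketch of a bounded variant, using only the JDK. The `Await` class, its `until` method, and the timeout and poll-interval parameters are hypothetical names introduced for illustration; they are not part of the Disruptor API or of the original answer.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical test helper: polls a condition until it holds or a deadline passes.
final class Await {
    private Await() {
    }

    /**
     * Re-checks {@code condition} every {@code pollInterval} until it returns true.
     * Throws TimeoutException if it is still false once {@code timeout} has elapsed.
     */
    static void until(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime overflows.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Condition not met within " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }
}
```

In the test above, awaitRingBuffer() could then delegate to something like Await.until(() -> ringBuffer.remainingCapacity() == disruptor.getBufferSize(), Duration.ofSeconds(5), Duration.ofMillis(10)), where the five-second limit is an arbitrary choice. The test then fails with a TimeoutException rather than blocking the build.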