LMAX Disruptor Producer Incorrectly Wraps Around and Overwrites Before Consumer Completes Read

I was recently introduced to the LMAX Disruptor and decided to give it a try. Thanks to the developers, the setup was quick and hassle-free. However, I think I am running into an issue and would appreciate some help with it.

The issue: I was told that when the producer publishes an event, it should block until the consumer has had a chance to retrieve it before wrapping around. I have a sequence barrier on the consumer side, and I can confirm that if the producer has published no data, the consumer's waitFor call blocks. But the producer doesn't seem to be regulated in any way and will just wrap around and overwrite unprocessed data in the ring buffer.

I have a producer implemented as a Runnable, running on a separate thread.

public class Producer implements Runnable {
    private final RingBuffer<Event> ringbuffer;
    public Producer(RingBuffer<Event> rb) {
        ringbuffer = rb;
    }
    public void run() {
           long next = 0L;
           while(true) {
               try {
                   next = ringbuffer.next();
                   Event e = ringbuffer.get(next);
                   ... do stuff...
                   e.set(... stuff...);
               }
               finally {
                   ringbuffer.publish(next);
               }
           }
    }
}

I have a consumer running on the main thread.

public class Consumer {
     private final ExecutorService exec;
     private final Disruptor<Event> disruptor;
     private final RingBuffer<Event> ringbuffer;
     private final SequenceBarrier seqbar;
     private long seq = 0L;

     public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
    }

    public Data getData() {
         seqbar.waitFor(seq);
         Event e = ringbuffer.get(seq);
         seq++;
         return e.get();
    }
}

Finally, I run the code like so:

public class DisruptorTest {
     public static void main(String[] args){
         Consumer c = new Consumer();
         while (true) {
             c.getData();
             ... Do stuff ...
         }
     }
}

There is 1 best solution below

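You need to add a gating sequence (com.lmax.disruptor.Sequence) to the ring buffer. This sequence must be kept up to date with how far your consumer has read; the producer will not wrap past the slowest gating sequence. Without one, the ring buffer has no way of knowing which slots are still unread.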
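
To see why the gating sequence matters, here is a minimal single-threaded sketch of the idea. It is not the Disruptor's implementation: the class GatedRingSketch and its methods tryPublish and poll are hypothetical names made up for illustration, and the real RingBuffer.next() waits for the gating sequence to advance rather than returning false.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy model of a gated ring buffer; NOT part of the Disruptor library.
final class GatedRingSketch {
    private final long[] slots;
    private final int mask;
    // Highest sequence the producer has published (-1 means nothing yet).
    private final AtomicLong published = new AtomicLong(-1);
    // Highest sequence the consumer has finished reading: the "gating sequence".
    private final AtomicLong consumed = new AtomicLong(-1);

    GatedRingSketch(int size) {
        if (Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new long[size];
        mask = size - 1;
    }

    // Writes the value unless that would overwrite a slot the consumer has not read.
    boolean tryPublish(long value) {
        long next = published.get() + 1;
        long sequenceToOverwrite = next - slots.length;
        if (sequenceToOverwrite > consumed.get()) {
            return false; // gated: the consumer is a full lap behind
        }
        slots[(int) (next & mask)] = value;
        published.set(next);
        return true;
    }

    // Returns the next unread value, or null if nothing new has been published.
    Long poll() {
        long next = consumed.get() + 1;
        if (next > published.get()) {
            return null;
        }
        long value = slots[(int) (next & mask)];
        consumed.set(next); // advancing this is what frees the slot for the producer
        return value;
    }

    public static void main(String[] args) {
        GatedRingSketch ring = new GatedRingSketch(4);
        for (long i = 0; i < 4; i++) {
            ring.tryPublish(i);                  // fills all four slots
        }
        System.out.println(ring.tryPublish(4)); // false: slot 0 is still unread
        System.out.println(ring.poll());        // 0: the consumer advances its sequence
        System.out.println(ring.tryPublish(4)); // true: slot 0 may now be reused
    }
}
```

If the check against consumed is removed, the producer happily laps the consumer, which is the behaviour described in the question: a ring buffer with no gating sequences registered has nothing to compare against.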

You can implement your event handling with the EventHandler interface and use the provided BatchEventProcessor (com.lmax.disruptor.BatchEventProcessor), which maintains its own sequence.

Here's a fully working example:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;

public class Main {

   static class Event {

      int id;
   }

   static class Producer implements Runnable {

      private final RingBuffer<Event> ringbuffer;

      public Producer(RingBuffer<Event> rb) {
         ringbuffer = rb;
      }

      @Override
      public void run() {
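         int id = 0;
         while (true) {
            // Claim the slot outside the try block so that publish() only
            // runs for a sequence that was actually claimed. next() waits
            // here while the gating sequence is a full lap behind.
            long next = ringbuffer.next();
            try {
               Event e = ringbuffer.get(next);
               e.id = id++;
            } finally {
               ringbuffer.publish(next);
            }
         }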
      }
   }

   static class Consumer {

      private final ExecutorService exec;
      private final Disruptor<Event> disruptor;
      private final RingBuffer<Event> ringbuffer;
      private final SequenceBarrier seqbar;
      private BatchEventProcessor<Event> processor;

      public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(() -> new Event(), 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         processor = new BatchEventProcessor<Main.Event>(
               ringbuffer, seqbar, new Handler());
         ringbuffer.addGatingSequences(processor.getSequence());

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
      }
   }

   static class Handler implements EventHandler<Event> {

      @Override
      public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println("Handling event " + event.id);
      }

   }

   public static void main(String[] args) throws Exception {

      Consumer c = new Consumer();
      while (true) {
         c.processor.run();
      }
   }
}