LMAX Disruptor: Must EventHandler clone object received from EventHandler#onEvent

I have an application with many producers and consumers.

From my understanding, the RingBuffer creates its event objects up front when it is initialised; you then copy your data into a slot when you publish to the ring and read it back from the slot in the EventHandler.

My application's LogHandler buffers the received events in a List in order to forward them in batch mode once the list has reached a certain size. So EventHandler#onEvent puts the received object into the list; once the list reaches that size, it sends the batch over RMI to a server and clears the list.

My question is: do I need to clone the object before I put it in the list? As I understand it, once consumed, the events can be reused.

Do I need to synchronize access to the list in my EventHandler#onEvent?

There are 3 answers below

BEST ANSWER

Yes - your understanding is correct. You copy your values into and out of the ring buffer slots.

I would suggest that yes, you clone the values as you extract them from the ring buffer into your event handler's list; otherwise the slot can be reused.

You should not need to synchronise access to the list as long as it is a private member variable of your EventHandler and you only have one event handler instance per thread. If you have multiple event handlers adding to the same (e.g. static) List instance, then you would need synchronisation.


Clarification:

Be sure to read the background in OzgurH's comments below. If you stick to using the endOfBatch flag on the disruptor and use that to decide the size of your batch, you do not have to copy objects out of the list. If you are using your own accumulation strategy (such as size, as per the question), then you should clone objects out, as the slot could be reused before you have had a chance to send them.

It is also worth noting that if you need to synchronize on the list instance, you have missed a big opportunity with the disruptor and will destroy your performance anyway.
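
To make the copy-out approach concrete, here is a minimal sketch, assuming a hypothetical mutable LogEvent with a copy constructor, an illustrative BATCH_SIZE threshold and a placeholder sendOverRmi method (none of these names come from the question):

import com.lmax.disruptor.EventHandler;

import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable event type that lives in the ring buffer slots.
class LogEvent {
    private String message;

    public LogEvent() { }

    // Copy constructor used to clone the event out of the slot.
    public LogEvent(final LogEvent other) {
        this.message = other.message;
    }

    public void setMessage(final String message) {
        this.message = message;
    }
}

public class CopyingLogHandler implements EventHandler<LogEvent> {
    private static final int BATCH_SIZE = 100; // illustrative threshold

    // Private to this handler instance; only the single consumer thread touches it,
    // so no synchronisation is needed.
    private final List<LogEvent> buffer = new ArrayList<>(BATCH_SIZE);

    @Override
    public void onEvent(final LogEvent event, final long sequence, final boolean endOfBatch) {
        // Clone the event before keeping it: the slot can be reused once this
        // handler's sequence moves past it.
        buffer.add(new LogEvent(event));

        if (buffer.size() >= BATCH_SIZE) {
            sendOverRmi(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    private void sendOverRmi(final List<LogEvent> batch) {
        // Placeholder for the actual RMI call from the question.
    }
}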

ANSWER

It is possible to use slots in the Disruptor's RingBuffer (including ones containing a List) without cloning/copying values. This may be a preferable solution for you depending on whether you are worried about garbage creation, and whether you actually need to be concerned about concurrent updates to the objects being placed in the RingBuffer. If all the objects being placed in the slot's list are immutable, or if they are only being updated/read by a single thread at a time (a precondition which the Disruptor is often used to enforce), there will be nothing gained from cloning them as they are already immune to data races.

On the subject of batching, note that the Disruptor framework itself provides a mechanism for taking items from the RingBuffer in batches in your EventHandler threads. This approach is fully thread-safe and lock-free, and could yield better performance by making your memory access patterns more predictable to the CPU.
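
As a rough sketch of that built-in batching (the RecordEvent type, its immutable String payload and the sendOverRmi placeholder are assumptions, not part of this answer), a handler can simply flush whenever the Disruptor signals endOfBatch, with nothing cloned:

import com.lmax.disruptor.EventHandler;

import java.util.ArrayList;
import java.util.List;

// Hypothetical event type whose payload (an immutable String) is safe to hand on without cloning.
class RecordEvent {
    private String record;

    public void setRecord(final String record) {
        this.record = record;
    }

    public String getRecord() {
        return record;
    }
}

public class EndOfBatchLogHandler implements EventHandler<RecordEvent> {
    private final List<String> pending = new ArrayList<>();

    @Override
    public void onEvent(final RecordEvent event, final long sequence, final boolean endOfBatch) {
        // Take the immutable payload out of the slot; the slot object itself is never kept.
        pending.add(event.getRecord());

        // endOfBatch is true for the last event available in the current run,
        // which makes it a natural flush point for the accumulated batch.
        if (endOfBatch) {
            sendOverRmi(new ArrayList<>(pending));
            pending.clear();
        }
    }

    private void sendOverRmi(final List<String> batch) {
        // Placeholder for the actual remote call.
    }
}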

ANSWER

No, you are not right.

Simple answer:

  • you do not have to clone anything
  • you do not have to synchronize anything

The concept of the LMAX Disruptor is that you have a pre-allocated ring of event instances. When a publisher wants to publish, it asks the ring buffer for the next "free" instance, i.e.:

        final long sequenceId = ringBuffer.next();
        final SomeEvent serviceEvent = ringBuffer.get(sequenceId);

Then you need to pass your request data to that instance. So, instead of creating a new request/event instance (against the usual OOP approach), you reuse the existing instance and set it up for your needs. For example:

        serviceEvent.requestCommandCreateNewUser("banana", "strawberry");

You have to take care in the implementation of such a method not to create new instances (to stay aligned with the disruptor philosophy); for example, you just copy the parameters into pre-allocated private fields.
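
A possible shape of such an event class is sketched below; the field names, the two String parameters and the EVENT_FACTORY constant (used in the set-up code further down) are illustrative assumptions:

import com.lmax.disruptor.EventFactory;

public class SomeEvent {
    // Factory the Disruptor uses to pre-allocate every slot up front.
    public static final EventFactory<SomeEvent> EVENT_FACTORY = SomeEvent::new;

    // Pre-allocated fields; they are simply overwritten each time the slot is reused.
    private String command;
    private String userName;
    private String userGroup;

    public void requestCommandCreateNewUser(final String userName, final String userGroup) {
        // Only copy the parameters into the pre-allocated fields; no new objects are created here.
        this.command = "CREATE_NEW_USER";
        this.userName = userName;
        this.userGroup = userGroup;
    }
}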

Then you mark your instance as "ready to process" using the publish call with the sequence id:

        ringBuffer.publish(sequenceId);

After this, you should not access the serviceEvent instance anymore. At least not as the publisher...
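
Put together, the publish side can follow the try/finally form shown in the Disruptor user guide, so the claimed slot is always published even if filling it fails:

        final long sequenceId = ringBuffer.next();
        try {
            final SomeEvent serviceEvent = ringBuffer.get(sequenceId);
            serviceEvent.requestCommandCreateNewUser("banana", "strawberry");
        } finally {
            // Always publish the claimed sequence; otherwise consumers would stall waiting for it.
            ringBuffer.publish(sequenceId);
        }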

On the other side, you receive the given instance for processing via a handler. The following code is the construction part:

        final Disruptor<SomeEvent> disruptor
                = new Disruptor<>(
                SomeEvent.EVENT_FACTORY,
                128, // ring buffer size; the Disruptor requires a power of two
                     // (see https://en.algorithmica.org/hpc/cpu-cache/associativity/#pathological-mappings for the cache-associativity concern behind the original, invalid choice of 150)
                threadFactory,
                ProducerType.SINGLE,
                waitStrategy);
        disruptor.handleEventsWith(new BatchingEventHandler());
        ringBuffer = disruptor.start();

This is just the set-up. The actual processing of events happens in BatchingEventHandler. Batching can be done there as well; smart batching is perhaps the best approach.

Smart batching simply takes whatever is already prepared in the ring buffer and processes it as one batch; it does not matter whether that is a single event or ten. Meanwhile, incoming requests keep being put into the ring buffer. When the current batch is finished, the algorithm again takes whatever is prepared in the ring buffer and processes all of those events as the next batch...

Have a look at the BatchingEventHandler class:

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.Sequence;

import java.util.ArrayList;

public class BatchingEventHandler implements EventHandler<SomeEvent> {
    /**
     * Maximum number of events to process at once. This is the upper cap for the smart batch size.
     */
    public static final int MAXIMUM_BATCH_SIZE = 20;

    private Sequence sequenceCallback;
    private final ArrayList<SomeEvent> internalBatch = new ArrayList<>(MAXIMUM_BATCH_SIZE);

    @Override
    public void setSequenceCallback(final Sequence sequenceCallback) {
        this.sequenceCallback = sequenceCallback;
    }

    @Override
    public void onEvent(final SomeEvent event, final long sequence, final boolean endOfBatch) {
        this.internalBatch.add(event);

        boolean logicalChunkOfWorkComplete = isLogicalChunkOfWorkComplete(endOfBatch);
        if (logicalChunkOfWorkComplete) {
            // Mark the consumed slots as "free". From now on we must not touch any of the older
            // SomeEvent instances; we let go of them in isLogicalChunkOfWorkComplete.
            sequenceCallback.set(sequence);
        }
    }

    private boolean isLogicalChunkOfWorkComplete(boolean forceEnd) {
        // Consider what a sensible maximum batch size is, and test with large batches when I/O is involved.
        boolean internalBatchMaximumSizeReached = this.internalBatch.size() >= MAXIMUM_BATCH_SIZE;
        if (internalBatchMaximumSizeReached || forceEnd) {
            // Do the actual I/O operation with all accumulated instances, then drop the references to them.
            System.out.println("Processed %s events in single batch.".formatted(this.internalBatch.size()));
            this.internalBatch.clear();
        }
        // Report completion whenever the internal batch has actually been flushed,
        // so the sequence callback releases the slots as early as possible.
        return internalBatchMaximumSizeReached || forceEnd;
    }
}

For a minimal introduction, you can have a look at https://www.baeldung.com/lmax-disruptor-concurrency, for example.

For documentation, please take the time to read https://lmax-exchange.github.io/disruptor/user-guide/index.html, at least the section "Dealing With Large Batches" just before the end of the document.