LMAX Disruptor SPSC - 6 million ops per second


With the Disruptor ring buffer, I am able to achieve only 6 million ops per second, and I am wondering where I am going wrong. My event handler just increments a counter. This is with a single producer and a single consumer. Can someone tell me whether I have the semantics wrong? The program creates a producer thread that adds to the buffer, and an event handler that handles the published events. Each time an event is published, the event handler increments a volatile counter.

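import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

public class MainClass{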

   public static class globalVariables
   {
        static int NUMBER_OF_ITERATIONS = 33554432; // 2 power 25     
        static int NUMBER_OF_THREADS;
        static int RING_SIZE = NUMBER_OF_ITERATIONS;
        static int WRITE_CODE = 1;

        static volatile int keep_going = 1; 
   };

   public void start_execution()
   {            
        int remainder = globalVariables.NUMBER_OF_ITERATIONS % globalVariables.NUMBER_OF_THREADS;               
        int iterations_per_thread = ( globalVariables.NUMBER_OF_ITERATIONS - remainder ) / globalVariables.NUMBER_OF_THREADS ;


      /* New Shared Object */
      final sharedObject newSharedObject = new sharedObject();


    ExecutorService exec = Executors.newFixedThreadPool(1);
      Disruptor<valueEvent> disruptor = new Disruptor<valueEvent>( valueEvent.EVENT_FACTORY, globalVariables.RING_SIZE, exec );


      /* Creating event handler whenever an item is published in the queue */

           final EventHandler<valueEvent> handler = new EventHandler<valueEvent>()
           {
              public void onEvent(final valueEvent event, final long sequence, 
                          final boolean endOfBatch) throws Exception
              {         
                 newSharedObject.shared_variable++; // increment the shared variable
              }
           };


          /* Use the above handler to handle events */
          disruptor.handleEventsWith(handler);


     /* start Disruptor */
     final RingBuffer<valueEvent> ringBuffer = disruptor.start();


     final long[] runtime = new long [globalVariables.NUMBER_OF_THREADS];
     /* Code the producer thread */
     final class ProducerThread extends Thread {
        int i;

        public ProducerThread( int i )
        {
            this.i = i;
        }

        public void run()
        {
           long idle_counter = 0;
           long count;

           System.out.println("In thread "+i );

           long startTime = System.nanoTime();

           //while( globalVariables.keep_going == 1 )
           for( int counter=0; counter<globalVariables.NUMBER_OF_ITERATIONS; counter++ )
           {
              // Publishers claim events in sequence
              long sequence = ringBuffer.next();
              valueEvent event = ringBuffer.get(sequence);

              event.setValue(globalVariables.WRITE_CODE); 

              // make the event available to EventProcessors
              ringBuffer.publish(sequence);  
           }

           long stopTime = System.nanoTime();
           runtime[i] = (stopTime - startTime)/1000; 
        }
     };

     /* ------------------------------------------------------------------------------- */     
     //final class AlarmHandler extends TimerTask {     
          /*** Implements TimerTask's abstract run method.   */
    //    @Override public void run(){
    //      globalVariables.keep_going = 0;
    //    }
   //  };

     /* ------------------------------------------------------------------------------- */
     /* Creating Producer threads */
     ProducerThread[] threads = new ProducerThread[globalVariables.NUMBER_OF_THREADS];
     for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) {
        threads[i] = new ProducerThread( i );
        threads[i].start();
     }

     // Waiting for the threads to finish
     for (int i = 0; i < globalVariables.NUMBER_OF_THREADS; i++) {
        try
        {
         threads[i].join();
        } catch ( InterruptedException e ) { System.out.println("hi exception :)"); }
     } 

     /* shutdown */     
     disruptor.shutdown();
     exec.shutdown();

     /* Printing Statistics */
     System.out.println( "Shared Variable: " + newSharedObject.shared_variable );
     for ( int i=0; i<globalVariables.NUMBER_OF_THREADS; i++ )
     {
         // runtime[i] is in microseconds; multiply first so integer division does not truncate the rate
         System.out.println("Runtime="+ runtime[i] + "; Operations per second = " + (globalVariables.NUMBER_OF_ITERATIONS * 1000000L) / runtime[i] +" ops/sec" );
     }     

   }

    public static void main( String args[] )
    {
        globalVariables.NUMBER_OF_THREADS = Integer.parseInt( args[0] );    

        System.out.println( "Number of Threads = "+ globalVariables.NUMBER_OF_THREADS );

        MainClass mainObj = new MainClass();
        mainObj.start_execution();
        System.exit(0);
    }
};

Here is the output of the program:

Shared Variable: 33554432; Runtime=5094139 microseconds; Operations per second = 6000000

Any help would be much appreciated.


There is 1 best solution below


Since you are running the event handler in a single thread and not sharing that state, you should get significantly better performance, with the same correct behavior, by having the event handler work on a non-volatile field. The Disruptor ensures that only one event is processed by your handler at a time, so you do not need to worry about losing increments.
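
As a rough illustration of the non-volatile field idea, here is a minimal sketch that does not depend on the Disruptor library. `PlainCounterSketch`, `CountingHandler` and `run` are hypothetical names; the `onEvent` method only mirrors the shape of the handler in the question, and a plain loop on one thread stands in for the Disruptor's consumer thread.

```java
// Hypothetical stand-in for the question's handler; not part of the Disruptor API.
public class PlainCounterSketch {

    /** Mirrors the shape of the question's onEvent, with no Disruptor dependency. */
    static final class CountingHandler {
        // Plain (non-volatile) field: only the single consumer thread writes it.
        private long count;

        void onEvent(Object event, long sequence, boolean endOfBatch) {
            count++; // ordinary increment, no volatile write per event
        }

        long count() {
            return count;
        }
    }

    /** Feeds the given number of dummy events to one handler from a single thread. */
    static long run(int events) {
        CountingHandler handler = new CountingHandler();
        Thread consumer = new Thread(() -> {
            for (long seq = 0; seq < events; seq++) {
                handler.onEvent(null, seq, false);
            }
        });
        consumer.start();
        try {
            // join() establishes happens-before, so the read below sees the final count.
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handler.count();
    }

    public static void main(String[] args) {
        System.out.println("count = " + run(1_000_000));
    }
}
```

The final value is read only after the consumer thread has been joined, which is what makes the plain field safe to read here; code that reads the counter while the handler is still running would need a different approach.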

If another component in the system relies on this value appearing in a specific order (e.g. it is a control value), then you should consider using something like AtomicInteger with lazySet [1].

[1] http://psy-lob-saw.blogspot.com.au/2012/12/atomiclazyset-is-performance-win-for.html
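
The lazySet suggestion could look roughly like the sketch below, assuming a single writer thread. `LazySetCounterSketch`, `PublishedCounter` and `run` are hypothetical names used only for illustration; `AtomicInteger.lazySet` and `AtomicInteger.get` are the standard `java.util.concurrent.atomic` methods.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a single-writer counter published with lazySet.
public class LazySetCounterSketch {

    static final class PublishedCounter {
        private final AtomicInteger value = new AtomicInteger();

        // Safe only with one writer: the read-add-store sequence is not atomic.
        // lazySet is an ordered store that avoids the full fence of a volatile write,
        // so other threads may observe the new value slightly later.
        void increment() {
            value.lazySet(value.get() + 1);
        }

        int get() {
            return value.get();
        }
    }

    /** Increments the counter the given number of times from one writer thread. */
    static int run(int events) {
        PublishedCounter counter = new PublishedCounter();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < events; i++) {
                counter.increment();
            }
        });
        writer.start();
        try {
            writer.join(); // wait for the writer before reading the final value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("count = " + run(1_000_000));
    }
}
```

With more than one writer this pattern would lose increments, so it only fits the single-consumer setup described in the question.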