I am trying to implement the Disruptor from the LMAX architecture.
As you know, the LMAX architecture uses a ring buffer as the queue through which data is processed.
Here you can see its structure:
I have implemented this structure in Python, as you can see here:
import multiprocessing
class CircularBuffer(object):
    """Fixed-size ring buffer with LMAX-style staged cursors.

    One producer cursor (``receiver``) writes items into the ring. Three
    intermediate cursors (``journalerPointer``, ``replicatorPointer`` and
    ``unmarshallerPointer``) each read every item independently and may
    never move past the receiver. The business-logic cursor
    (``blconsumer``) removes an item only after all three intermediate
    cursors have moved past it.

    One slot is always left unused so that a full ring can be told apart
    from an empty one; the usable capacity is therefore ``max_size - 1``.

    The cursors are plain attributes and no locking is performed.
    """

    def __init__(self, max_size=10):
        """Initialize the CircularBuffer.

        Args:
            max_size: Number of slots in the ring (defaults to 10).

        Raises:
            ValueError: If ``max_size`` is smaller than 1, because every
                cursor update is taken modulo ``max_size``.
        """
        if max_size < 1:
            raise ValueError("max_size must be a positive integer")
        self.buffer = [None] * max_size
        self.blconsumer = 0
        self.receiver = 0
        self.journalerPointer = 0
        self.replicatorPointer = 0
        self.unmarshallerPointer = 0
        self.max_size = max_size

    def __str__(self):
        """Return a formatted string representation of this CircularBuffer."""
        return '[' + ', '.join(repr(item) for item in self.buffer) + ']'

    def size(self):
        """Return the number of received items not yet consumed.

        The modulo keeps the result correct after the receiver has wrapped
        around to the start of the ring while the consumer has not.
        Runtime: O(1) Space: O(1)
        """
        return (self.receiver - self.blconsumer) % self.max_size

    def is_empty(self):
        """Return True if the receiver and the business-logic consumer are
        on the same slot, i.e. there is nothing left to consume.
        Runtime: O(1) Space: O(1)
        """
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if the replicator cursor is exactly one slot ahead
        of the receiver (modulo ``max_size``), otherwise return False.
        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.replicatorPointer - 1) % self.max_size

    def is_journaler_after_receiver(self):
        """Return True if the journaler cursor is exactly one slot ahead
        of the receiver (modulo ``max_size``), otherwise return False.
        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.journalerPointer - 1) % self.max_size

    def is_unmarshaller_after_receiver(self):
        """Return True if the unmarshaller cursor is exactly one slot ahead
        of the receiver (modulo ``max_size``), otherwise return False.
        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.unmarshallerPointer - 1) % self.max_size

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if the business-logic cursor is exactly one slot
        ahead of the unmarshaller (modulo ``max_size``), otherwise False.
        Runtime: O(1) Space: O(1)
        """
        return self.unmarshallerPointer == (self.blconsumer - 1) % self.max_size

    def is_full(self):
        """Return True if the receiver is one slot behind the business-logic
        consumer, i.e. writing one more item would make the ring look empty.
        Runtime: O(1) Space: O(1)
        """
        return self.receiver == (self.blconsumer - 1) % self.max_size

    def receive(self, item):
        """Insert an item at the receiver position and advance the receiver.

        Returns:
            True if the item was stored, False if the ring was full and
            the item was dropped.
        Runtime: O(1) Space: O(1)
        """
        if self.is_full():
            return False
        self.buffer[self.receiver] = item
        self.receiver = (self.receiver + 1) % self.max_size
        return True

    def front(self):
        """Return the item at the business-logic consumer position without
        removing it.
        Runtime: O(1) Space: O(1)
        """
        return self.buffer[self.blconsumer]

    def consume(self):
        """Remove and return the next item for the business logic.

        An item is handed out only after the journaler, the replicator and
        the unmarshaller have all moved past its slot; this also means the
        slot can be cleared without hiding the item from those stages.

        Returns:
            The consumed item, or None when no item is ready (ring empty
            or an upstream stage has not processed the slot yet).
        Runtime: O(1) Space: O(1)
        """
        position = self.blconsumer
        upstream = (
            self.journalerPointer,
            self.replicatorPointer,
            self.unmarshallerPointer,
        )
        # An upstream cursor resting on this slot has not processed it yet
        # (this also covers the empty ring, where every cursor coincides).
        if position in upstream:
            return None
        item = self.buffer[position]
        self.buffer[position] = None
        self.blconsumer = (position + 1) % self.max_size
        return item

    def _read_stage(self, pointer_name):
        """Read the next item for the stage whose cursor attribute is
        ``pointer_name`` and advance that cursor by one slot.

        Returns None without moving the cursor when the stage has caught
        up with the receiver, so a stage can never overtake the producer.
        """
        position = getattr(self, pointer_name)
        if position == self.receiver:
            return None
        item = self.buffer[position]
        setattr(self, pointer_name, (position + 1) % self.max_size)
        return item

    def replicator(self):
        """Return the next item for the replicator stage and advance its
        cursor, or return None if it has caught up with the receiver.
        Runtime: O(1) Space: O(1)
        """
        return self._read_stage('replicatorPointer')

    def journaler(self):
        """Return the next item for the journaler stage and advance its
        cursor, or return None if it has caught up with the receiver.
        Runtime: O(1) Space: O(1)
        """
        return self._read_stage('journalerPointer')

    def unmarshaller(self):
        """Return the next item for the unmarshaller stage and advance its
        cursor, or return None if it has caught up with the receiver.

        The item is read from the unmarshaller's own cursor position.
        Runtime: O(1) Space: O(1)
        """
        return self._read_stage('unmarshallerPointer')
As you can see in the picture, LMAX has a business logic component that fetches data from the ring buffer into the CPU for fast processing. Unfortunately, I couldn't find any documentation on how to implement the business logic layer. How can I fetch data from the ring buffer into CPU registers in LMAX with Python?