Business logic implementation in the LMAX architecture in Python

230 Views Asked by At

I am trying to implement the Disruptor from the LMAX architecture. As you know, the LMAX architecture uses a ring buffer as the queue through which data is processed. Here you can see its structure: [enter image description here]

I have implemented this structure in Python, as you can see here:

    import multiprocessing




class CircularBuffer(object):
    """Fixed-size ring buffer with one writer and several independent readers.

    ``receive`` writes at ``receiver``. The journaler, replicator and
    unmarshaller each keep their own read pointer; they read slots without
    clearing them. ``consume`` (the business-logic consumer) reads at
    ``blconsumer``, clears the slot, and never moves past the unmarshaller.

    One slot is always left unused so that ``receiver == blconsumer`` means
    "empty" rather than "full"; the usable capacity is ``max_size - 1``.

    NOTE: only the unmarshaller holds ``consume`` back. The journaler and the
    replicator are not waited for, so a slot can be cleared or overwritten
    before either of them has read it.
    """

    def __init__(self, max_size=10):
        """Create an empty buffer with ``max_size`` slots (default 10)."""
        self.buffer = [None] * max_size
        self.blconsumer = 0           # next slot the business logic consumes
        self.receiver = 0             # next slot the producer writes
        self.journalerPointer = 0     # next slot the journaler reads
        self.replicatorPointer = 0    # next slot the replicator reads
        self.unmarshallerPointer = 0  # next slot the unmarshaller reads
        self.max_size = max_size

    def __str__(self):
        """Return the raw slot contents formatted like a list literal."""
        return '[' + ', '.join('{!r}'.format(item) for item in self.buffer) + ']'

    def size(self):
        """Return the number of items received but not yet consumed."""
        if self.receiver >= self.blconsumer:
            return self.receiver - self.blconsumer
        # The receiver has wrapped around: count the slots from blconsumer to
        # the end of the list plus the slots from index 0 up to the receiver.
        return self.max_size - self.blconsumer + self.receiver

    def is_empty(self):
        """Return True if every received item has already been consumed."""
        return self.receiver == self.blconsumer

    def is_replicator_after_receiver(self):
        """Return True if the replicator has caught up with the receiver.

        When True, advancing the replicator would move it past the receiver
        onto a slot that has not been written yet.
        """
        return self.replicatorPointer == self.receiver

    def is_journaler_after_receiver(self):
        """Return True if the journaler has caught up with the receiver.

        When True, advancing the journaler would move it past the receiver
        onto a slot that has not been written yet.
        """
        return self.journalerPointer == self.receiver

    def is_unmarshaller_after_receiver(self):
        """Return True if the unmarshaller has caught up with the receiver.

        When True, advancing the unmarshaller would move it past the receiver
        onto a slot that has not been written yet.
        """
        return self.unmarshallerPointer == self.receiver

    def is_BusinessLogicConsumer_after_unmarshaller(self):
        """Return True if the consumer has caught up with the unmarshaller.

        When True, consuming would hand out an item the unmarshaller has not
        processed yet.
        """
        return self.blconsumer == self.unmarshallerPointer

    def is_full(self):
        """Return True if the receiver is one slot behind the consumer."""
        return self.receiver == (self.blconsumer - 1) % self.max_size

    def receive(self, item):
        """Store ``item`` at the receiver position and advance the receiver.

        Returns True if the item was stored, False if the buffer was full and
        the item was dropped.
        """
        if self.is_full():
            return False
        self.buffer[self.receiver] = item
        self.receiver = (self.receiver + 1) % self.max_size
        return True

    def front(self):
        """Return the item at the consumer position without removing it."""
        return self.buffer[self.blconsumer]

    def consume(self):
        """Remove and return the next item for the business logic.

        Returns None when the buffer is empty or when the next item has not
        been passed by the unmarshaller yet.
        """
        if self.is_empty() or self.is_BusinessLogicConsumer_after_unmarshaller():
            return None
        item = self.buffer[self.blconsumer]
        self.buffer[self.blconsumer] = None
        self.blconsumer = (self.blconsumer + 1) % self.max_size
        return item

    def replicator(self):
        """Return the next item for the replicator and advance its pointer.

        Returns None when the buffer is empty or the replicator has already
        read everything up to the receiver. The slot is left untouched.
        """
        if self.is_empty() or self.is_replicator_after_receiver():
            return None
        item = self.buffer[self.replicatorPointer]
        self.replicatorPointer = (self.replicatorPointer + 1) % self.max_size
        return item

    def journaler(self):
        """Return the next item for the journaler and advance its pointer.

        Returns None when the buffer is empty or the journaler has already
        read everything up to the receiver. The slot is left untouched.
        """
        if self.is_empty() or self.is_journaler_after_receiver():
            return None
        item = self.buffer[self.journalerPointer]
        self.journalerPointer = (self.journalerPointer + 1) % self.max_size
        return item

    def unmarshaller(self):
        """Return the next item for the unmarshaller and advance its pointer.

        Returns None when the buffer is empty or the unmarshaller has already
        read everything up to the receiver. The slot is left untouched.
        """
        if self.is_empty() or self.is_unmarshaller_after_receiver():
            return None
        # Read through the unmarshaller's own pointer, not the journaler's.
        item = self.buffer[self.unmarshallerPointer]
        self.unmarshallerPointer = (self.unmarshallerPointer + 1) % self.max_size
        return item
    

As you can see in the picture, LMAX has a business logic part that fetches data from the ring buffer into the CPU for fast processing. Unfortunately, I couldn't find any documentation on how to implement the business logic layer. How can I fetch data from the ring buffer into CPU registers in LMAX with Python?

0

There are 0 best solutions below