Multi-threaded Kombu RPC application with single connection and multiple channels

I have an RPC application that shares a single connection between threads and gives each thread its own channel. Currently, the program works as intended if it's used in just a single thread. However, when more than one thread is added, an array of unexpected issues occurs, such as:

  • amqp.exceptions.UnexpectedFrame: Received 0x31 while expecting 0xce
  • Not being able to send messages
  • Not receiving messages
  • ConnectionResetByPeer

The RPC logic itself appears to be sound, since the app works as intended with one thread; it only starts to break when more than one thread is used.

The code for my app is:

from __future__ import absolute_import, unicode_literals

from kombu import Connection, Producer, Consumer, Queue
import uuid
import threading
import time
from utils import connector


class RpcConnection(object):
    def __init__(self):
        # connector variable is an amqp url to RabbitMq server
        self._connection = Connection(connector, heartbeat=10)
        self._pool = self._connection.ChannelPool(6)

    def connection(self):
        # Returns same connection
        return self._connection

    def new_channel(self):
        # Returns different channel each time from pool of channels
        return self._pool.acquire()


class ExecuteAction(object):
    def __init__(self, connection, channel):
        self._connection = connection
        self._channel = channel

        # Define single response queue on each instance of a thread
        self.callback_queue = Queue(name=str(uuid.uuid4()),
                                    exclusive=True,
                                    auto_delete=True,
                                    durable=False)

        self.correlation_id = None
        self.response = None

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._channel.release()

    def execute_action(self):
        self.correlation_id = str(uuid.uuid4())

        # Send RPC message
        with Producer(self._channel) as producer:
            producer.publish(
                {'message_type': 'test',
                 'speed': int(1)},
                exchange='exchange.rpc',
                routing_key='message.action.rpc',
                reply_to=self.callback_queue.name,
                correlation_id=self.correlation_id,
            )

        # Consume RPC message back from queue
        with Consumer(self._channel,
                      on_message=self.on_response,
                      queues=[self.callback_queue],
                      no_ack=False):
            while self.response is None:
                print('Waiting for response')
                self._connection.drain_events()

        return self.response

    def on_response(self, message):
        if message.properties['correlation_id'] == self.correlation_id:
            print('Message received: {}\n'.format(message.payload))
            self.response = message.payload


class ThreadBase(threading.Thread):
    def __init__(self, connection, channel):
        threading.Thread.__init__(self)

        self._channel = channel
        self._connection = connection

        self.execute_action = None

    def initialise(self):
        # Initialises another class object which sends messages using RPC
        self.execute_action = ExecuteAction(connection=self._connection,
                                            channel=self._channel)

    def run(self):
        while True:
            # Infinite loop that calls execute_action() to send an RPC message every 4 seconds
            self.execute_action.execute_action()
            time.sleep(4)


if __name__ == '__main__':
    rpc_connection = RpcConnection()

    # For thread 1 and thread 2, the same connection is being passed, but each thread has a different channel
    # based on that same connection
    thread1 = ThreadBase(connection=rpc_connection.connection(),
                         channel=rpc_connection.new_channel())

    thread2 = ThreadBase(connection=rpc_connection.connection(),
                         channel=rpc_connection.new_channel())

    thread1.initialise()
    thread2.initialise()

    thread1.setName('Thread 1')
    thread2.setName('Thread 2')

    thread1.start()
    #time.sleep(2)
    thread2.start()

If a time.sleep() of 2 or more seconds is added between starting the two threads (the commented-out line above), the app works as intended with more than one thread. This is not the desired outcome, though, because the threads are then no longer working in parallel. The goal is to have multiple channels on the same connection, one per thread, making and receiving RPC calls in parallel.

Any help is greatly appreciated.

There is 1 best solution below

You should use one connection per thread, and then create channels within that thread.

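Also, note that your code calls self._connection.drain_events() from two separate threads on the same connection. That is definitely going to be problematic: both threads end up reading frames from the same underlying socket, which is consistent with errors such as UnexpectedFrame.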
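
One way to apply this advice, as a minimal sketch rather than a definitive implementation, is to open the connection inside the thread's run() method so that no other thread ever touches it. The names PerThreadRpc, open_connection, connection_factory, make_action, iterations and delay are hypothetical and only for illustration. The sketch assumes the ExecuteAction class from the question is reused unchanged, and it runs a fixed number of iterations instead of the question's infinite loop so that the connection can be closed cleanly.

```python
import threading
import time


def open_connection(url):
    """Open a new kombu connection; meant to be called once per thread."""
    # Imported lazily (an illustrative choice) so the threading logic below
    # can be exercised with stand-in objects when no broker is available.
    from kombu import Connection
    return Connection(url, heartbeat=10)


class PerThreadRpc(threading.Thread):
    """Hypothetical replacement for ThreadBase: each thread owns its connection."""

    def __init__(self, connection_factory, make_action, iterations=1, delay=0.0):
        super().__init__()
        self._connection_factory = connection_factory  # callable returning a new connection
        self._make_action = make_action                # e.g. the ExecuteAction class
        self._iterations = iterations
        self._delay = delay
        self.results = []

    def run(self):
        # Created inside run(), so the connection and its channel are only
        # ever used by this thread, including the drain_events() calls.
        connection = self._connection_factory()
        try:
            channel = connection.channel()
            action = self._make_action(connection, channel)
            for _ in range(self._iterations):
                self.results.append(action.execute_action())
                time.sleep(self._delay)
        finally:
            connection.close()
```

With the question's code, this could be started as PerThreadRpc(lambda: open_connection(connector), ExecuteAction, iterations=10, delay=4) for each thread. Every thread then publishes, consumes and drains events on its own connection, so the threads can run in parallel without the startup delay.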