I have an RPC application that has a single connection and multiple channels per thread. Currently, the program works as intended if its used in just a single thread. However, when adding more than one thread an array of unexpected issues occur, such as:
- amqp.exceptions.UnexpectedFrame: Received 0x31 while expecting 0xce
- Not being able to send messages
- Not receiving messages
- ConnectionResetByPeer
The logic of the RPC app is there because it works as intended by using one thread, but using more than one thread is where the app starts to break.
The code for my app is:
from __future__ import absolute_import, unicode_literals
from kombu import Connection, Producer, Consumer, Queue, uuid
import uuid
import threading
import time
from utils import connector
class RpcConnection(object):
def __init__(self):
# connector variable is an amqp url to RabbitMq server
self._connection = Connection(connector, heartbeat=10)
self._pool = self._connection.ChannelPool(6)
def connection(self):
# Returns same connection
return self._connection
def new_channel(self):
# Returns different channel each time from pool of channels
return self._pool.acquire()
class ExecuteAction(object):
def __init__(self, connection, channel):
self._connection = connection
self._channel = channel
# Define single response queue on each instance of a thread
self.callback_queue = Queue(name=str(uuid.uuid4()),
exclusive=True,
auto_delete=True,
durable=False)
self.correlation_id = None
self.response = None
def __exit__(self, exc_type, exc_val, exc_tb):
self._channel.release()
def execute_action(self):
self.correlation_id = str(uuid.uuid4())
# Send RPC message
with Producer(self._channel) as producer:
producer.publish(
{'message_type': 'test',
'speed': int(1)},
exchange='exchange.rpc',
routing_key='message.action.rpc',
reply_to=self.callback_queue.name,
correlation_id=self.correlation_id,
)
# Consume RPC message back from queue
with Consumer(self._channel,
on_message=self.on_response,
queues=[self.callback_queue],
no_ack=False):
while self.response is None:
print('Waiting for response')
self._connection.drain_events()
return self.response
def on_response(self, message):
if message.properties['correlation_id'] == self.correlation_id:
print('Message received: {}\n'.format(message.payload))
self.response = message.payload
class ThreadBase(threading.Thread):
def __init__(self, connection, channel):
threading.Thread.__init__(self)
self._channel = channel
self._connection = connection
self.execute_action = None
def initialise(self):
# Initialises another class object which sends messages using RPC
self.execute_action = ExecuteAction(connection=self._connection,
channel=self._channel)
def run(self):
while True:
# Infinite loop that calls execute_action() to send an RPC message every 4 seconds
self.execute_action.execute_action()
time.sleep(4)
if __name__ == '__main__':
rpc_connection = RpcConnection()
# For thread 1 and thread 2, the same connection is being passed, but each thread has a different channel
# based on that same connection
thread1 = ThreadBase(connection=rpc_connection.connection(),
channel=rpc_connection.new_channel())
thread2 = ThreadBase(connection=rpc_connection.connection(),
channel=rpc_connection.new_channel())
thread1.initialise()
thread2.initialise()
thread1.setName('Thread 1')
thread2.setName('Thread 2')
thread1.start()
#time.sleep(2)
thread2.start()
If a time.sleep()
of 2 or more seconds is added then the app works as intended with more than one thread, but this is not the desired outcome because the threads aren't working in parallel to each other. The goal here is to have multiple threaded channels using the same connection working in parallel to each other to make and receive RPC calls.
Any help is greatly appreciated.
You should use one connection per thread, and then create channels within that thread.
Also, note that your code calls
self._connection.drain_events()
from two separate threads, but using the same connection. That is definitely going to be problematic.