I'm using Kombu to connect to RabbitMQ and ran into some strange behavior. I have a generic producer function (see below) that I use throughout our code base. It works well in most cases, but some of our unit tests started failing on it. The connection is closed when the `with` block is entered, and opening the connection manually raises `kombu.exceptions.OperationalError: [Errno 104] Connection reset by peer`.
Everything runs with docker-compose, including the tests, which run inside a container. Although the reset packet points to a network issue, I don't think it's network related, because sending messages from other places in the code works fine. I suppose it could have something to do with the testing setup, but I have no idea where to start looking.
- I validated that the message is properly serialized
- I validated that the connection string is working
- I validated that the two containers have a network link
- I checked that the versions are compatible (Kombu 5.2.3, `amqp` 5.1.1, RabbitMQ 3.11.8)
- There are no concurrent open connections
- There are no messages in the queue
- There are many Stack Overflow questions and GitHub issues that mention "Connection reset by peer", but information beyond "the remote end sent a reset packet" is hard to find, especially anything relevant to this context.
The strange thing is that this function works fine when called normally but fails in a fraction of our tests. There's nothing unusual about our pytest setup, though (pytest.ini at the bottom).
The error traceback:
```
platform/resources.py:39: in publish_event
    send_amqp_message(event)
messagebus/producer.py:16: in send_amqp_message
    conn.connect()
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:275: in connect
    return self._ensure_connection(
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:434: in _ensure_connection
    return retry_over_time(
/usr/local/lib/python3.9/dist-packages/kombu/utils/functional.py:312: in retry_over_time
    return fun(*args, **kwargs)
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:878: in _connection_factory
    self._connection = self._establish_connection()
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:813: in _establish_connection
    conn = self.transport.establish_connection()
/usr/local/lib/python3.9/dist-packages/kombu/transport/pyamqp.py:201: in establish_connection
    conn.connect()
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:329: in connect
    self.drain_events(timeout=self.connect_timeout)
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:525: in drain_events
    while not self.blocking_read(timeout):
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:530: in blocking_read
    frame = self.transport.read_frame()
/usr/local/lib/python3.9/dist-packages/amqp/transport.py:312: in read_frame
    payload = read(size)
/usr/local/lib/python3.9/dist-packages/amqp/transport.py:627: in _read
    s = recv(n - len(rbuf))
/usr/local/lib/python3.9/dist-packages/httpretty/core.py:697: in recv
    return self.forward_and_trace('recv', buffersize, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <httpretty.core.fakesock.socket object at 0x402d1bf760>, function_name = 'recv', a = (1342177288,), kw = {}
function = <built-in method recv of socket object at 0x402d1b19a0>
callback = <built-in method recv of socket object at 0x402d1b19a0>

    def forward_and_trace(self, function_name, *a, **kw):
        if self.truesock and not self.__truesock_is_connected__:
            self.truesock = self.create_socket()
            ### self.connect_truesock()
        if self.__truesock_is_connected__:
            function = getattr(self.truesock, function_name)
        if self.is_http:
            if self.truesock and not self.__truesock_is_connected__:
                self.truesock = self.create_socket()
                ### self.connect_truesock()
        if not self.truesock:
            raise UnmockedError()
        callback = getattr(self.truesock, function_name)
>       return callback(*a, **kw)
E       ConnectionResetError: [Errno 104] Connection reset by peer

/usr/local/lib/python3.9/dist-packages/httpretty/core.py:666: ConnectionResetError
```
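The last frames of the traceback show `httpretty.core.fakesock.socket` standing in for the real socket, even though the connection is AMQP and not HTTP. A cheap way to confirm that kind of monkeypatching from inside the producer is to log which class `socket.socket` currently points to. This is a minimal stdlib-only sketch; the helper name `socket_class_origin` is hypothetical, and it assumes the mocking library works by replacing `socket.socket`, which is what the traceback suggests httpretty does.

```python
import socket


def socket_class_origin() -> str:
    """Return the dotted path of the class currently bound to socket.socket.

    Unpatched CPython reports "socket.socket". A mocking library that swaps
    the class out reports its own path instead; for httpretty that would be
    something like "httpretty.core.fakesock.socket", matching the traceback.
    """
    cls = socket.socket
    return f"{cls.__module__}.{cls.__qualname__}"


# Hypothetical usage: log this right before conn.connect() in the producer,
# then compare the output of a passing test with a failing one.
print(socket_class_origin())
```

If the failing tests print something other than `socket.socket`, the socket module has been patched by the time the producer runs.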
The producer function:
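```python
from dataclasses import asdict
from kombu import Connection

# BaseEvent and MESSAGEBUS_CONNECTION_STRING come from our own modules; "uuid" is
# not a built-in Kombu serializer, so it is assumed to be registered via kombu.serialization.register.


def send_amqp_message(message: BaseEvent, queue: str = "eventbus") -> None:
    with Connection(MESSAGEBUS_CONNECTION_STRING) as conn:
        if not conn.connected:
            conn.connect()
        simple_queue = conn.SimpleQueue(queue)  # separate name, so the queue-name argument isn't shadowed
        simple_queue.put({"event": message.name, "payload": asdict(message.data)}, serializer="uuid")
        simple_queue.close()
```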
The RabbitMQ logs (log level debug):
```
rabbitmq-1 | 2023-05-22 17:18:53.633771+00:00 [info] <0.2691.0> accepting AMQP connection <0.2691.0> (172.26.0.5:32944 -> 172.26.0.4:5672)
rabbitmq-1 | 2023-05-22 17:18:53.633833+00:00 [error] <0.2691.0> closing AMQP connection <0.2691.0> (172.26.0.5:32944 -> 172.26.0.4:5672):
rabbitmq-1 | 2023-05-22 17:18:53.633833+00:00 [error] <0.2691.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
rabbitmq-1 | 2023-05-22 17:18:55.687146+00:00 [info] <0.2701.0> accepting AMQP connection <0.2701.0> (172.26.0.5:32958 -> 172.26.0.4:5672)
rabbitmq-1 | 2023-05-22 17:18:55.693462+00:00 [warning] <0.2701.0> closing AMQP connection <0.2701.0> (172.26.0.5:32958 -> 172.26.0.4:5672):
rabbitmq-1 | 2023-05-22 17:18:55.693462+00:00 [warning] <0.2701.0> client unexpectedly closed TCP connection
rabbitmq-1 | 2023-05-22 17:18:55.695395+00:00 [info] <0.2706.0> accepting AMQP connection <0.2706.0> (172.26.0.5:32968 -> 172.26.0.4:5672)
rabbitmq-1 | 2023-05-22 17:18:55.695585+00:00 [error] <0.2706.0> closing AMQP connection <0.2706.0> (172.26.0.5:32968 -> 172.26.0.4:5672):
rabbitmq-1 | 2023-05-22 17:18:55.695585+00:00 [error] <0.2706.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
rabbitmq-1 | 2023-05-22 17:18:59.759150+00:00 [info] <0.2718.0> accepting AMQP connection <0.2718.0> (172.26.0.5:45838 -> 172.26.0.4:5672)
rabbitmq-1 | 2023-05-22 17:18:59.765966+00:00 [warning] <0.2718.0> closing AMQP connection <0.2718.0> (172.26.0.5:45838 -> 172.26.0.4:5672):
rabbitmq-1 | 2023-05-22 17:18:59.765966+00:00 [warning] <0.2718.0> client unexpectedly closed TCP connection
rabbitmq-1 | 2023-05-22 17:18:59.767295+00:00 [info] <0.2723.0> accepting AMQP connection <0.2723.0> (172.26.0.5:45842 -> 172.26.0.4:5672)
rabbitmq-1 | 2023-05-22 17:18:59.767437+00:00 [error] <0.2723.0> closing AMQP connection <0.2723.0> (172.26.0.5:45842 -> 172.26.0.4:5672):
rabbitmq-1 | 2023-05-22 17:18:59.767437+00:00 [error] <0.2723.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
```
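RabbitMQ logs `bad_header` when the first 8 bytes on a new connection are not the AMQP 0-9-1 protocol header (`AMQP\x00\x00\x09\x01`). Decoding the rejected bytes with the AMQP 0-9-1 general frame layout (1-byte frame type, 2-byte channel, 4-byte payload size) gives a METHOD frame on channel 0, so the broker apparently received a regular frame on a socket that never sent the protocol header. One possible reading, alternating with the "client unexpectedly closed TCP connection" lines, is that the client continued its handshake on a fresh socket. This is only a decoding sketch; `describe_first_bytes` is a hypothetical helper and not part of Kombu or `amqp`.

```python
import struct

# AMQP 0-9-1: every connection must start with this 8-byte protocol header.
PROTOCOL_HEADER = b"AMQP\x00\x00\x09\x01"

# Frame type octets defined by AMQP 0-9-1.
FRAME_TYPES = {1: "METHOD", 2: "HEADER", 3: "BODY", 8: "HEARTBEAT"}


def describe_first_bytes(first8: bytes) -> str:
    """Explain what the first bytes received by the broker look like."""
    if first8 == PROTOCOL_HEADER:
        return "valid AMQP 0-9-1 protocol header"
    if len(first8) < 7:
        return "too short to be a frame header"
    # General frame header: type (octet), channel (short), payload size (long), big-endian.
    frame_type, channel, size = struct.unpack(">BHI", first8[:7])
    kind = FRAME_TYPES.get(frame_type, f"unknown type {frame_type}")
    return f"{kind} frame on channel {channel}, payload size {size} (no protocol header)"


# The bytes from the RabbitMQ log: {bad_header,<<1,0,0,0,0,0,177,0>>}
print(describe_first_bytes(bytes([1, 0, 0, 0, 0, 0, 177, 0])))
```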
And the docker-compose file, for what it's worth:
```yaml
version: '2'
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    restart: always
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_LOG_LEVEL: debug
    ports:
      - 5672:5672
      - 15672:15672
    volumes:
      - ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
      - ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
  redis:
    image: redis:alpine
  test_container:
    command: /src/bin/debug
    depends_on:
      - rabbitmq
      - redis
    environment:
      CELERY_URI: amqp://guest:guest@rabbitmq
      TRUSTED_DOMAINS: 'service;example.com;another-example.com;test'
      MESSAGEBUS_CONNECTION_STR: amqp://guest:guest@rabbitmq
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
    volumes:
      - '.:/src'
```
pytest.ini:
```ini
[pytest]
norecursedirs = ve
testpaths = /src/tests
junit_family=xunit2
; addopts=--tb=short -n 5 --dist=loadscope
```
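As the traceback indicated, `httpretty` was interfering and mocking the socket connection. Disabling `httpretty` in the function call was the solution for me. Strangely enough, I could not find any reference to the RabbitMQ URL in the tests; however, while httpretty is enabled it replaces the socket class globally, so every connection, including AMQP, goes through its fake socket, not just the URLs that are registered with it.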
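"Disabling `httpretty` in the function call" could look roughly like the context manager below. It is a sketch under assumptions: the name `httpretty_paused` is hypothetical, it relies on httpretty's module-level `is_enabled()`, `disable()` and `enable()`, and it re-enables httpretty with its default arguments, which is only correct if the test did not pass something like `allow_net_connect=False`.

```python
import contextlib
from typing import Iterator


@contextlib.contextmanager
def httpretty_paused() -> Iterator[None]:
    """Temporarily remove httpretty's socket patching, if it is active.

    httpretty is imported lazily because it is usually a test-only
    dependency; if it is not installed there is nothing to pause.
    """
    try:
        import httpretty
    except ImportError:  # e.g. in the production image
        httpretty = None

    was_enabled = httpretty is not None and httpretty.is_enabled()
    if was_enabled:
        httpretty.disable()  # restore the real socket module for the AMQP connection
    try:
        yield
    finally:
        if was_enabled:
            # Assumption: the test relied on httpretty.enable()'s default arguments.
            httpretty.enable()
```

In `send_amqp_message`, the existing `with Connection(...)` block would then be nested inside `with httpretty_paused():`. The lazy import keeps httpretty out of the production dependencies while still letting the producer step around it in tests.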