Kombu - RabbitMQ - Pytest - [Errno 104] Connection reset by peer


I'm using Kombu to connect to RabbitMQ and have run into some strange behavior. I have a generic producer function (see below) that I use throughout our code base. It works well in most cases, but some of our unit tests started failing here. The connection is not open when entering the with block, and opening it manually results in a kombu.exceptions.OperationalError: [Errno 104] Connection reset by peer.

Everything runs with docker-compose, and the tests also run inside a container. Although the reset packet points to a network issue, I don't think it is network related, since I have no trouble sending the message from other places in the code. I suppose it could have something to do with the testing setup, but I have no idea where to start looking.

  • I validated that the message is properly serialized
  • I validated that the connection string is working
  • I validated that the two containers have a network link
  • I validated compatible versions (Kombu 5.2.3, AMQP 5.1.1, RabbitMQ 3.11.8)
  • There are no concurrent open connections
  • There are no messages in the queue
  • There are a lot of Stack Overflow questions and GitHub tickets that mention 'Connection reset by peer', but it is hard to find anything relevant to this context beyond the fact that the remote end sent a reset packet.

The strange thing is that this function works fine when called from the rest of the code, but fails in a fraction of our tests. There's nothing unusual in our pytest setup, though (pytest.ini at the bottom).


The error traceback:

platform/resources.py:39: in publish_event
    send_amqp_message(event)
messagebus/producer.py:16: in send_amqp_message
    conn.connect()
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:275: in connect
    return self._ensure_connection(
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:434: in _ensure_connection
    return retry_over_time(
/usr/local/lib/python3.9/dist-packages/kombu/utils/functional.py:312: in retry_over_time
    return fun(*args, **kwargs)
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:878: in _connection_factory
    self._connection = self._establish_connection()
/usr/local/lib/python3.9/dist-packages/kombu/connection.py:813: in _establish_connection
    conn = self.transport.establish_connection()
/usr/local/lib/python3.9/dist-packages/kombu/transport/pyamqp.py:201: in establish_connection
    conn.connect()
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:329: in connect
    self.drain_events(timeout=self.connect_timeout)
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:525: in drain_events
    while not self.blocking_read(timeout):
/usr/local/lib/python3.9/dist-packages/amqp/connection.py:530: in blocking_read
    frame = self.transport.read_frame()
/usr/local/lib/python3.9/dist-packages/amqp/transport.py:312: in read_frame
    payload = read(size)
/usr/local/lib/python3.9/dist-packages/amqp/transport.py:627: in _read
    s = recv(n - len(rbuf))
/usr/local/lib/python3.9/dist-packages/httpretty/core.py:697: in recv
    return self.forward_and_trace('recv', buffersize, *args, **kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <httpretty.core.fakesock.socket object at 0x402d1bf760>, function_name = 'recv', a = (1342177288,), kw = {}
function = <built-in method recv of socket object at 0x402d1b19a0>
callback = <built-in method recv of socket object at 0x402d1b19a0>

    def forward_and_trace(self, function_name, *a, **kw):
        if self.truesock and not self.__truesock_is_connected__:
            self.truesock = self.create_socket()
            ### self.connect_truesock()
    
        if self.__truesock_is_connected__:
            function = getattr(self.truesock, function_name)
    
        if self.is_http:
            if self.truesock and not self.__truesock_is_connected__:
                self.truesock = self.create_socket()
                ### self.connect_truesock()
    
        if not self.truesock:
            raise UnmockedError()
    
        callback = getattr(self.truesock, function_name)
>       return callback(*a, **kw)
E       ConnectionResetError: [Errno 104] Connection reset by peer

/usr/local/lib/python3.9/dist-packages/httpretty/core.py:666: ConnectionResetError
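
The last frames of the traceback go through httpretty.core.fakesock.socket rather than the standard-library socket, which suggests that socket.socket has been swapped out at that point in the test run. A minimal sketch of a check that could be dropped into a failing test to confirm this; the helper name socket_is_patched is hypothetical, and it relies only on the fact that the standard-library class reports "socket" as its module:

```python
import socket


def socket_is_patched() -> bool:
    """Return True when socket.socket is not the standard-library class."""
    # The stdlib class is defined in the "socket" module; a replacement
    # installed by a mocking library reports its own module instead.
    return socket.socket.__module__ != "socket"


if __name__ == "__main__":
    # Prints the class currently installed and whether it looks replaced.
    print(socket.socket, socket_is_patched())
```

If this returns True right before send_amqp_message is called, some earlier test or fixture has probably left a socket mock enabled.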

The producer function:

def send_amqp_message(message: BaseEvent, queue: str = "eventbus") -> None:
    with Connection(MESSAGEBUS_CONNECTION_STRING) as conn:
        if not conn.connected:
            conn.connect()
        queue = conn.SimpleQueue(queue)
        queue.put({"event": message.name, "payload": asdict(message.data)}, serializer="uuid")
        queue.close()

The RabbitMQ logs (on debug):

rabbitmq-1  | 2023-05-22 17:18:53.633771+00:00 [info] <0.2691.0> accepting AMQP connection <0.2691.0> (172.26.0.5:32944 -> 172.26.0.4:5672)
rabbitmq-1  | 2023-05-22 17:18:53.633833+00:00 [error] <0.2691.0> closing AMQP connection <0.2691.0> (172.26.0.5:32944 -> 172.26.0.4:5672):
rabbitmq-1  | 2023-05-22 17:18:53.633833+00:00 [error] <0.2691.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
rabbitmq-1  | 2023-05-22 17:18:55.687146+00:00 [info] <0.2701.0> accepting AMQP connection <0.2701.0> (172.26.0.5:32958 -> 172.26.0.4:5672)
rabbitmq-1  | 2023-05-22 17:18:55.693462+00:00 [warning] <0.2701.0> closing AMQP connection <0.2701.0> (172.26.0.5:32958 -> 172.26.0.4:5672):
rabbitmq-1  | 2023-05-22 17:18:55.693462+00:00 [warning] <0.2701.0> client unexpectedly closed TCP connection
rabbitmq-1  | 2023-05-22 17:18:55.695395+00:00 [info] <0.2706.0> accepting AMQP connection <0.2706.0> (172.26.0.5:32968 -> 172.26.0.4:5672)
rabbitmq-1  | 2023-05-22 17:18:55.695585+00:00 [error] <0.2706.0> closing AMQP connection <0.2706.0> (172.26.0.5:32968 -> 172.26.0.4:5672):
rabbitmq-1  | 2023-05-22 17:18:55.695585+00:00 [error] <0.2706.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
rabbitmq-1  | 2023-05-22 17:18:59.759150+00:00 [info] <0.2718.0> accepting AMQP connection <0.2718.0> (172.26.0.5:45838 -> 172.26.0.4:5672)
rabbitmq-1  | 2023-05-22 17:18:59.765966+00:00 [warning] <0.2718.0> closing AMQP connection <0.2718.0> (172.26.0.5:45838 -> 172.26.0.4:5672):
rabbitmq-1  | 2023-05-22 17:18:59.765966+00:00 [warning] <0.2718.0> client unexpectedly closed TCP connection
rabbitmq-1  | 2023-05-22 17:18:59.767295+00:00 [info] <0.2723.0> accepting AMQP connection <0.2723.0> (172.26.0.5:45842 -> 172.26.0.4:5672)
rabbitmq-1  | 2023-05-22 17:18:59.767437+00:00 [error] <0.2723.0> closing AMQP connection <0.2723.0> (172.26.0.5:45842 -> 172.26.0.4:5672):
rabbitmq-1  | 2023-05-22 17:18:59.767437+00:00 [error] <0.2723.0> {bad_header,<<1,0,0,0,0,0,177,0>>}
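
The bad_header bytes in the log can be read by hand. An AMQP 0-9-1 connection starts with the 8-byte protocol header "AMQP" 0 0 9 1, while a regular frame starts with a 1-byte type, a 2-byte channel, and a 4-byte payload size. The sketch below decodes the logged bytes as a frame header; the function name parse_frame_header is hypothetical, and reading the bytes as a frame is an interpretation, not something the log states:

```python
import struct

# Protocol header an AMQP 0-9-1 client sends first on a new connection.
AMQP_0_9_1_HEADER = b"AMQP\x00\x00\x09\x01"

# Bytes RabbitMQ reported in {bad_header,<<1,0,0,0,0,0,177,0>>}.
REJECTED = bytes([1, 0, 0, 0, 0, 0, 177, 0])


def parse_frame_header(data: bytes) -> tuple:
    """Decode the first 7 bytes as (frame type, channel, payload size)."""
    # ">BHI": big-endian unsigned 1-byte, 2-byte, and 4-byte integers.
    return struct.unpack(">BHI", data[:7])


if __name__ == "__main__":
    print(REJECTED == AMQP_0_9_1_HEADER)
    print(parse_frame_header(REJECTED))
```

Read this way, the broker received frame type 1 (a method frame) on channel 0 with a 177-byte payload where it expected the protocol header. That would fit a client that continued the handshake on a fresh socket that never carried the protocol header, though the logs alone do not prove it.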

And the docker-compose, for what it's worth:

version: '2'
services:
    rabbitmq:
        image: rabbitmq:3-management-alpine
        restart: always
        environment:
            RABBITMQ_DEFAULT_USER: guest
            RABBITMQ_DEFAULT_PASS: guest
            RABBITMQ_LOG_LEVEL: debug
        ports:
            - 5672:5672
            - 15672:15672
        volumes:
            - ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
            - ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
    redis:
        image: redis:alpine
    test_container:
        command: /src/bin/debug
        depends_on:
            - rabbitmq
            - redis
        environment:
            CELERY_URI: amqp://guest:guest@rabbitmq
            TRUSTED_DOMAINS: 'service;example.com;another-example.com;test'
            MESSAGEBUS_CONNECTION_STR: amqp://guest:guest@rabbitmq
        build:
            context: .
            dockerfile: Dockerfile
        ports:
            - "8000:8000"
        volumes:
            - '.:/src'

Pytest.ini

[pytest]
norecursedirs = ve
testpaths = /src/tests
junit_family=xunit2
; addopts=--tb=short -n 5 --dist=loadscope

There is 1 best solution below

Niels Uitterdijk

As the traceback indicates, httpretty was interfering by mocking the socket connection. Disabling httpretty around the function call was the solution for me. Strangely enough, I could not find any reference to the RabbitMQ URL.

import httpretty


def send_amqp_message(message: BaseEvent, queue: str = "eventbus") -> None:
    # Switch off httpretty's socket mocking while talking to RabbitMQ.
    httpretty.disable()
    try:
        with Connection(MESSAGEBUS_CONNECTION_STRING) as conn:
            if not conn.connected:
                conn.connect()
            queue = conn.SimpleQueue(queue)
            queue.put({"event": message.name, "payload": asdict(message.data)}, serializer="uuid")
            queue.close()
    finally:
        # Re-enable mocking even if publishing raises.
        httpretty.enable()
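
One caveat with the function above: it calls httpretty.enable() at the end regardless of whether httpretty was enabled when the function was entered, so production code paths would switch mocking on. A possible refinement, sketched here as an assumption rather than something I have run against the original test suite, is a small context manager that restores the previous state. The name mocking_paused is hypothetical; it accepts any object with is_enabled(), disable(), and enable(), which the httpretty module provides:

```python
from contextlib import contextmanager


@contextmanager
def mocking_paused(mocker):
    """Disable `mocker` for the duration of the block, then restore its prior state.

    `mocker` is any object exposing is_enabled(), disable() and enable(),
    for example the httpretty module itself.
    """
    was_enabled = mocker.is_enabled()
    if was_enabled:
        mocker.disable()
    try:
        yield
    finally:
        # Only switch mocking back on if it was on when we started.
        if was_enabled:
            mocker.enable()
```

The body of send_amqp_message could then be wrapped in "with mocking_paused(httpretty):" instead of the explicit disable/enable pair. Note that enable() is called without arguments, so any non-default options passed to the original enable() call would not be restored.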