How to use a gRPC client in a Celery task

I'm working on a FastAPI application that uses Celery to execute tasks, and I'm trying to call a gRPC client from inside a Celery task. However, I keep running into an _InactiveRpcError that I can't resolve.

Here's my setup. I run the Celery worker with the following command:

celery -A worker.worker.celery_app worker --loglevel=info --pool=gevent --concurrency=500

My gRPC client works fine outside of the Celery task, and my other Celery tasks work fine without the gRPC client. However, when I try to use the gRPC client within a Celery task, I get the following error:

grpc._channel._InactiveRpcError: <grpc._channel._RPCState object at 0x7f0adb16d600>
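The repr above hides the actual failure reason. Errors raised by a stub call derive from grpc.RpcError and also implement grpc.Call, so they expose code() and details(). A minimal debugging sketch along these lines could surface them in the worker log; describe_rpc_error is a hypothetical helper name introduced here, not part of grpcio, and it reads the two methods by duck typing so it also accepts unrelated exceptions.

```python
from typing import Any, Dict


def describe_rpc_error(exc: BaseException) -> Dict[str, Any]:
    """Collect the status code and details carried by a gRPC call error.

    Hypothetical helper: code() and details() are looked up by duck typing,
    so exceptions that lack them simply yield None for both fields.
    """
    code = getattr(exc, "code", None)
    details = getattr(exc, "details", None)
    return {
        "type": type(exc).__name__,
        "code": str(code()) if callable(code) else None,
        "details": details() if callable(details) else None,
    }


# Intended use inside the task (requires grpcio; `logger` is an assumed name):
#     try:
#         response = stub.SendPackets(packets_request, timeout=30)
#     except grpc.RpcError as exc:
#         logger.error("SendPackets failed: %s", describe_rpc_error(exc))
#         raise
```

The status code (for example UNAVAILABLE versus DEADLINE_EXCEEDED) would narrow down whether the channel fails to connect at all or the call times out inside the worker.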

Here's the relevant code:

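import asyncio
from typing import Optional

import grpc

# Project-specific imports (settings, celery_app, ioa_packets_pb2, ioa_packets_pb2_grpc, helpers) omitted.


async def send_latest_grpc_packets(filter_type: PaquetTypeEnum, lastest_iec_packets: Optional[Iec104Packets] = None):
    # A new channel is opened for every call and closed when the block exits.
    with grpc.insecure_channel(f"{settings.GRPC_SERVER_HOST}:{settings.GRPC_SERVER_PORT}") as channel:
        stub = ioa_packets_pb2_grpc.PacketsStub(channel)
        if lastest_iec_packets is None:
            lastest_iec_packets = get_lastest_iec_packets(filter_type)
        formatted_iec_packets = format_iec_instance_to_schema(lastest_iec_packets)
        packets_request = ioa_packets_pb2.PacketsRequest(
            packets=cast_packet_to_proto_schema(formatted_iec_packets),
            type=filter_type,
        )
        # Blocking unary call with a 30-second deadline.
        response = stub.SendPackets(packets_request, timeout=30)

@celery_app.task()
def send_cyclical_TM_task():
    # The task itself is a regular function, so the coroutine is driven with asyncio.run().
    asyncio.run(send_latest_grpc_packets(PaquetTypeEnum.TM))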
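One observation about the code above: the stub built on grpc.insecure_channel makes a blocking call, and the coroutine never awaits anything, so a plain synchronous function would behave the same inside the task. The following is only a sketch of that shape under stated assumptions, not a confirmed fix for the error. The names send_packets_sync, make_stub, and open_channel are hypothetical; open_channel defaults to grpc.insecure_channel and exists so the function can be exercised without a server.

```python
from typing import Any, Callable, ContextManager, Optional


def send_packets_sync(
    target: str,
    make_stub: Callable[[Any], Any],
    request: Any,
    open_channel: Optional[Callable[[str], ContextManager[Any]]] = None,
    timeout: float = 30.0,
) -> Any:
    """Open a channel, send one request with a blocking call, close the channel."""
    if open_channel is None:
        # Imported lazily so grpc is loaded, and the channel created, only in
        # the process that actually runs the task.
        import grpc

        open_channel = grpc.insecure_channel
    with open_channel(target) as channel:
        # make_stub would be ioa_packets_pb2_grpc.PacketsStub in the code above.
        stub = make_stub(channel)
        return stub.SendPackets(request, timeout=timeout)
```

With this shape the Celery task could call send_packets_sync directly and drop the asyncio.run wrapper, which removes one moving part while debugging the pool and gRPC interaction.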

I'm using the following versions of the libraries:

  • grpcio = "^1.53.0"
  • celery = "^5.2.7"

I've tried changing the Celery execution pool from prefork to gevent, but that didn't solve the problem.

My current horrible workaround is to start a task that calls one of my own endpoints, which then uses the gRPC client. However, I'd like to find a better solution, as I'd like to keep the process non-blocking and periodic.

If anyone has experience with integrating gRPC and Celery, or can help me understand why they seem to be incompatible in my case, I would greatly appreciate it. Thank you!
