I have a workflow that looks like this:
from celery import shared_task, chord, chain, signature

def get_data():
    return [1, 2, 3, 4]

@shared_task()
def enrich_nums(num: int):
    return {num: num % 2}

@shared_task()
def run_workflow():
    data = get_data()
    # Build signatures with .s() so the chord schedules the tasks instead of calling them inline
    tasks = [enrich_nums.s(i) for i in data]
    # The chord body is referenced by name and routed to another queue
    return chord(tasks, signature("task.another.place", queue="another.queue")).delay()
I would like to mock that "signature" (the chord body) in a test so that I can capture what it is called with.
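To make "capture" concrete, here is a rough sketch of the kind of stand-in I have in mind. It relies on the fact that a chord calls its body with the list of header results as the first positional argument. The names `captured` and `record_callback` are hypothetical, and the function is left as a plain callable so the snippet runs without a broker; in the real test it would be registered as a Celery task under the name "task.another.place".

```python
from typing import Any

# Hypothetical module-level store; the test would inspect it after the workflow runs.
captured: list[tuple[tuple[Any, ...], dict[str, Any]]] = []


def record_callback(*args: Any, **kwargs: Any) -> Any:
    """Record whatever the chord passes to its body and hand the results back."""
    captured.append((args, kwargs))
    # A chord calls its body with the list of header results as the first positional argument.
    return args[0] if args else None


# In the test this plain function would be wrapped as a Celery task, e.g.
#   celery_app.task(name="task.another.place")(record_callback)
# It is left undecorated here so the sketch runs without a broker.
```

The idea is that, once the stand-in actually runs on the test worker, the test could assert on the contents of `captured` instead of relying on an exception.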
Versions: Python 3.11.2, Celery 5.3.6, pytest-celery
Broker: RabbitMQ. Backend: MongoDB.
I tried to define a task with the same name inside the test function and register it on the test app, but that did not work:
from celery import Celery
from celery.worker import WorkController

from tasks import run_workflow

def test_workflow(
    celery_app: Celery,
    celery_worker: WorkController,
):
    # Register a stand-in under the same name the chord body uses
    @celery_app.task(name="task.another.place", queue="another.queue")
    def mock_task(*args, **kwargs):
        raise ValueError("im here!")

    celery_app.register_task(mock_task)
    celery_worker.reload()
    run_workflow.delay().get()
I expected the ValueError to be raised, but instead a message is published to the broker and the mock task never runs on the worker.