I have my Celery tasks set up as follows:
import random

@celeryapp.task
def heavy_task(x, y):
    # some stuff
    # fan out 10,000 subtasks, each delayed by a random 60-120 second countdown
    for _ in range(10000):
        heavy_task_2.apply_async(args=(x, y),
                                 countdown=random.randint(60, 120))
    return x + y

@celeryapp.task
def heavy_task_2(x, y):
    # some stuff
    return x + y
I have 5 prefork workers with a concurrency of 30 each, all running with prefetch_multiplier=1 and the -Ofair argument. I am using the Redis broker with CELERY_ACKS_LATE=True.
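For reference, the configuration is roughly equivalent to the sketch below (the Redis URL, the app/module name "proj", and the use of new-style setting names are assumptions on my part; the uppercase CELERY_* names I actually use are the older equivalents):

import celery

celeryapp = celery.Celery('proj', broker='redis://localhost:6379/0')  # assumed Redis URL
celeryapp.conf.update(
    worker_prefetch_multiplier=1,  # equivalent to CELERYD_PREFETCH_MULTIPLIER=1
    task_acks_late=True,           # equivalent to CELERY_ACKS_LATE=True
)

Each of the 5 workers is started roughly like this (queue names omitted, defaults assumed):

celery -A proj worker --concurrency=30 -Ofair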
Now, heavy_task.delay(1, 2) is triggered from a Celery beat schedule (a minimal beat entry is sketched below). The task goes to one worker, and all 10,000 subtasks it creates stay with that worker instead of being published to the broker where the other workers could pick them up. That worker's prefetch_count keeps growing toward 10,000.
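The beat entry looks roughly like this (the task path tasks.heavy_task and the hourly schedule are assumptions for illustration):

from celery.schedules import crontab

celeryapp.conf.beat_schedule = {
    'run-heavy-task': {
        'task': 'tasks.heavy_task',     # assumed module path
        'schedule': crontab(minute=0),  # assumed: run at the top of every hour
        'args': (1, 2),
    },
}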
Only when the original worker's memory usage climbs close to 90% do these tasks get published to the broker and shifted to other workers. Sometimes the worker also gets killed by the OS, and those tasks are lost forever, since they were never left unacknowledged in the Redis broker.
What should I do to make these secondary tasks go to the broker immediately, without burdening a single worker with all of them?