Why only one periodic task created?

419 Views Asked by At

I want to create multiple periodic tasks by a loop, but only the last of the list was created.For example:

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    a = [1,3,4,7,8,10]
    for i in a:
        sender.add_periodic_task(crontab(hour=i), task.s())

In the task schedule when I run celery beat -A tasks - l debug, I see only the task executed at 10 o'clock. Why?

1

There are 1 best solutions below

1
On

The tasks are stored in a dictionary by key. The key is given by the name argument or the repr() of the sig argument. Here the sig argument is task.s() and it is the same for every loop. So as it goes through the loop it overwrites the same key for each schedule. To fix provide a unique name:

sender.add_periodic_task(crontab(hour=i), task.s(), name='whatever-{}'.format(i))

Here is the relevent source from celery:

def add_periodic_task(self, schedule, sig,
                      args=(), kwargs=(), name=None, **opts):
    key, entry = self._sig_to_periodic_task_entry( 
        schedule, sig, args, kwargs, name, **opts)
    if self.configured:
        self._add_periodic_task(key, entry)
    else:
        self._pending_periodic_tasks.append((key, entry))

    return key

def _sig_to_periodic_task_entry(self, schedule, sig,
                                args=(), kwargs={}, name=None, **opts):
    sig = (sig.clone(args, kwargs)
           if isinstance(sig, abstract.CallableSignature)
           else self.signature(sig.name, args, kwargs))
    return name or repr(sig), { # <------------------------------- key created here
        'schedule': schedule,
        'task': sig.name,
        'args': sig.args,
        'kwargs': sig.kwargs,
        'options': dict(sig.options, **opts),
    }

def _add_periodic_task(self, key, entry):
    self._conf.beat_schedule[key] = entry # <--------------------- key can be overwritten

edit: As pointed out by @GharianiMohamed the docs state that the hour argument of chrontab can be "a (list of) integers from 0-23 that represent the hours of a day of when execution should occur." So a better way of handling this to remove the loop entirely:

@app.on_after_finalize.connect
def setup_periodic_tasks(sender, **kwargs):
    a = [1,3,4,7,8,10]
    sender.add_periodic_task(crontab(hour=a), task.s())