Chain and group task methods with Celery


(is that ok? xD)

Well, I have the following situation in a project with Celery. Let me put you all in context.

I have a lot of tasks that consume an API to apply some configuration to a service we use via web services (SOAP; I'm using suds for this, but that's another story).

Then I realized that all the tasks were almost the same thing; up to a certain point of the configuration, everything followed the same flow:

configure_this -> configure_some_special_case_things -> configure_other_things -> configure_another_set_of_things

Since the order is not important, I can run the special-case configuration at the end of the process without any problem. All the steps except the "special_case_configurations" were the same in every case, so I abstracted this logic into something like this:

class BaseConfigurator(object):
    def __init__(self, some_args, *args, **kwargs):
        self.shared_state = some_args

    def _configure_this(self):
        # some steps that modify the shared state
        pass

    def _configure_other_things(self):
        # some other steps that modify the shared state
        pass

    def _configure_another_set_of_things(self):
        # another set of steps that modify the shared state
        pass

    def _special_case_steps(self):
        # to be implemented by subclasses
        pass

    def configure(self):
        self._configure_this()
        self._configure_other_things()
        self._configure_another_set_of_things()
        self._special_case_steps()

Then... I subclass this one and implement _special_case_steps:

class ParticularConfigurator(BaseConfigurator):
    def __init__(self, some_args, *args, **kwargs):
        super(ParticularConfigurator, self).__init__(some_args, *args, **kwargs)

    def _special_case_steps(self):
        # actual implementation, this overrides the parent's "pass"
        pass

So, the way it used to work was with several "configurator" tasks that called the general configuration tasks and the special-case configuration task, in order, inside a celery.chain. That meant a lot of duplicated code; I think this way is more elegant, and it lets me reuse my configure method properly.

I'm aware of this approach of using class methods as Celery tasks

And all the naming caveats. But for whatever reason, when I tried to group and chain some task_methods I got several pickling errors involving objects that I'm not using directly in my task code; I only use them in functions that are called by my task_method.

I got it working by instantiating my configurator class in a separate task that discriminates which case we are configuring and decides which configurator to call:

@celery.task(queue='user_tasks', max_retries=3, default_retry_delay=120, on_success=handlers.configuration_success)
def run_configuration(case, data):
    switch = {
        # I know DefaultConfigurator is not declared, but it's just for the
        # example; let's pretend it is a subclass of BaseConfigurator :-)
        'default': DefaultConfigurator,
        'custom': ParticularConfigurator,
    }
    switch[case](data).configure()
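The dispatch itself is plain Python and can be exercised without a broker. A self-contained sketch of the same switch pattern (the configurator classes here are hypothetical stubs), with a `.get()` fallback so an unknown case key doesn't raise a KeyError:

```python
class DefaultConfigurator(object):
    def __init__(self, data):
        self.data = data

    def configure(self):
        return ('default', self.data)


class ParticularConfigurator(DefaultConfigurator):
    def configure(self):
        return ('custom', self.data)


SWITCH = {
    'default': DefaultConfigurator,
    'custom': ParticularConfigurator,
}


def run_configuration(case, data):
    # Fall back to the default configurator on unexpected input
    configurator_cls = SWITCH.get(case, DefaultConfigurator)
    return configurator_cls(data).configure()


print(run_configuration('custom', {'id': 1}))
# ('custom', {'id': 1})
```

Because the task receives only the case name and plain data (not a configurator instance), nothing unpicklable ever has to cross the broker.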

Now I'm worried about this, because I'm not taking advantage of Celery's capabilities with celery.chain and celery.group, and I guess this way my code will run slower than if I were able to chain and group task_methods. Am I right?

Should I be subclassing Task instead of making tasks out of instance methods?

Thank you for taking the time to read.
