How do I pass a generator to celery.chord instead of a list?

1.1k Views Asked by At

I have a celery task that processes each line in a super large text file in parallel. I also have a celery task that needs to run after each line is processed - it amalgamates and processes the output of each line. Because these are such huge datasets that I'm working with, is there any way I can have celery work with generators, as opposed to lists?

def main():
    header_generator = (processe.s(line) for line in file)
    callback = finalize.s()
    # Want to loop through header_generator and kick off tasks
    chord(header_generator)(callback)

@celery.task
def process(line):
    # do stuff with line, return output
    return output

@celery.task
def finalize(output_generator):
    # Want to loop through output_generator and process the output
    for line in output_generator:
        # do stuff with output
    # do something to signal the completion of the file

If this isn't possible - without forking celery - is there another strategy that someone could recommend?

1

There are 1 best solutions below

0
On

At the time of this writing, generators passed to groups and chords are immediately expanded. I had a similar problem, so I added support for it and created a pull request against celery 3.x here: https://github.com/celery/celery/pull/3043

Currently only redis is supported. Hopefully the PR will be merged before celery 3 is released.