Issue Pickling Dataflow Pipeline on Airflow


I am running a pipeline that follows a general format as such:

def dataflow_job_run(files_list, bucket):
    import apache_beam as beam
    from apache_beam.io import ReadAllFromText
    from apache_beam.io import filesystems
    from apache_beam.options.pipeline_options import PipelineOptions
    from <custom_module> import func1

    class SpecialOutputClass(beam.DoFn):
        def process(self, element):
            path = bucket+element[0]
            writer = filesystems.FileSystems.create(f'{path}')
            writer.write(bytes(element[1], 'utf-8'))
            writer.close()

    dataflow_options = <dataflow_options_list>
    options = PipelineOptions(dataflow_options)

    p = beam.Pipeline(options=options)

    transform = (
        p
        | beam.Create(files_list)
        | ReadAllFromText(with_filename=True)
        | beam.Map(func1)
        | beam.GroupByKey()
        | beam.ParDo(SpecialOutputClass())
    )

    result = p.run()

This code works perfectly when I run it locally, but when I try to run it in Airflow (v1.10) I run into an error. I cut the pipeline into separate components, and the one that causes the issue is the last ParDo:

  File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py", line 285, in loads
    return dill.loads(s)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 826, in _import_module
    return __import__(import_name)
ModuleNotFoundError: No module named 'unusual_prefix_*'

Here the '*' is just the name of the DAG. Based on the error message, the problem seems to be the way Airflow imports DAG files: each DAG module gets a mangled 'unusual_prefix_*' name, and for some reason the ParDo class doesn't get pickled properly against it. Is there a way around this problem?
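The failure mode can be reproduced outside Beam and Airflow entirely with the standard library. The sketch below simulates what happens (the mangled module name is made up for illustration): pickle stores a class by reference as module name plus qualified name, so an instance of a class defined inside a module that only exists in the current process cannot be unpickled anywhere that module isn't importable, which is exactly the situation on a remote worker.

```python
import pickle
import sys
import types

# Simulate Airflow importing a DAG file under a mangled module name.
# A class defined in that file records the mangled name as its __module__.
mod = types.ModuleType('unusual_prefix_mydag')
exec('class SpecialOutputClass:\n    pass', mod.__dict__)
sys.modules['unusual_prefix_mydag'] = mod

# Pickling works here, because the module is present in sys.modules.
payload = pickle.dumps(mod.SpecialOutputClass())

# On a remote worker the mangled module does not exist, and unpickling
# fails when pickle tries to re-import it:
del sys.modules['unusual_prefix_mydag']
try:
    pickle.loads(payload)
except ModuleNotFoundError as err:
    print(err)  # No module named 'unusual_prefix_mydag'
```

A plain function passed to beam.Map dodges part of this because dill can serialize a function by value, but a DoFn subclass drags its defining module's name along with it.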

EDIT: If I change SpecialOutputClass into a function and run it with beam.Map(), it works, so clearly something is going on with the ParDo() aspect specifically. While this clears up part of the problem, I am still curious how to make the original code work. Some additional considerations: I do not have donot_pickle = True set, and I am running on the CeleryExecutor.
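One workaround (a sketch only, not tested against this exact DAG) is to move the DoFn out of the DAG file into a module the workers can import, for example alongside func1 inside <custom_module>. The class then pickles by reference to a real module name instead of the mangled DAG module. The file name transforms.py below is hypothetical, and since the class no longer closes over the enclosing function's variables, bucket is passed in through the constructor:

```python
# <custom_module>/transforms.py  (hypothetical file, installed on the workers)
import apache_beam as beam
from apache_beam.io import filesystems

class SpecialOutputClass(beam.DoFn):
    def __init__(self, bucket):
        # bucket is passed in explicitly; the class can no longer close
        # over a variable from dataflow_job_run.
        self.bucket = bucket

    def process(self, element):
        path = self.bucket + element[0]
        writer = filesystems.FileSystems.create(path)
        writer.write(bytes(element[1], 'utf-8'))
        writer.close()
```

The DAG then imports it inside the callable, the same way func1 is imported:

```python
def dataflow_job_run(files_list, bucket):
    import apache_beam as beam
    from <custom_module> import func1
    from <custom_module>.transforms import SpecialOutputClass  # hypothetical path
    ...
    # | beam.ParDo(SpecialOutputClass(bucket))
```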
