The following code:

import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner

def get_pipeline(workers):
    pipeline_options = PipelineOptions(['--direct_num_workers', str(workers)])
    return beam.Pipeline(options=pipeline_options,
                         runner=fn_api_runner.FnApiRunner(
                             default_environment=beam_runner_api_pb2.Environment(
                                 urn=python_urns.SUBPROCESS_SDK,
                                 payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                                         % sys.executable.encode('ascii'))))

with get_pipeline(4) as pipeline:
  _ = (
        pipeline
        | 'ReadTestData' >> beam.io.ReadFromParquet(input_files, columns=all_columns)
        | "write" >> beam.io.WriteToText("/tmp/txt2")
  )

uses only one worker out of the 4 available and generates only one big output file (even though there are many input files).

How do I force the Beam pipeline to run in parallel, i.e. how do I get every input file processed separately by a different worker?
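
For reference, a common way to keep a Beam pipeline from fusing the read and the write into one serial stage is an explicit beam.Reshuffle() between them, and WriteToText accepts a num_shards argument to control how many output files are produced. The following is only a minimal sketch of that idea, reusing get_pipeline, input_files and all_columns from the snippet above; the Reshuffle step and num_shards=4 are illustrative additions, not something confirmed in this thread:

import apache_beam as beam

with get_pipeline(4) as pipeline:
  _ = (
        pipeline
        | 'ReadTestData' >> beam.io.ReadFromParquet(input_files, columns=all_columns)
        | 'Reshuffle' >> beam.Reshuffle()  # break fusion so elements can be redistributed across workers
        | 'Write' >> beam.io.WriteToText('/tmp/txt2', num_shards=4)  # request 4 output shards
  )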

Which version of Beam are you using?

I have the same issue with Beam 2.16.0, but version 2.17.0 seems to have the expected behavior.

You might want to try that version instead while keeping your code as it is.
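
If you want to confirm which version is installed and that the worker-count flag is actually being parsed, a quick check along these lines may help (a sketch only; it assumes DirectOptions and its direct_num_workers attribute are available in your Beam install):

import apache_beam as beam
from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions

print(beam.__version__)  # e.g. '2.17.0' after upgrading

opts = PipelineOptions(['--direct_num_workers', '4'])
print(opts.view_as(DirectOptions).direct_num_workers)  # should print 4 if the flag is recognized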