I'm trying to run an Apache Beam pipeline on Dataflow. The pipeline is supposed to list all files under a location in a bucket and then download them. I start by listing and filtering all the files under that prefix, then fan the resulting file names out into a ParDo that downloads them. However, the self.list_files step seems to have odd throughput that slows down after a while.
My pipeline code:
(
    pipeline
    | "Get list of zones" >> beam.Create(keys)
    | "Get snapshots filenames" >> beam.FlatMap(self.list_files)
    | "Load snapshots" >> beam.ParDo(LoadFiles())
)
The code for listing all the files:
def list_files(
    self, element: Key
) -> list[tuple[Key, str]]:
    # List every blob under the zone's prefix; list_files yields (name, size) tuples.
    blob_names_with_size = beam.io.gcsio.GcsIO().list_files(
        f"{SNAPSHOTS_BASE_URI}/{element}/",
        with_metadata=False,
    )
    blob_names = [blob_name[0] for blob_name in blob_names_with_size]
    # Keep only the files we actually want to load.
    input_files_refs = filter_files(blob_names)
    return [
        (element, input_files_ref)
        for input_files_ref in input_files_refs
    ]
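filter_files itself is trivial; it is roughly something like this (illustrative only, the actual filename pattern differs):

def filter_files(blob_names: list[str]) -> list[str]:
    # Illustrative: keep the snapshot data files, drop manifests and markers.
    return [name for name in blob_names if name.endswith(".avro")]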
When I look at the throughput of this step, it scales up at first, but after a while it starts emitting only a few elements at a time, which then starves all subsequent steps of the pipeline. Any ideas why this might happen and how to improve it?