Listing files in GCS with Apache Beam: low throughput


I'm trying to run an Apache Beam pipeline on Dataflow. The pipeline is supposed to list all files under a location in a bucket and then download them. I start by listing and filtering all the files under the prefix, then fan the filenames out to a ParDo that downloads each file. However, the self.list_files step has odd throughput behavior: it slows down after a while.

My pipeline code:

            pipeline
            | "Get list of zones"                                       >> beam.create(keys)
            | "Get snapshots filenames"                                 >> beam.FlatMap(self.list_files)
            | "Load snapshots"                                          >> beam.ParDo(LoadFiles())

The code for listing all the files:

    def list_files(
        self, element: Key
    ) -> list[tuple[Key, str]]:
        blob_names_with_size = beam.io.gcp.gcsio.GcsIO().list_files(
            f"{SNAPSHOTS_BASE_URI}/{element}/",
            with_metadata=False,
        )
        blob_names = [name for name, _size in blob_names_with_size]
        input_files_refs = filter_files(blob_names)
        return [
            (element, input_files_ref)
            for input_files_ref in input_files_refs
        ]

When I look at the throughput, it seems to scale at first, but after a while the step starts yielding only a few elements at a time, which then slows down all subsequent steps of the pipeline. Any ideas why this might be and how to improve it?

[Screenshot: low output of the list-files step]
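Could the listing and loading steps be getting fused into a single stage on Dataflow? A sketch of the workaround I had in mind (untested, and the step name "Rebalance filenames" is made up): inserting a beam.Reshuffle() between the two steps, which breaks fusion and lets the listed filenames be redistributed across workers:

            pipeline
            | "Get list of zones"                                       >> beam.Create(keys)
            | "Get snapshots filenames"                                 >> beam.FlatMap(self.list_files)
            # Reshuffle breaks fusion so the (zone, filename) pairs are
            # redistributed across workers instead of all being loaded
            # on the same worker that listed them.
            | "Rebalance filenames"                                     >> beam.Reshuffle()
            | "Load snapshots"                                          >> beam.ParDo(LoadFiles())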
