Apache Beam/Dataflow Pipeline Scaling Issue

I've been working on an Apache Beam pipeline, run on Dataflow, that processes data and uploads it into BigQuery. However, I'm hitting a scaling issue: once I attempt to process more than 1,000 files at a time, I run into memory errors.

Upon further investigation, it seems like my pipeline is processing all stages in batches rather than streaming each file or element individually. I would like to configure my pipeline to stream each file individually and limit the number of files processed at a time to avoid memory issues.
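
For reference, one idea I've been looking at (but haven't tried yet) is replacing my hand-rolled listing and reading DoFns with Beam's built-in fileio transforms, so that matching and reading happen element by element through the Beam connectors. This is only a sketch of what I mean; the gs:// pattern is a placeholder, and I'm assuming MatchAll, ReadMatches, and read_utf8 behave the way the fileio docs describe:

import json

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
    records = (
        p
        | "File patterns" >> beam.Create(["gs://bucket_name/path/to/directory/*.json"])
        | "Match files" >> fileio.MatchAll()        # one FileMetadata per matched object
        | "Read matches" >> fileio.ReadMatches()    # one ReadableFile per FileMetadata
        | "Parse JSON" >> beam.Map(lambda readable: json.loads(readable.read_utf8()))
    )

Would that kind of approach actually change how elements are bundled, or does it run into the same problem?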

If anyone has experience with Apache Beam and Dataflow, I would greatly appreciate any insights or tips on how to make my pipeline more efficient and scalable.

Here's a simplified version of my current pipeline structure:

import json
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.runners import DirectRunner
from google.cloud import storage

class GetFiles(beam.DoFn):

    def process(self, element):
        # Get the id (a GCS path) from the element
        file_path = element.get("id")

        # Get the name of the gcp bucket
        gcp_bucket_name = re.match("[REGEX HERE]", file_path).group(1)
        # Get the name of the gcp directory prefix
        gcp_directory_prefix = re.search("[REGEX HERE]", file_path).group(1)

        # Get the blobs associated with the file_path
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(gcp_bucket_name)
        blobs = bucket.list_blobs(prefix=gcp_directory_prefix)

        # For each blob, pass its bucket and object name to the next PTransform
        for blob in blobs:
            yield {"bucket": gcp_bucket_name, "file_name": blob.name}

class Extract(beam.DoFn):

    def process(self, element):
        # Get the name of the file and its bucket
        file_name = element.get("file_name")
        bucket_name = element.get("bucket")

        # Get the data from GCS storage
        storage_client = storage.Client()
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.get_blob(file_name)

        blob_data = blob.download_as_string()

        # Parse the JSON payload to get some data for the columns
        json_data = json.loads(blob_data)

        # Get the id
        record_id = json_data.get("id")

        # Emit the id together with the raw JSON payload
        yield {"id": record_id, "data": blob_data.decode("utf-8")}

pipeline_options = PipelineOptions(
    streaming=True
)

schema = {
    "fields": [
        {"name": "id", "type": "STRING"},
        {"name": "data", "type": "JSON"}
    ]
}

pipeline = beam.Pipeline(DirectRunner(), options=pipeline_options)

extraction = ( pipeline
    | "Get file parameters" >> beam.Create([
        {"id": "bucket_name/path/to/directory"},
        {"id": "bucket_name/path/to/directory"}
    ])
    | "Get the list of files" >> beam.ParDo(GetFiles())
    | "Extract the Json Data" >> beam.ParDo(Extract())
    | "Upload the Data to BigQuery" >> WriteToBigQuery(
        table="project.dataset.table",
        schema=schema,
        method="STREAMING_INSERTS",
        batch_size=1
    )
)

results = pipeline.run()
results.wait_until_finish()

Please forgive minor bugs; this code is only an example.

The workflow works until I reach around 1,000 or more files; then I get a malloc error. I believe this is because my pipeline is attempting to process all the files at once instead of streaming them.
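
My (unverified) understanding is that the fan-out from GetFiles stays fused with Extract and the BigQuery write, so every file produced from a single input path ends up being processed in the same fused stage. Would inserting a beam.Reshuffle() after the fan-out let Dataflow redistribute the per-file elements across workers? This is just what I'm considering trying, reusing the DoFns, schema, and options from above:

pipeline = beam.Pipeline(DirectRunner(), options=pipeline_options)

extraction = ( pipeline
    | "Get file parameters" >> beam.Create([{"id": "bucket_name/path/to/directory"}])
    | "Get the list of files" >> beam.ParDo(GetFiles())
    # My assumption: Reshuffle breaks fusion so the per-file elements emitted
    # by GetFiles can be redistributed across workers instead of being
    # processed in the same fused bundle that produced them
    | "Redistribute files" >> beam.Reshuffle()
    | "Extract the Json Data" >> beam.ParDo(Extract())
    | "Upload the Data to BigQuery" >> WriteToBigQuery(
        table="project.dataset.table",
        schema=schema,
        method="STREAMING_INSERTS",
        batch_size=1
    )
)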

Any suggestions, best practices, or sample code snippets would be immensely helpful. Thanks in advance for your expertise!
