Polars with Rust: Out of Memory Error when Processing Large Dataset in Docker Using Streaming

I am using Polars ({ version = "0.38.3", features = ["lazy", "streaming", "parquet", "fmt", "polars-io", "json"] }) with Rust (v1.77.0) to process a dataset that is larger than the available memory inside a Docker container. The container's memory is intentionally limited via --memory=20gb and --shm-size=20gb. I am hitting an out-of-memory (OOM) error while performing calculations on the dataset.

Here's an overview of my workflow:

1- Load the dataset from a Parquet file using scan_parquet to create a LazyFrame.
2- Perform a transformation on the LazyFrame, which is a single unnest operation.
3- Write the resulting data to disk as a Parquet file using sink_parquet.

Here is a code snippet that demonstrates the relevant parts of my Rust code:

use jemallocator::Jemalloc;
use polars::prelude::*;
use std::time::Instant;

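// Use jemalloc as the global allocator.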
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    let now = Instant::now();

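    // Lazily scan the Parquet file; low_memory asks the reader to keep a smaller memory footprint.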
    let mut lf = LazyFrame::scan_parquet(
        "./dataset.parquet",
        ScanArgsParquet {
            low_memory: true,
            ..Default::default()
        },
    )
    .unwrap()
    .with_streaming(true);

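    // The only transformation: unnest the "fields" struct column into top-level columns.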
    lf = lf.unnest(["fields"]);

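    // Print the optimized plan to check that the query will run in the streaming engine.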
    let query_plan = lf.clone().explain(true).unwrap();
    println!("{}", query_plan);

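    // Stream the result to disk instead of collecting it into memory first.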
    lf.sink_parquet("./result.parquet".into(), Default::default())
        .unwrap();

    let elapsed = now.elapsed();
    println!("Elapsed: {:.2?}", elapsed);
}

Despite using LazyFrame and enabling low_memory mode in ScanArgsParquet, I still encounter an out of memory error during the execution of the code.

I have tried the following:

  • Using the jemallocator crate as the global allocator.
  • Enabling streaming mode using with_streaming(true) for the LazyFrame operations.
  • Setting low_memory: true in ScanArgsParquet when calling scan_parquet.

The printed plan indicates that every operation should be run in the streaming engine:

--- STREAMING
UNNEST by:[fields]

    Parquet SCAN ./resources/dataset.parquet
    PROJECT */2 COLUMNS  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

However, I am still running into memory issues when processing the large dataset (Parquet file size = 20GB).

My questions are:

  • Why am I getting the OOM error when everything indicates that the query is running in the streaming engine?
  • Is there another way to leverage disk-based processing or to chunk the data so that datasets larger than memory can be handled? (See the rough sketch after this list for the kind of chunking I have in mind.)
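
To make the second question concrete, here is a rough sketch of the kind of manual chunking I have in mind. It is purely illustrative and unverified: the chunk size, the output file naming, and the chunked_unnest helper are my own assumptions, not something I have benchmarked against the container's memory limit.

use polars::prelude::*;

// Sketch only: re-scan the file for each window of rows, materialize that
// window, unnest it, and write it to its own output file.
fn chunked_unnest(path: &str, chunk_size: IdxSize) -> PolarsResult<()> {
    let mut offset: i64 = 0;
    let mut part = 0usize;
    loop {
        let mut df = LazyFrame::scan_parquet(
            path,
            ScanArgsParquet {
                low_memory: true,
                ..Default::default()
            },
        )?
        .slice(offset, chunk_size)
        .unnest(["fields"])
        .collect()?;

        if df.height() == 0 {
            break; // past the end of the dataset
        }

        // Each chunk becomes its own Parquet file rather than a single result.parquet.
        let file = std::fs::File::create(format!("./result_part_{part}.parquet"))?;
        ParquetWriter::new(file).finish(&mut df)?;

        offset += df.height() as i64;
        part += 1;
    }
    Ok(())
}

The obvious downsides are that every iteration re-scans the file and that I end up with many small files instead of one result.parquet, which is why I would much rather get the streaming sink_parquet path to behave.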

Any guidance or suggestions on how to resolve this issue would be greatly appreciated. Thank you in advance!
