How to read data from a partitioned parquet dataset into Polars


I have a large dataset already written to disk as a partitioned parquet dataset.

How can I read such data directly into Polars to compute some aggregate results? I want to avoid converting the parquet data to pandas (pq_df.to_pandas()), as my data is larger than my computer's memory.

Here is a reproducible example. I appreciate your input.

import polars as pl    # Version 0.20.3
import pyarrow as pa   # Version 11.0.0
import pyarrow.parquet as pq
 
 
pl_df = pl.DataFrame({
    "Name": ["ABC", "DEF", "GHI", "JKL"],
    "date": ["2024-01-01", "2024-01-10", "2023-01-29", "2023-01-29"],
    "price": [1000, 1500, 1800, 2100],
})
 
pl_df = pl_df.with_columns(date= pl.col("date").cast(pl.Date))
 
# Write the Polars DataFrame to disk as a partitioned parquet dataset
pq.write_to_dataset(pl_df.to_arrow(), root_path=r"C:\Users\desktop PC\Downloads\test_pl",
                    partition_cols=["date"], compression="gzip",
                    existing_data_behavior="overwrite_or_ignore")
                        
# Build a schema object for the data written to the parquet dataset
pd_df_schema = pa.Schema.from_pandas(pl_df.to_pandas())
 
# Read the data back from the parquet dataset
pq_df = pq.read_table(r"C:\Users\desktop PC\Downloads\test_pl",
                      schema=pd_df_schema,
                      )
 
# I want to use this parquet object to compute an aggregate result via Polars
# without using the pq_df.to_pandas() method.
 
df = (
    pl.from_pandas(pq_df.to_pandas())
    .lazy()
    .group_by(["date"])
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)

There is 1 answer below.

Best answer:

You can use the from_arrow() method to turn the pyarrow Table into a Polars DataFrame:

(
    pl.from_arrow(pq_df).lazy()
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count")
    ).collect(streaming=True)
)

┌─────────────────────┬─────────────┬───────────────┐
│ date                ┆ grouped_sum ┆ grouped_count │
│ ---                 ┆ ---         ┆ ---           │
│ datetime[ms]        ┆ i64         ┆ u32           │
╞═════════════════════╪═════════════╪═══════════════╡
│ 2023-01-29 00:00:00 ┆ 3900        ┆ 2             │
│ 2024-01-01 00:00:00 ┆ 1000        ┆ 1             │
│ 2024-01-10 00:00:00 ┆ 1500        ┆ 1             │
└─────────────────────┴─────────────┴───────────────┘
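
If memory is the real constraint, note that pq.read_table() still loads the whole dataset into memory before from_arrow() ever sees it. A minimal sketch of a lazier variant (assuming the same test_pl root and hive-style partitioning, which is what write_to_dataset with partition_cols produces) is to open the data with pyarrow.dataset and hand it to scan_pyarrow_dataset(), so Polars only pulls the columns it actually needs:

import pyarrow.dataset as ds
import polars as pl

# Open the partitioned dataset lazily; no data is read at this point.
dataset = ds.dataset("test_pl", format="parquet", partitioning="hive")

(
    pl.scan_pyarrow_dataset(dataset)   # lazy scan backed by the pyarrow dataset
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect()
)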

But the more proper way to do this is probably the scan_parquet() functionality, which lets you lazily scan the path:

(
    pl.scan_parquet(r"test_pl/*/*.parquet")
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count")
    ).collect(streaming=True)
)

┌────────────┬─────────────┬───────────────┐
│ date       ┆ grouped_sum ┆ grouped_count │
│ ---        ┆ ---         ┆ ---           │
│ str        ┆ i64         ┆ u32           │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000        ┆ 1             │
│ 2024-01-10 ┆ 1500        ┆ 1             │
│ 2023-01-29 ┆ 3900        ┆ 2             │
└────────────┴─────────────┴───────────────┘
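
One caveat visible in that output: the date column comes back as a string, because its values only exist in the directory names written by the hive-style partitioning. A small sketch of casting it back to a proper Date (assuming the partition values look like "2024-01-01"):

(
    pl.scan_parquet(r"test_pl/*/*.parquet")
    # the hive partition value is a string; parse it back into a Date
    .with_columns(pl.col("date").str.to_date("%Y-%m-%d"))
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)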

Additionally, you can always use duckdb for this:

import duckdb

duckdb.sql("""
    select
        a.date,
        sum(a.price)   as grouped_sum,
        count(a.price) as grouped_count  -- count, not a second sum
    from read_parquet('test_pl/*/*.parquet') as a
    group by
        a.date
""").pl()

┌────────────┬─────────────┬───────────────┐
│ date       ┆ grouped_sum ┆ grouped_count │
│ ---        ┆ ---         ┆ ---           │
│ date       ┆ f64         ┆ i64           │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000.0      ┆ 1             │
│ 2024-01-10 ┆ 1500.0      ┆ 1             │
│ 2023-01-29 ┆ 3900.0      ┆ 2             │
└────────────┴─────────────┴───────────────┘
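
The date column only shows up in that duckdb query because duckdb recognises the hive-style directory layout. If your duckdb version does not auto-detect it, you can be explicit with the hive_partitioning flag (a sketch, same assumed relative path as above):

import duckdb

duckdb.sql("""
    select
        a.date,
        sum(a.price)   as grouped_sum,
        count(a.price) as grouped_count
    from read_parquet('test_pl/*/*.parquet', hive_partitioning = true) as a
    group by a.date
""").pl()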