Why does reading a small subset of rows with a Parquet Dataset take the same time as reading the whole file?


I'm developing a program to analyze historical prices of some assets. The data is structured and analyzed as a pandas DataFrame. The columns are the dates and the rows are the assets. Previously I was using the transpose of this, but this format gave me better reading times. I saved this data in a Parquet file, and now I want to read an interval of dates, say from A to B, for a small set of assets, analyze it, and then repeat the same process with the same assets for the interval from B + 1 to C. The problem is that even if I request a single row, reading the Parquet file takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be good if, once the rows are filtered, the reader remembered where the relevant blocks are to speed up subsequent reads. Do I have to write a new file containing only the filtered assets?

I tried writing the Parquet file with a small number of row groups and a smaller data page size to avoid reading the complete file, but this didn't give me good results in terms of time.

Another question I have is the following: why does reading the complete Parquet file with a ParquetDataset and use_legacy_dataset = False take more time than reading the same dataset with use_legacy_dataset = True?

Code example:

import pandas as pd 
import numpy as np
import time
import pyarrow.parquet as pq

# generating small data for the example; the file weighs about 150 MB here,
# the real data is 2 GB
dates = pd.bdate_range('2019-01-01', '2020-03-01')
assets = list(range(1000, 50000))

historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)
historical_prices.columns = historical_prices.columns.strftime('%Y-%m-%d')

# name of the index
historical_prices.index.name = 'assets'

# writing the parquet file using the latest version; the commented options are the things that I tested
historical_prices.to_parquet(
    'historical_prices.parquet', 
    version='2.0', 
    data_page_version='2.0', 
    writer_engine_version='2.0',
    # row_group_size=100,
    # compression=None
    # use_dictionary=False,
    # data_page_size=1000,
    # use_byte_stream_split=True,
    # flavor='spark',
)


# reading the complete parquet dataset 
start_time = time.time()

historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    use_legacy_dataset=False
)
historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)


# Reading only one asset of the parquet dataset
start_time = time.time()


filters = [('assets', '=', assets[0])]
historical_prices_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    filters=filters, 
    use_legacy_dataset=False
)

historical_prices_dataset.read_pandas().to_pandas()

print(time.time() - start_time)

# this is what I want to do, read by intervals.
num_intervals = 5

for i in range(num_intervals):
    start = int(i * len(dates) / num_intervals)
    end = int((i + 1) * len(dates) / num_intervals)
    interval = list(dates[start:end].strftime('%Y-%m-%d'))
    historical_prices_dataset.read_pandas(columns=interval).to_pandas()

    # Here goes some analyzing process that can't be done in parallel due that the results of every interval
    # are used in the next interval

print(time.time() - start_time)


There are 2 answers below

BEST ANSWER

I was using the transpose of this, but this format gave me better reading times.

Parquet supports individual column reads. So if you have 10 columns of 10k rows and you want 5 columns then you'll read 50k cells. If you have 10k columns of 10 rows and you want 5 columns then you'll read 50 cells. So presumably this is why the transpose gave you better reading time. I don't think I have enough details here. Parquet also supports reading individual row groups, more on that later.

You have roughly 49,000 assets and 300 dates. I'd expect you to get better performance with assets as columns but 49,000 is a lot of columns to have. It's possible that either you are having to read too much column metadata or you are dealing with CPU overhead from keeping track of so many columns.

It is a bit odd to have date values or asset ids as columns. A far more typical layout would be to have three columns: "date", "asset id", & "price".
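As a sketch of that layout, a wide assets-by-dates frame can be melted into the three-column form with pandas (the tiny frame here is purely illustrative):

```python
import numpy as np
import pandas as pd

# Hypothetical wide frame: assets as the index, dates as columns.
wide = pd.DataFrame(
    np.random.rand(3, 2),
    index=pd.Index([1000, 1001, 1002], name="assets"),
    columns=["2019-01-01", "2019-01-02"],
)

# Reshape into the typical long layout: one row per (asset, date) pair.
long_df = wide.reset_index().melt(id_vars="assets", var_name="date", value_name="price")
print(long_df.columns.tolist())  # ['assets', 'date', 'price']
print(len(long_df))              # 6
```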

The problem is that even if I request a single row, reading the Parquet file takes the same time as reading the whole file

Yes, if you have a single row group. Parquet does not support partial row group reads. I believe this is due to the fact that the columns are compressed. However, I do not get the same results you are getting. The middle time in your example (the single asset read) is typically ~60-70% of the time of the first read. So it is faster. Possibly just because there is less conversion to do to get to pandas or maybe there is some optimization I'm not aware of.

The problem is that even if I request a single row, reading the Parquet file takes the same time as reading the whole file. Is there a way to improve this behaviour? It would be good if, once the rows are filtered, the reader remembered where the relevant blocks are to speed up subsequent reads. Do I have to write a new file containing only the filtered assets?

Row groups may be your answer. See the next section.

I tried writing the Parquet file with a small number of row groups and a smaller data page size to avoid reading the complete file, but this didn't give me good results in terms of time.

This is probably what you are after (or you can use multiple files). Parquet supports reading just one row group out of a whole file. However, 100 is too small of a number for row_group_size. Each row group creates some amount of metadata in the file and has some overhead for processing. If I change that to 10,000 for example then the middle read is twice as fast (and now only 30-40% of the full table read).

Another question I have is the following: why does reading the complete Parquet file with a ParquetDataset and use_legacy_dataset = False take more time than reading the same dataset with use_legacy_dataset = True?

The datasets API is pretty new (it arrived in 1.0.0, which was released in July). It's possible there is just a bit more overhead. You are not doing anything that would take advantage of the new datasets API (e.g. using scan, non-parquet datasets, or new filesystems). So while use_legacy_dataset=False shouldn't be faster, it shouldn't be any slower either; they should take roughly the same amount of time.


It sounds like you have many assets (tens of thousands) and you want to read a few of them. You also want to chunk the read into smaller reads (which you are using the date for).

First, instead of using the date at all, I would recommend using dataset.scan (https://arrow.apache.org/docs/python/dataset.html). This will allow you to process your data one row group at a time.

Second, is there any way you can group your asset ids? If each asset ID has only a single row you can ignore this. However, if you have (for example) 500 rows for each asset ID (or 1 row for each asset ID/date pair) can you write your file so that it looks something like this...

asset_id  date  price
A         1     ?
A         2     ?
A         3     ?
B         1     ?
B         2     ?
B         3     ?

If you do this AND you set the row group size to something reasonable (try 10k or 100k and then refine from there) then you should be able to get it so that you are only reading 1 or 2 row groups per asset ID.

SECOND ANSWER

I found another approach that gives me better times for my specific case; of course, it's not a very general solution. It uses some functions that aren't pyarrow's, but it does what I thought pyarrow's filters would do when reading the same rows multiple times. As the number of row groups to read grows, the ParquetDataset gives better performance.

import pandas as pd
import numpy as np
import time
import pyarrow.parquet as pq
from typing import Dict, Any, List


class PriceGroupReader:
    def __init__(self, filename: str, assets: List[int]):
        self.price_file = pq.ParquetFile(filename)
        self.assets = assets
        self.valid_groups = self._get_valid_row_groups()

    def _get_valid_row_groups(self):
        """
        I didn't find a parquet function to do this row-group search, so I did it manually.
        Note: the assets index is sorted, so this can probably be improved a lot.
        """
        start_time = time.time()
        assets = pd.Index(self.assets)
        valid_row_groups = []
        index_position = self.price_file.schema.names.index("assets")

        for i in range(self.price_file.num_row_groups):
            row_group = self.price_file.metadata.row_group(i)
            statistics = row_group.column(index_position).statistics
            if np.any((statistics.min <= assets) & (assets <= statistics.max)):
                valid_row_groups.append(i)

        print("getting the row groups: {}".format(time.time() - start_time))
        return valid_row_groups

    def read_valid_row_groups(self, dates: List[str]):
        row_groups = []
        for row_group_pos in self.valid_groups:
            df = self.price_file.read_row_group(row_group_pos, columns=dates, use_pandas_metadata=True).to_pandas()
            df = df.loc[df.index.isin(self.assets)]
            row_groups.append(df)

        df = pd.concat(row_groups)

        """
        # This is another way to read the groups; I think it can consume more memory, but it's probably faster:
        df = self.price_file.read_row_groups(self.valid_groups, columns=dates, use_pandas_metadata=True).to_pandas()
        df = df.loc[df.index.isin(self.assets)]
        """

        return df


def write_prices(assets: List[int], dates: List[str]):
    historical_prices = pd.DataFrame(np.random.rand(len(assets), len(dates)), assets, dates)

    # name of the index
    historical_prices.index.name = 'assets'

    # writing the parquet file using the latest version; the commented options are the things that I tested
    historical_prices.to_parquet(
        'historical_prices.parquet',
        version='2.0',
        data_page_version='2.0',
        writer_engine_version='2.0',
        row_group_size=4000,
        # compression=None
        # use_dictionary=False,
        # data_page_size=1000,
        # use_byte_stream_split=True,
        # flavor='spark',
    )


# generating small data for the example; the file weighs about 150 MB here, the real data is 2 GB
total_dates = list(pd.bdate_range('2019-01-01', '2020-03-01').strftime('%Y-%m-%d'))
total_assets = list(range(1000, 50000))
write_prices(total_assets, total_dates)

# selecting a subset of the whole assets
valid_assets = total_assets[:3000]

# read the price file for the example
price_group_reader = PriceGroupReader('historical_prices.parquet', valid_assets)

# reading all the dates, only as an example
start_time = time.time()
price_group_reader.read_valid_row_groups(total_dates)
print("complete reading: {}".format(time.time() - start_time))

# this is what I want to do, read by intervals.
num_intervals = 5

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_group_reader.read_valid_row_groups(interval)
    # print(df)

print("interval reading: {}".format(time.time() - start_time))


filters = [('assets', 'in', valid_assets)]
price_dataset = pq.ParquetDataset(
    'historical_prices.parquet', 
    filters=filters, 
    use_legacy_dataset=False
)

start_time = time.time()
price_dataset.read_pandas(columns=total_dates).to_pandas()
print("complete reading with parquet dataset: {}".format(time.time() - start_time))

start_time = time.time()
for i in range(num_intervals):
    start = int(i * len(total_dates) / num_intervals)
    end = int((i + 1) * len(total_dates) / num_intervals)
    interval = list(total_dates[start:end])
    df = price_dataset.read_pandas(columns=interval).to_pandas()

print("interval reading with parquet dataset: {}".format(time.time() - start_time))