Fastparquet doesn't seem to be pushing down filters


I have created a parquet file using Dask's DataFrame to_parquet method with fastparquet as the engine. Reading the file with fastparquet.ParquetFile, I get the following info.

from fastparquet import ParquetFile
file = ParquetFile('data/raw_data_fastpar.par/')
file.dtypes
OrderedDict([(u'@timestamp', dtype('<M8[ns]')),
         (u'@version', dtype('O')),
         (u'_id', dtype('O')),
         (u'browser_build', dtype('O')),
         (u'browser_device', dtype('O')),
         (u'browser_major', dtype('float64')),
         (u'browser_minor', dtype('float64')),
         (u'browser_name', dtype('O')),
         (u'browser_os', dtype('O')),
         (u'browser_os_name', dtype('O')),
         (u'dst', dtype('O')),
         (u'dst_port', dtype('float64')),
         (u'http_req_header_contentlength', dtype('O')),
         (u'http_req_header_host', dtype('O')),
         (u'http_req_header_referer', dtype('O')),
         (u'http_req_header_useragent', dtype('O')),
         (u'http_req_headers', dtype('O')),
         (u'http_req_method', dtype('O')),
         (u'http_req_secondleveldomain', dtype('O')),
         (u'http_req_url', dtype('O')),
         (u'http_req_version', dtype('O')),
         (u'http_resp_code', dtype('O')),
         (u'http_resp_header_contentlength', dtype('O')),
         (u'http_resp_header_contenttype', dtype('O')),
         (u'http_resp_headers', dtype('O')),
         (u'http_user', dtype('O')),
         (u'received_from', dtype('O')),
         (u'redis_db', dtype('O')),
         (u'src', dtype('O')),
         (u'src_port', dtype('float64')),
         (u'type', dtype('O')),
         (u'month', u'category'),
         (u'day', u'category')])


file.schema.text
u'- schema: \n
| - @timestamp: INT64, TIMESTAMP_MICROS, OPTIONAL\n
| - @version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - _id: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_build: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_device: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_major: DOUBLE, OPTIONAL\n
| - browser_minor: DOUBLE, OPTIONAL\n
| - browser_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os: BYTE_ARRAY, UTF8, OPTIONAL\n
| - browser_os_name: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst: BYTE_ARRAY, UTF8, OPTIONAL\n
| - dst_port: DOUBLE, OPTIONAL\n
| - http_req_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_host: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_referer: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_header_useragent: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_method: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_secondleveldomain: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_url: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_req_version: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_code: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contentlength: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_header_contenttype: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_resp_headers: BYTE_ARRAY, UTF8, OPTIONAL\n
| - http_user: BYTE_ARRAY, UTF8, OPTIONAL\n
| - received_from: BYTE_ARRAY, UTF8, OPTIONAL\n
| - redis_db: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src: BYTE_ARRAY, UTF8, OPTIONAL\n
| - src_port: DOUBLE, OPTIONAL\n  
| - type: BYTE_ARRAY, UTF8, OPTIONAL'

So the fields are correct. Since this is time-series data, the month and day columns were used to partition the data. The total number of rows is 22815984. Now I try to read the parquet dataset using the filters keyword, and I get some strange behavior.
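For context, the file was written roughly like this (reconstructed, so the exact arguments may differ slightly):

import dask.dataframe as dd

# df is the Dask DataFrame holding the raw events (not shown here).
# partition_on writes one directory per month/day combination, which is
# why those two columns show up as categories above.
df.to_parquet('data/raw_data_fastpar.par/',
              engine='fastparquet',
              partition_on=['month', 'day'])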

# this works
import datetime
import numpy as np
import dask.dataframe as dd

since = datetime.datetime(year=2018, month=10, day=1)
filters = [('@timestamp', '>', np.datetime64(since))]

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet',
                           columns=['http_user', 'dst', 'dst_port', 'http_req_method'],
                           filters=filters)

raw_data.count().compute()

http_user          3835971
dst                3835971
dst_port           3835971
http_req_method    3835971
dtype: int64

which is correct, and filtering was pushed down. When I change the filter to another field,

filters = [('http_req_method', '=', 'GET'),]

it returns all the data:

http_user          22815984
dst                22815984
dst_port           22815984
http_req_method    22815984
dtype: int64

Filtering manually, it works:

raw_data = dd.read_parquet('data/raw_data_fastpar.par/', engine='fastparquet', columns=['http_user', 'dst', 'dst_port', 'http_req_method'])
raw_data.loc[raw_data.http_req_method == 'GET'].count().compute()
http_user          14407709
dst                14407709
dst_port           14407709
http_req_method    14407709
dtype: int64

Also, changing the filter to a non-existent field raises no exception whatsoever, which is also strange. Is there something I am missing regarding parquet and filtering?

Dask DataFrame Structure:
                 http_user     dst  dst_port  http_req_method
npartitions=612
                    object  object   float64           object
                       ...     ...       ...              ...
...                    ...     ...       ...              ...
                       ...     ...       ...              ...
Dask Name: read-parquet, 612 tasks
Accepted answer:

The filters= option is included as an optimisation for cases where it makes sense: to avoid loading sections of the data that are certain not to contain any matching data.

In the docs:

This implements row-group (partition) -level filtering only, i.e., to prevent the loading of some chunks of the data, and only if relevant statistics have been included in the metadata.

For example, if you have a set of row-groups where a column of interest is monotonically increasing, then a filter on that column will probably be able to exclude many of the row-groups (a.k.a. partitions). On the other hand, if every row-group contains values throughout the range of that column, then this kind of filter will have no effect.
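One way to see whether a filter could ever prune anything is to inspect the min/max statistics recorded per row-group in the file footer. A minimal sketch (attribute names follow the parquet-format thrift objects that fastparquet parses; whether statistics are present at all depends on the writer):

from fastparquet import ParquetFile

pf = ParquetFile('data/raw_data_fastpar.par/')

# Walk the footer metadata and print the min/max recorded for one column
# in each row-group. String statistics may come back as raw bytes.
for i, rg in enumerate(pf.row_groups):
    for col in rg.columns:
        if col.meta_data.path_in_schema[0] == 'http_req_method':
            stats = col.meta_data.statistics
            if stats is not None:
                print(i, stats.min, stats.max)

# If most row-groups span the same min/max range, a filter on this column
# cannot exclude anything, which matches the behaviour you see.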

raw_data[raw_data.http_req_method == 'GET']

This does something different: every row-group is loaded as a partition and then filtered in the memory of the workers. Dask may be able to skip loading some partitions, but only in the special case that you are filtering on the index.
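For instance, if the dataset had been written with a sorted @timestamp index (so that the divisions between partitions are known), slicing on the index lets Dask touch only the partitions that overlap the requested range. A sketch under that assumption:

import datetime
import dask.dataframe as dd

# Assumes '@timestamp' was written as a sorted index, so read_parquet can
# recover known divisions between partitions.
raw_data = dd.read_parquet('data/raw_data_fastpar.par/',
                           engine='fastparquet',
                           index='@timestamp')

since = datetime.datetime(year=2018, month=10, day=1)
# Slicing on the index only loads partitions whose divisions overlap the
# requested range; everything else is skipped.
recent = raw_data.loc[since:]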

If you want the optimisation, but your data is not structured such that the partition boundaries perfectly align with your filter condition, you will need to use both methods: filters= to drop row-groups that cannot match, and an in-memory selection to remove the remaining non-matching rows.
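Putting the two together for the original example (same path and columns as in the question):

import datetime
import numpy as np
import dask.dataframe as dd

since = datetime.datetime(year=2018, month=10, day=1)

# Row-group level pruning: drops whole chunks whose statistics prove they
# cannot contain matching rows.
filters = [('@timestamp', '>', np.datetime64(since))]

raw_data = dd.read_parquet('data/raw_data_fastpar.par/',
                           engine='fastparquet',
                           columns=['http_user', 'dst', 'dst_port', 'http_req_method'],
                           filters=filters)

# Row-level filtering: removes the remaining non-matching rows in the
# memory of the workers.
get_requests = raw_data[raw_data.http_req_method == 'GET']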

Please raise an issue if you think the docstring could be clearer.