Read timeout in pd.read_parquet from S3, and understanding configs

I'm trying to simplify access to datasets in various file formats (csv, pickle, feather, partitioned parquet, ...) stored as S3 objects. Since some users I support have different environments with limited options for upgrading (big company, don't ask), I need to develop multiple solutions that achieve similar use cases.

One method which has worked is this:

import os
from typing import Callable

import pandas as pd
import s3fs

# storage_options for read_parquet was added in pandas 1.3
# (note: naive string comparison of versions)
assert pd.__version__ > '1.3.0'

read_methods = [pd.read_csv, pd.read_parquet, pd.read_feather, pd.read_pickle]

def s3_pd_read(loc: str, read_fn: Callable, **kwargs) -> pd.DataFrame:
    df = read_fn(
        loc,  # e.g. 's3a://bucket/data/month=1'
        storage_options={
            "key"          : os.getenv("AWS_ACCESS_KEY_ID"),
            "secret"       : os.getenv("AWS_SECRET_ACCESS_KEY"),
            "client_kwargs": {
                'verify'      : os.getenv('AWS_CA_BUNDLE'),
                'endpoint_url': 'https://endpoint.com/'
            }
        },
        # Pass keyword args through to the pandas read method
        **kwargs
    )
    return df
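Callers pick one of the read_methods and pass reader-specific kwargs through, e.g. (bucket path and column name are placeholders):

# Example usage; 'col_a' is a hypothetical column name
df = s3_pd_read('s3a://bucket/data/month=1', pd.read_parquet, columns=['col_a'])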

But configuring this for better performance and availability is challenging. For example, I'd like to set options analogous to s3fs.S3FileSystem's config_kwargs: "connect_timeout", "read_timeout", "max_pool_connections".
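If storage_options is forwarded verbatim to the S3FileSystem constructor (which is what I'd guess from the fsspec docs, but can't confirm), something like this sketch should let me set timeouts:

# Sketch, assuming storage_options is passed as-is to s3fs.S3FileSystem(...)
df = pd.read_parquet(
    's3://bucket/data/month=1',
    storage_options={
        "key"   : os.getenv("AWS_ACCESS_KEY_ID"),
        "secret": os.getenv("AWS_SECRET_ACCESS_KEY"),
        "client_kwargs": {
            'verify'      : os.getenv('AWS_CA_BUNDLE'),
            'endpoint_url': 'https://endpoint.com/'
        },
        # Would these reach botocore's Config, as they do via S3FileSystem?
        "config_kwargs": {'connect_timeout': 600, 'read_timeout': 600},
    },
)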

pd.read_parquet docs mention this about storage_options config:

Extra options that make sense for a particular storage connection, e.g. host, port, username, password, etc. For HTTP(S) URLs the key-value pairs are forwarded to urllib as header options. For other URLs (e.g. starting with “s3://”, and “gcs://”) the key-value pairs are forwarded to fsspec. Please see fsspec and urllib for more details.

I checked the fsspec docs, but it's not obvious which key-value pairs are available to set, nor what they do. One link there leads to HTTP client sessions (not S3).
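The best I've managed is to introspect the class fsspec resolves for the "s3" protocol, since its constructor kwargs seem to be what storage_options accepts:

import inspect

import fsspec

# For "s3://" URLs fsspec delegates to s3fs.S3FileSystem, so its
# constructor signature lists the accepted storage_options keys
cls = fsspec.get_filesystem_class("s3")
print(cls.__name__)
print(inspect.signature(cls.__init__))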

Similarly, for S3FileSystem + pyarrow.parquet (which isn't available in some user environments), I have this code, which works:

import os

from s3fs import S3FileSystem
import pyarrow.parquet as pq

s3 = S3FileSystem(
    # Credentials go in the top-level key/secret kwargs, not s3_additional_kwargs
    key=os.environ['AWS_ACCESS_KEY_ID'],
    secret=os.environ['AWS_SECRET_ACCESS_KEY'],
    client_kwargs={
        'verify': os.getenv('AWS_CA_BUNDLE'), 'endpoint_url': 'https://endpoint.com/'
    },
    config_kwargs={'connect_timeout': 600, 'read_timeout': 600, 'max_pool_connections': 50}
)

df = pq.read_table(
    "bucket/data/month=1", 
    filesystem=s3
).to_pandas()
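From the s3fs docstring, config_kwargs is handed to botocore.client.Config, so (if I read it correctly) the above should be equivalent to constructing:

from botocore.config import Config

# The botocore client config s3fs presumably builds from config_kwargs
cfg = Config(connect_timeout=600, read_timeout=600, max_pool_connections=50)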

The docs eventually lead to documentation for client_kwargs, but for s3_additional_kwargs it's not obvious which S3 API methods' parameters are meant (e.g. copy-object or get-object), which of them actually get used, and with what precedence when this S3FileSystem is passed to a pyarrow function:

s3_additional_kwargs: dict of parameters that are used when calling S3 API methods. Typically used for things like “ServerSideEncryption”.

client_kwargs: dict of parameters for the botocore client
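The only use I can infer from that docstring is something like this (my guess at the intended usage, based on the “ServerSideEncryption” hint):

from s3fs import S3FileSystem

# Guess: these key-value pairs get merged into S3 API calls (e.g. put-object)
s3 = S3FileSystem(
    s3_additional_kwargs={'ServerSideEncryption': 'AES256'},
)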

Can someone help untangle this spaghetti of documentation? For example, when using only pd.read_parquet, I'd like to pass some sort of connect and/or read timeout config to reduce timeouts. How can I do that?

1 Answer

I recommend checking whether the file exists before calling read_parquet(); that way you won't need to rely on the default or a custom timeout.
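A minimal sketch of that check, reusing the s3 filesystem and pq from your question (the path is a placeholder):

# Verify the object/prefix exists before attempting the read
path = "bucket/data/month=1"
if s3.exists(path):
    df = pq.read_table(path, filesystem=s3).to_pandas()
else:
    raise FileNotFoundError(f"s3://{path} does not exist")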