FileNotFoundError when re-reading an S3 parquet partition that was cached by PyArrow/fsspec before the partition was altered


The sequence of events to reproduce it is:

  1. Read an S3 parquet partition using pandas.read_parquet (which uses pyarrow.dataset under the hood).
  2. Add another file to that partition (here, with boto3).
  3. Read the same S3 parquet path again; this raises a FileNotFoundError on the new file.

Below is a code snippet to replicate it:

import boto3
import pandas as pd

# Create two small parquet files locally.
df1 = pd.DataFrame([{'a': i, 'b': i} for i in range(10)])
df1.to_parquet('part1.parquet')
df2 = pd.DataFrame([{'a': i + 100, 'b': i + 100} for i in range(10)])
df2.to_parquet('part2.parquet')

s3_client = boto3.client('s3')
bucket = 'bucket_name'
prefix = 'test01'
url = f's3://{bucket}/{prefix}'

# Upload the first file into the partition (outside of s3fs) and read the dataset.
s3_client.upload_file('part1.parquet', bucket, f'{prefix}/p=x/part1.parquet')
dfx1 = pd.read_parquet(url)  # this is fine

# Add a second file to the same partition.
s3_client.upload_file('part2.parquet', bucket, f'{prefix}/p=x/part2.parquet')

dfx2 = pd.read_parquet(url)  # this raises the FileNotFoundError on part2.parquet

The exception traceback is as follows:

  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 493, in read_parquet
    return impl.read(
  File "<env>/lib/python3.8/site-packages/pandas/io/parquet.py", line 240, in read
    result = self.api.parquet.read_table(
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1996, in read_table
    return dataset.read(columns=columns, use_threads=use_threads,
  File "<env>/lib/python3.8/site-packages/pyarrow/parquet.py", line 1831, in read
    table = self._dataset.to_table(
  File "pyarrow/_dataset.pyx", line 323, in pyarrow._dataset.Dataset.to_table
  File "pyarrow/_dataset.pyx", line 2311, in pyarrow._dataset.Scanner.to_table
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/_fs.pyx", line 1179, in pyarrow._fs._cb_open_input_file
  File "<env>/python3.8/site-packages/pyarrow/fs.py", line 394, in open_input_file
    raise FileNotFoundError(path)
FileNotFoundError: bucket_name/test01/p=x/part2.parquet

Note that if I open a separate Python console and read the same parquet partition there, it works without any problem.

My suspicion is that pyarrow (or the filesystem layer underneath it) does some sort of caching when accessing S3 parquet datasets: when the new file is added, it is not in that cache, so even though the dataset knows the new file needs to be read, it cannot find it. Reloading the pandas and pyarrow modules does not reset this cache either.
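For what it's worth, the reason a module reload does not help seems to be that fsspec also caches filesystem instances themselves, so the same S3 filesystem object, and with it the same listings cache, is handed back on every call; a minimal sketch:

import fsspec

# fsspec keeps a registry of filesystem instances keyed by protocol and
# constructor arguments, so repeated calls return the very same object
# (and therefore the same directory-listing cache).
fs_a = fsspec.filesystem("s3")
fs_b = fsspec.filesystem("s3")
print(fs_a is fs_b)  # True: the same cached instance is handed back

# Reloading pandas/pyarrow does not touch this registry, while a brand-new
# Python process builds a fresh instance with an empty cache, which would
# explain why a separate console has no problem.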

Below are my package versions:

  • fsspec == 2022.2.0
  • pandas == 1.4.1
  • pyarrow == 7.0.0

[UPDATE] Overwriting part1.parquet does not work either: if you change step 2 above to upload part2.parquet so that it overwrites the existing part1.parquet, the cache notices that the original part1.parquet (its ETag) no longer exists. That exception traceback is as follows:

File "pyarrow/_dataset.pyx", line 1680, in pyarrow._dataset.DatasetFactory.finish
  File "pyarrow/error.pxi", line 143, in pyarrow.lib.pyarrow_internal_check_status
  File "<env>/lib/python3.8/site-packages/fsspec/spec.py", line 1544, in read
    out = self.cache._fetch(self.loc, self.loc + length)
  File "/<env>/lib/python3.8/site-packages/fsspec/caching.py", line 377, in _fetch
    self.cache = self.fetcher(start, bend)
  File "<env>/lib/python3.8/site-packages/s3fs/core.py", line 1965, in _fetch_range
    raise FileExpired(
s3fs.utils.FileExpired: [Errno 16] The remote file corresponding to filename bucket_name/test01/p=x/part1.parquet and Etag "d64a2de4f9c93dff49ecd3f19c414f61" no longer exists.

There are 2 answers below.

mdurant (accepted answer):

Explanation: s3fs caches file listings by default. That means that, by default, once you have listed some directory, it is not listed again unless there is a write operation through s3fs or you explicitly clear the cache.
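Concretely, the behaviour being described is something like this (the bucket and paths are the hypothetical ones from the question):

import fsspec

fs = fsspec.filesystem("s3")
fs.ls("bucket_name/test01/p=x")  # hits S3 and caches the directory listing
fs.ls("bucket_name/test01/p=x")  # served from the listings cache, no S3 call
# Writes performed through s3fs (fs.put, fs.pipe, ...) invalidate the relevant
# cache entries, but uploads done with boto3 are invisible to this cache.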

  1. Why not use s3fs for your file transfers too? Writes made through s3fs keep its listings cache up to date:
    fs = fsspec.filesystem("s3")
    fs.put('part2.parquet', 'bucket_name/test01/p=x/part2.parquet')
  2. You can explicitly clear the cache before re-reading:
    fs = fsspec.filesystem("s3")
    fs.invalidate_cache()  # or pass a path to invalidate specific dirs only
  3. Turn off or control caching by adding one of these to your pandas IO calls (a combined sketch follows this list):
    storage_options={"use_listings_cache": False}
    storage_options={"listings_expiry_time": <int>}
michaelgbj:

Short answer:
The easiest way to circumvent this caching is to use this when calling read_parquet:

df = pandas.read_parquet(url, storage_options={'version_aware': True})

This tells s3fs that the S3 objects may be changed by other writers, which in effect stops s3fs from relying on stale cached information.

Long story:
fsspec's caching is hard to turn off/clear/reset. read_parquet does expose a storage_options argument whose contents are passed down to the storage filesystem layer (see https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.read_parquet.html).
That page links to the available options for storage_options (https://pandas.pydata.org/docs/user_guide/io.html#reading-writing-remote-files), but none of those give us the handle we need.
Following the S3Fs documentation link it provides (https://s3fs.readthedocs.io/en/latest/index.html?highlight=host#s3-compatible-storage), that section is also not useful, but a later section (#bucket-version-awareness) states:

If your bucket has object versioning enabled then you can add version-aware support to s3fs. This ensures that if a file is opened at a particular point in time that version will be used for reading.

This mitigates the issue where more than one user is concurrently reading and writing to the same object.

Even though it is not stated there, version_aware is an accepted key for storage_options, and passing it resolves this issue.
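Note that, per the quoted docs, version_aware only helps if the bucket actually has object versioning enabled. If it is not already enabled, it could be turned on along these lines (the bucket name is the hypothetical one from the question):

import boto3

# Enable object versioning on the (hypothetical) bucket so that s3fs's
# version-aware mode has object versions to pin reads to.
s3 = boto3.client('s3')
s3.put_bucket_versioning(
    Bucket='bucket_name',
    VersioningConfiguration={'Status': 'Enabled'},
)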