I have a large input dataset that needs processing which can be parallelized. I've been able to scale to a fairly large number of parallel processes by first storing the huge input pyarrow table to local disk like this:

import pyarrow

# acquire "table"
with pyarrow.OSFile(path, "wb") as sink:
    with pyarrow.ipc.new_file(sink, table.schema) as writer:
        # write the whole table into an uncompressed Arrow IPC file
        writer.write_table(table)

...then, in each sub-process, loading it with zero heap allocation like this:

with pyarrow.memory_map(path) as source:
    # read_all() only references the memory-mapped buffers, so no heap copy of the data is made
    table = pyarrow.ipc.open_file(source).read_all()

It works phenomenally well, kudos to the Arrow project for making it so easy.
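
For context, the fan-out itself is nothing fancy; a minimal sketch of what the worker side looks like, with ARROW_PATH, the chunking, and the per-chunk work all standing in for my real setup:

import multiprocessing

import pyarrow

# placeholder path of the IPC file written in the "acquire table" step above
ARROW_PATH = "/tmp/input.arrow"


def process_chunk(bounds):
    start, length = bounds
    # every worker memory-maps the same file; read_all() only references the
    # mapped buffers, so the table is not copied onto the worker's heap
    with pyarrow.memory_map(ARROW_PATH) as source:
        table = pyarrow.ipc.open_file(source).read_all()
    chunk = table.slice(start, length)
    return chunk.num_rows  # stand-in for the real per-chunk work


if __name__ == "__main__":
    n_workers = 8
    with pyarrow.memory_map(ARROW_PATH) as source:
        n_rows = pyarrow.ipc.open_file(source).read_all().num_rows
    step = -(-n_rows // n_workers)  # ceiling division
    bounds = [(start, min(step, n_rows - start)) for start in range(0, n_rows, step)]
    with multiprocessing.Pool(n_workers) as pool:
        print(sum(pool.map(process_chunk, bounds)))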

Now the question. In the "acquire table" step marked above, the input is a Parquet table downloaded from an S3-like source (so it has e.g. a download_fileobj call returning a file handle). Compressed Parquet is unsuitable for memory_map as-is: I have to fully load it, decompress it, and only then write it to disk (sketched further below), which allocates a lot of memory on the heap and makes the step fairly inefficient. What I would like to find is some stream filter, call it parquet_to_arrow_filter, that converts the stream coming from the HTTP response into raw Arrow on the fly:

# acquire "bucket"
with pyarrow.OSFile(path, "wb") as sink:
    # upload() pipes from a readable source stream into the sink (download() goes the other way)
    sink.upload(parquet_to_arrow_filter(bucket.download_fileobj(bucket_path)))

The expectation here is that the incoming stream could be decompressed block by block, where each block is of a fixed size and reusable, so the memory allocated on the heap stays more or less constant. I couldn't find such functionality in pyarrow. Does it exist?
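
For reference, what I do today, and what I want to get rid of, looks roughly like this (bucket, bucket_path and path are the same as in the snippets above; io.BytesIO stands in for however the compressed bytes end up buffered):

import io

import pyarrow
import pyarrow.parquet

# the client hands back a readable (but not seekable) handle to the compressed object
stream = bucket.download_fileobj(bucket_path)

# buffer the whole compressed file on the heap, since the Parquet reader needs random access
buf = io.BytesIO(stream.read())

# decompress and decode the entire table -- a second large heap allocation
table = pyarrow.parquet.read_table(buf)

# only now can the raw Arrow IPC file be written out for memory-mapping
with pyarrow.OSFile(path, "wb") as sink:
    with pyarrow.ipc.new_file(sink, table.schema) as writer:
        writer.write_table(table)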

I tried reading the documentation and the pyarrow.parquet source code, but couldn't figure out how to assemble such a filter from the available parts.
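
The closest I could piece together uses ParquetFile.iter_batches to decode one batch of rows at a time and append it to the IPC file, so the peak heap usage is roughly one decompressed batch (or row group) rather than the whole table. It still isn't what I'm after, though: the source has to be seekable (the Parquet footer sits at the end of the file), so it can't be fed straight from the HTTP response, and the batches are not fixed-size reusable blocks. The sketch below uses my own names (parquet_to_arrow_file) and assumes the download has already been spooled somewhere seekable:

import pyarrow
import pyarrow.ipc
import pyarrow.parquet


def parquet_to_arrow_file(parquet_source, arrow_path, batch_size=64 * 1024):
    # parquet_source must be a file path or a seekable file-like object
    pf = pyarrow.parquet.ParquetFile(parquet_source)
    with pyarrow.OSFile(arrow_path, "wb") as sink:
        with pyarrow.ipc.new_file(sink, pf.schema_arrow) as writer:
            # decode and write one batch at a time instead of materializing the whole table
            for batch in pf.iter_batches(batch_size=batch_size):
                writer.write_batch(batch)

This at least caps the per-batch allocation, but the compressed download still has to be materialized somewhere seekable first.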
