I would like to process many remote zst-compressed JSONL files that are available via http/https. I do not have enough space to store the file (I'm running many in parallel and they're many gigs), so I want to decompress them as they are being downloaded. I am testing the following code:
#!/usr/bin/env python3
import argparse
import json
import zstandard as zstd
import io
import requests
import time
from typing import Union
from urllib.parse import urlparse


def process_file(input_file: str) -> None:
    """
    Process a zstandard compressed JSONL file or a stream.

    This function takes a file path or a URL as input, decompresses it, and then
    processes each JSON object one by one.

    :param input_file: The input file path or URL to process.
    """
    # Create a decompression object
    # Increase the max_window_size if necessary
    # Be careful as this can potentially use a lot of memory
    # dctx = zstd.ZstdDecompressor(max_window_size=2**30)  # for example, allow up to 1 GB
    dctx = zstd.ZstdDecompressor()

    # Check if the input is a URL
    if input_file.startswith(('http://', 'https://')):
        # Stream the file from the URL
        response = requests.get(input_file, stream=True)
        reader = dctx.stream_reader(response.raw)
    else:
        # If the input is a file path
        f = open(input_file, 'rb')
        reader = dctx.stream_reader(f)

    text_stream = io.TextIOWrapper(reader, encoding='utf-8')

    line_counter = 0
    start_time = time.time()
    for line in text_stream:
        # Each line is a JSON string, so we parse it into a Python object
        data = json.loads(line)
        line_counter += 1
        if line_counter % 100_000 == 0:
            elapsed_time = time.time() - start_time
            print(f'Processed {line_counter:,d} lines in {elapsed_time:,.2f} seconds ({line_counter / elapsed_time:,.2f} lines/sec)')

    # Don't forget to close the file if it's not a URL
    if not input_file.startswith(('http://', 'https://')):
        f.close()


def main():
    """
    Main function to parse command line arguments and process the input file.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')
    args = parser.parse_args()
    process_file(args.input_file)


if __name__ == "__main__":
    main()
The local on-the-fly decompression works, but the remote code raises a zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding exception; HOWEVER, I am certain it is not a memory issue. I set max_window_size=2**30 (1 GB) and tested it on a 395K file (2.1M uncompressed) and still got the error. Passing response.content, response.raw, or response.raw.data in the reader = dctx.stream_reader(response.raw) line gives the same error.
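One way to confirm what window size the remote frame actually declares is to read just the frame header and parse it with zstandard.get_frame_parameters(); a minimal sketch, with a placeholder URL:

import requests
import zstandard as zstd

url = 'https://example.com/data.jsonl.zst'  # placeholder URL
resp = requests.get(url, stream=True)
header = resp.raw.read(18)  # a zstd frame header is at most 18 bytes
resp.close()

params = zstd.get_frame_parameters(header)
print(f'window size declared by the frame: {params.window_size:,d} bytes')

If that prints a value larger than the decompressor's max_window_size, the error is coming from the frame header rather than from actual memory pressure.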
If I add:
response = requests.get(input_file, stream=True)
with open('test.zst', 'wb') as fh:
    for chunk in response.raw:
        print(".", end="")
        fh.write(chunk)
print()
exit()
It writes the file just fine, and I can decompress it with zstd on the command line. Adding read_across_frames=True to the stream_reader changes nothing:
reader = dctx.stream_reader(response.raw, read_across_frames=True)
The documentation at https://python-zstandard.readthedocs.io/en/latest/decompressor.html is confusing at best, and seems to be wrong in places, since some of the example code does not work.
How do I decompress a remote zst file on the fly?
More info: Using completely new code that uses SFTP instead of HTTP/HTTPS, I still get the same error.
#!/usr/bin/env python3
import argparse
import io
import bz2
import gzip
import paramiko
import zstandard as zstd
from urllib.parse import urlparse


class FileReader:
    def __init__(self, filename):
        """
        Initialize FileReader with a filename.

        :param filename: The name of the file to read.
        """
        self.filename = filename
        self.file = None
        self.sftp = None

    def __enter__(self):
        """
        Open the file when entering the context.
        """
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Close the file when exiting the context.
        """
        self.close()

    def __iter__(self):
        """
        Make the FileReader object iterable.
        """
        return self

    def __next__(self):
        """
        Provide the next line in the file.
        """
        line = self.file.readline()
        if not line:
            # End of file
            raise StopIteration
        else:
            return line

    def open(self):
        """
        Open the file for reading, decompressing it on the fly if necessary.
        """
        parsed = urlparse(self.filename)
        if parsed.scheme == 'sftp':
            # Handle SFTP URLs
            client = paramiko.SSHClient()
            client.load_system_host_keys()
            client.connect(parsed.hostname, username=parsed.username, password=parsed.password)
            sftp = client.open_sftp()
            f = sftp.file(parsed.path, 'rb')
            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(f), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                self.file = bz2.BZ2File(f)
            elif self.filename.endswith('.gz'):
                self.file = gzip.GzipFile(fileobj=f)
            else:
                self.file = f
            self.sftp = sftp
        else:
            if self.filename.endswith('.zst'):
                dctx = zstd.ZstdDecompressor()
                self.file = io.TextIOWrapper(dctx.stream_reader(open(self.filename, 'rb')), encoding='utf-8')
            elif self.filename.endswith('.bz2'):
                self.file = bz2.open(self.filename, 'rt')
            elif self.filename.endswith('.gz'):
                self.file = gzip.open(self.filename, 'rt')
            else:
                self.file = open(self.filename, 'r')

    def close(self):
        """
        Close the file.
        """
        if self.file is not None:
            self.file.close()
        if self.sftp is not None:
            self.sftp.close()


def main():
    """
    Main function, mostly for testing.
    """
    parser = argparse.ArgumentParser(description='Process a zstandard compressed JSONL file.')
    parser.add_argument('--input-file', '-i', type=str, required=True, help='The input file to process')
    args = parser.parse_args()
    with FileReader(args.input_file) as reader:
        for line in reader:
            print(line)
    exit()


if __name__ == "__main__":
    main()
Result:
> filereader.py -i sftp://sftp.example.com/path/to/file.zst
Traceback (most recent call last):
  File "filereader.py", line 114, in <module>
    main()
  File "filereader.py", line 109, in main
    for line in reader:
  File "filereader.py", line 45, in __next__
    line = self.file.readline()
           ^^^^^^^^^^^^^^^^^^^^
zstd.ZstdError: zstd decompress error: Frame requires too much memory for decoding
Works fine with .gz, .bz2, and uncompressed files over SFTP. It's just zstandard that's causing issues.
The code above works if you set max_window_size=2**31 instead of 2**30 if you are going to try to decompress any zstandard files compressed with --long=31.
For those who are interested: it appears that, for some (older?) versions of the zstd command-line tool, specifying --long=31 makes the long-distance matching window up to 2 GB regardless of the file size, while some other (current?) versions do not allow a window that is >= the file size (which makes sense). So, when I was testing with a small file that had been compressed with an older zstd, it failed, but when I tried to recreate the problem I could not, since I was using a current version of zstd.
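For reference, a minimal sketch of the remote-streaming path with the raised window limit (the URL is a placeholder; everything else follows the code above):

import io
import requests
import zstandard as zstd

url = 'https://example.com/data.jsonl.zst'  # placeholder URL
# Allow frames that declare a window of up to 2 GiB (e.g. produced with zstd --long=31).
dctx = zstd.ZstdDecompressor(max_window_size=2**31)

response = requests.get(url, stream=True)
with dctx.stream_reader(response.raw) as reader:
    for line in io.TextIOWrapper(reader, encoding='utf-8'):
        pass  # process each JSONL line here

Note that max_window_size only raises the allowed limit; the decoder should still only allocate as much as the frame's header actually asks for.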
I may post the more complete code that handles files over SFTP/SSH compressed in zst, bz2, and gz formats.