Dask dealing with gzipped files during processing


Is there a right way to process gzipped JSONL files in Dask with Bags?

I've managed to process the file and store it as Parquet chunks, but only because the file was unarchived. We receive the files gzipped from a data provider, there are a lot of them, and they are heavy.

Obviously, the gzipped format doesn't allow me to use blocksize=1e8 (gzip isn't splittable, so each file ends up as a single partition), which leads to Signal 9 and OOM errors in Dask.
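For context, the gzipped read ends up as something like the sketch below (the path is made up; blocksize is left at its default of None because, as far as I can tell, Dask can't do chunked reads on compressed files, so each .gz becomes a single partition):

import dask.bag as dask_bag

# Hypothetical path, just to illustrate the call.
# With no blocksize, the whole decompressed file lands in one partition,
# which is presumably what triggers the Signal 9 / OOM kills on the worker.
chunks = dask_bag.read_text("s3://my-bucket/raw/file.jsonl.gz", compression="gzip")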

Downloading the file to a worker and unarchiving it there also seems sub-optimal.
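Roughly, that route would look something like the sketch below: stream-decompress the object once and rewrite it as uncompressed part files that read_text can then split by blocksize. This is only a sketch with made-up paths and batch size, and it assumes s3fs credentials are already configured:

import gzip

import s3fs

# Made-up locations and batch size, purely to illustrate the idea.
SOURCE = "s3://my-bucket/raw/file.jsonl.gz"
DEST_TEMPLATE = "s3://my-bucket/staging/file_part_{:04d}.jsonl"
LINES_PER_PART = 500_000

fs = s3fs.S3FileSystem()

def split_gzip_to_parts():
    # Stream-decompress the gzipped JSONL and rewrite it as uncompressed
    # part files, never holding the whole file in memory at once.
    part, lines = 0, []
    with fs.open(SOURCE, "rb") as raw, gzip.open(raw, mode="rt") as text:
        for line in text:
            lines.append(line)
            if len(lines) >= LINES_PER_PART:
                with fs.open(DEST_TEMPLATE.format(part), "w") as out:
                    out.writelines(lines)
                part, lines = part + 1, []
    if lines:  # flush the final, shorter part
        with fs.open(DEST_TEMPLATE.format(part), "w") as out:
            out.writelines(lines)

After that, the existing read_text(..., blocksize=1e8) flow works unchanged on the staging prefix, but it doubles the storage and adds a full extra pass over the data, which is why it feels sub-optimal.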

This code properly converts the unarchived file:

import json
import uuid

import dask.bag as dask_bag
import pandas as pd
from dask.diagnostics import ProgressBar

def process_partition(partition, file_name):
    # Parse one bag partition into a DataFrame and write it to S3 as Parquet
    partition_index = str(uuid.uuid4())
    records = [json.loads(record) for record in partition]
    df = pd.DataFrame(records)
    storage_filename = f"s3://{S3_BRONZE_BUCKET}/{S3_BRONZE_PATH}{file_name.split('/')[-1].replace('.jsonl', f'_partition_{partition_index}.parquet')}"
    df.to_parquet(storage_filename, storage_options=STORAGE_OPTIONS, index=False)

file = files[0]
file_name = file.split('/')[-1]
# Read the unarchived file in ~100 MB blocks (1e8 bytes per partition)
chunks = dask_bag.read_text(f"s3://{file}", blocksize=1e8)

with ProgressBar():
    # Each partition is parsed and written out as its own Parquet file
    results = chunks.map_partitions(lambda part: process_partition(part, file_name))
    results.compute()
    print(results)

Thank you
