I have an 18 GB .parquet file with ~300M rows of accounting data (which I cannot share), split into 53 row groups. My task is to 'clean' the data by retaining, in each cell, only specific words from a dictionary. Reading the file is trouble-free; processing it ends in a segmentation fault on a 20-core, 128 GB RAM Ubuntu 22.04 desktop.
Using Python and the Dask library, I convert the data into a Dask DataFrame with the following columns:
['rowid', 'txid', 'debit', 'credit', 'effective_date', 'entered_date', 'user_id', 'transaction', 'memo', 'type', 'account', 'total_amt']
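For reference, the load step is essentially the following (the path and read options here are illustrative, not my exact code):

import dask.dataframe as dd

# Reading is trouble-free; Dask derives the partitioning from the parquet
# metadata (the file's 53 row groups, depending on version/options).
data = dd.read_parquet("accounting.parquet")  # illustrative path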
The columns to be cleaned in this file are memo, type, and account. My approach is to take each of those columns and apply a filter_field and a hash_field method to them:
if isinstance(data, dd.DataFrame):
    # first: filter the memo columns against the word dictionary
    # data = data.repartition(npartitions=20)  # included in experiments with partition size
    result = [
        data[col].apply(
            lambda x: self.filter_field(text=x, word_dict=word_dict),
            meta=data[col],
        )
        for col in memo_columns
    ]
    for i, col in enumerate(memo_columns):  # this loop seems to be required to assign the values back
        data[col] = result[i]

    # second: hash the name/id/account fields
    id_cols = name_columns + id_columns + account_columns
    result = [
        data[col].apply(lambda x: self.hash_field(text=x), meta=data[col])
        for col in id_cols
    ]
    for i, col in enumerate(id_cols):
        data[col] = result[i]

    del result
    gc.collect()
The filter_field method takes each cell, removes symbols, then checks whether the remaining words are in a dictionary; words not in the dictionary are dropped.
The hash_field is just shake_256(text.encode(encoding='utf8')).hexdigest(20).
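For context, the two helpers look roughly like this (a simplified sketch with the class context omitted; the exact symbol-stripping regex and the lowercasing are placeholders for what I actually do):

import re
from hashlib import shake_256

def filter_field(text, word_dict):
    # Strip symbols, then keep only the words that appear in the dictionary.
    if not isinstance(text, str):
        return text
    words = re.sub(r"[^\w\s]", " ", text).split()
    return " ".join(w for w in words if w.lower() in word_dict)

def hash_field(text):
    # 20 bytes of SHAKE-256 -> a 40-character hex digest.
    return shake_256(text.encode(encoding='utf8')).hexdigest(20)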
I know the filtering is sound because everything runs fine on near-identical files of up to 128M rows. Two things happen when I run this larger file:
- at some point I get a segmentation fault. Sometimes it happens early in the processing, sometimes later.
- rarely are more than 3 cores running above 30-50%, and when more cores do join in, the additional ones sit at ~1-2% (observed via htop)
What I would like to know:
- is there a better approach than the looping/vectorizing I used, or
- how can I get more cores working on the process?
Notes:
- I tried various partitioning approaches, varying both the number and the size of partitions. There was no visible improvement in processing, and the large file still threw a segmentation fault.
- what I expect my approach to be doing is taking each column presented, dividing it into ~20 pieces, then processing those pieces in parallel (one piece per core), roughly as in the sketch below.
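To make that expectation concrete, here is roughly what I picture happening, rewritten with map_partitions instead of my per-column apply calls (a sketch only; I have not verified it is equivalent to the code above):

def clean_partition(df, word_dict, memo_columns, id_cols):
    # Runs once per partition, so each worker gets an independent
    # pandas DataFrame to clean.
    for col in memo_columns:
        df[col] = df[col].map(lambda x: filter_field(x, word_dict))
    for col in id_cols:
        df[col] = df[col].map(lambda x: hash_field(x))
    return df

data = data.repartition(npartitions=20)
data = data.map_partitions(
    clean_partition,
    word_dict=word_dict,
    memo_columns=memo_columns,
    id_cols=name_columns + id_columns + account_columns,
    meta=data,
)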