Dask dataframe creates folders instead of files when saving processed files to parquet


I have some very large parquet files that I want to process, merge, and clean, and then save into another folder. I am using Dask dataframe since it is the only way I can read those files without running out of memory. The strategy I used was to split each file into a bunch of intermediate parquet files and process them individually, one by one, to save memory. This is how I split the dataframe df:

def splitting_files(df, chunk_size):
    # chunk_size is passed as a parameter and refers to the number of rows per chunk
    total_rows = len(df)
    row_indices = list(range(0, total_rows, chunk_size))
    if row_indices[-1] < total_rows:
        row_indices.append(total_rows)

    for i in range(len(row_indices) - 1):
        start_idx = row_indices[i]
        end_idx = row_indices[i + 1]

        # In this part, Dask dataframe does not support iloc, so I had to use loc.
        # loc slicing is inclusive of the end label, hence the -1 so consecutive
        # chunks do not share a boundary row.
        chunk_df = df.loc[start_idx:end_idx - 1]

        # Calling the processing function here
        chunk_df = processing_files(chunk_df)

        # Here I am saving the intermediate chunk parquet files
        chunk_df.to_parquet("some intermediate folder", engine='fastparquet', index=False)

In the end, the idea was to concatenate all of those intermediate, already processed files into a single parquet file. The problem seems to be in the last line of code. When it saves the intermediate files to parquet, it creates a bunch of folders rather than parquet files, as shown in the attached image. Because of that, in the end, I cannot concatenate all the intermediate files inside that folder, because all I have are folders, not files.
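For context, the final concatenation step I had in mind looks roughly like this. It is only a sketch: the folder names are placeholders, and it assumes the intermediate outputs end up as plain .parquet files sitting in one folder:

import dask.dataframe as dd

# Read every intermediate, already processed parquet file back in
# (the glob pattern is a placeholder and depends on how the chunks were written)
merged_df = dd.read_parquet("some intermediate folder/*.parquet", engine='fastparquet')

# Repartition to a single partition so the output directory contains a single
# part file; this only works if the merged data fits in memory on one worker
merged_df.repartition(npartitions=1).to_parquet("final folder", engine='fastparquet', write_index=False)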

After a lot of reading and research, I learned that if I call the compute() method before saving to parquet, the Dask dataframe is turned into a pandas dataframe. By doing that, I am able to save actual parquet files instead of folders. The problem is that this defeats the purpose of using a Dask dataframe, and it takes a lot of time to process.

So, in summary, the code below works, but it is very inefficient and takes too long:

chunk_df.compute().to_parquet("some intermediate folder", engine='fastparquet', index=False)

Isn't there another way to make to_parquet save the actual files I want, rather than folders?

1 Answer

Try the following:

chunk_df.to_parquet("some intermediate folder", engine='pyarrow', write_index=False, name_function=lambda x: f'something_{x}.parquet')