Splitting a very large CSV using chunks and a condition on the nth character of a string


I am trying to split a very large (22GB) CSV file, using both chunks and a condition on the nth character of a given column in my data.

I have been trying quite helplessly to combine this: Python: Split CSV file according to first character of the first column

with something like this, but I have hit a wall. In that attempt I had a condition on a column not being null; instead, I would like to split my files according to the nth character of a given column.

Is there any way to create smaller CSV files based on such a condition? Any help would be enormously appreciated.
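
Roughly what I am aiming for, adapting that answer to work in chunks, would look something like this (an untested sketch; I am assuming here that the ID column is called sourcename, that the file uses ';' as separator like my real data, and that I key on its 7th character):

import os
import pandas as pd

# stream the 22GB file in manageable chunks
reader = pd.read_csv('big_file_clean.csv', sep=';', encoding='iso-8859-1',
                     dtype='object', chunksize=500000)
for chunk in reader:
    # 7th character of the ID (0-indexed position 6); short/missing IDs become ''
    key = chunk['sourcename'].str[6].fillna('')
    for letter, group in chunk.groupby(key):
        out = 'split_source_{}.csv'.format(letter or 'missing')
        # append to the per-letter file, writing the header only when the file is first created
        group.to_csv(out, mode='a', sep=';', index=False, encoding='iso-8859-1',
                     header=not os.path.exists(out))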

A summary of my data would look like this:

sourcename date_naissance date_deces date_mariage place
dgfkf47 YYYYMMDD YYYYMMDD etc. etc.
fhfidk67 YYYYMMDD YYYYMMDD etc. etc.
kgodj45 YYYYMMDD etc. etc.
paoror76 YYYYMMDD wwwwwww etc. etc.
poldidj90 YYYYMMDD

What I want to do is create a series of smaller files I can use later to analyse the data, by splitting them according to the 7th character of the ID column (sourcename in the sample above; for example, the 7th character of 'dgfkf47' is '7'). I know how to do it on a small 5x10 sample, as it fits in my memory and I can simply use groupby, but I am stuck for the very large file, as Dask does not seem to let me iterate over a groupby.
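
For reference, the in-memory version I already have looks roughly like this (a sketch, assuming the ID column is sourcename):

import pandas as pd

df = pd.read_csv('sample_clean.csv', sep=';', encoding='iso-8859-1', dtype='object')
# 7th character of the ID (0-indexed position 6)
df['IDTAG'] = df['sourcename'].str[6].fillna('')
for idtag, group in df.groupby('IDTAG'):
    group.to_csv('sample_source_{}.csv'.format(idtag), index=False)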

My strategy now is to do all the cleaning operations in Dask, including creating a new column containing only the 7th character, and then to output smaller files that can be loaded into pandas and grouped by this column.

I have got as far as this, but I would love to know if there is a simpler way to do it:

import dask.dataframe as dd
import pandas as pd
import glob, os
import pyarrow.parquet
import pyarrow as pa
from dask import multiprocessing
PATH = r"/RAW DATA/TEST/"
os.chdir(PATH)
for file in glob.glob("**/*_clean.csv", recursive=True):
    ddf = dd.read_csv(file, encoding='iso-8859-1', sep=';', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object'})
# write to parquet
    ddf.to_parquet('file_clean.csv.parquet', engine='pyarrow', schema={'insee': pa.string(),
       'ref_document': pa.string(), 'ref_interne_sourcename': pa.string(),
       'sexe': pa.string(), 'profession': pa.string(), 'date_naissance': pa.string(), 'date_mariage': pa.string(), 'date_deces': pa.string()})
    ddf = dd.read_parquet('file_clean.csv.parquet', engine='pyarrow')
# split dates
    ddf['date_naissance']= dd.to_datetime(ddf['date_naissance'], format='%Y%m%d', errors='coerce')
    ddf['date_mariage']= dd.to_datetime(ddf['date_mariage'], format='%Y%m%d', errors='coerce')
    ddf['date_deces']= dd.to_datetime(ddf["date_deces"], format='%Y%m%d', errors='coerce')
    ddf['jour_naissance'] = ddf['date_naissance'].dt.day
    ddf['mois_naissance'] = ddf['date_naissance'].dt.month
    ddf['annee_naissance'] = ddf['date_naissance'].dt.year
    ddf['jour_mariage'] = ddf['date_mariage'].dt.day
    ddf['mois_mariage'] = ddf['date_mariage'].dt.month
    ddf['annee_mariage'] = ddf['date_mariage'].dt.year
    ddf['jour_deces'] = ddf['date_deces'].dt.day
    ddf['mois_deces'] = ddf['date_deces'].dt.month
    ddf['annee_deces'] = ddf['date_deces'].dt.year
# drop columns
    del ddf['date_naissance']
    del ddf['date_mariage']
    del ddf['date_deces']
# create IDTAG column
    ddf['IDTAG'] = ddf['sourcename'].str[6].fillna('')
    namefile = os.path.splitext(file)[0]
    ddf.to_csv(namefile, sep=';', index=False, encoding='iso-8859-1')
    for partfile in glob.glob('**/*.part'):
        os.rename(partfile, (os.path.splitext(partfile)[0]) + "_{}_part.csv".format(namefile))
    for partfilecsv in glob.glob("**/*_part.csv", recursive=True):
        part_df = pd.read_csv(partfilecsv, encoding='iso-8859-1', sep=';')
# get a list of columns
        cols = list(part_df)
# move the column to head of list
        cols.insert(0, cols.pop(cols.index('IDTAG')))
# reorder
        part_df = part_df.loc[:, cols]
# group by IDTAG
        def firstletter(Index):
            firstentry = part_df.iloc[Index, 0]
            return firstentry[0]
        for letter, group in part_df.groupby(firstletter):
            group.to_csv((os.path.splitext(partfilecsv)[0]) + '_source_{}.csv'.format(letter))

1 Answer


Well, this is how I managed to do it, followed by another script to merge the smaller files produced. Surely a more elegant solution exists; happy to learn.

import dask.dataframe as dd
import pandas as pd
import glob, os
import pyarrow.parquet
import pyarrow as pa
from dask import multiprocessing
PATH = "path"
os.chdir(PATH)
for file in glob.glob("**/*_clean.csv", recursive=True):
    ddf = dd.read_csv(file, encoding='iso-8859-1', sep=';', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object', 'lon': 'float'})
# write to parquet
    ddf.to_parquet('file_clean.csv.parquet', engine='pyarrow', schema={'insee': pa.string(),
       'ref_document': pa.string(), 'ref_interne_sourcename': pa.string(),
       'sexe': pa.string(), 'profession': pa.string(), 'date_naissance': pa.string(), 'date_mariage': pa.string(), 'date_deces': pa.string()})
    # the parquet schema above already stores these columns as strings,
    # so no dtype argument is needed when reading back
    ddf = dd.read_parquet('file_clean.csv.parquet', engine='pyarrow')
# split dates
    ddf['date_naissance']= dd.to_datetime(ddf['date_naissance'], format='%Y%m%d', errors='coerce')
    ddf['date_mariage']= dd.to_datetime(ddf['date_mariage'], format='%Y%m%d', errors='coerce')
    ddf['date_deces']= dd.to_datetime(ddf["date_deces"], format='%Y%m%d', errors='coerce')
    ddf['jour_naissance'] = ddf['date_naissance'].dt.day
    ddf['mois_naissance'] = ddf['date_naissance'].dt.month
    ddf['annee_naissance'] = ddf['date_naissance'].dt.year
    ddf['jour_mariage'] = ddf['date_mariage'].dt.day
    ddf['mois_mariage'] = ddf['date_mariage'].dt.month
    ddf['annee_mariage'] = ddf['date_mariage'].dt.year
    ddf['jour_deces'] = ddf['date_deces'].dt.day
    ddf['mois_deces'] = ddf['date_deces'].dt.month
    ddf['annee_deces'] = ddf['date_deces'].dt.year
# drop columns
    del ddf['date_naissance']
    del ddf['date_mariage']
    del ddf['date_deces']
# create IDTAG column
    ddf['IDTAG'] = ddf['sourcename'].str[6].fillna('')
# save new part as csv
    namefile = os.path.splitext(file)[0]
    ddf.to_csv(namefile, index=False, encoding='iso-8859-1')
    for partfile in glob.glob('**/*.part'):
        os.rename(partfile, (os.path.splitext(partfile)[0]) + "_{}_part.csv".format(namefile))
# create dataframe for each part_csv file
for partfilecsv in glob.glob("**/*_part.csv", recursive=True):
    part_df = pd.read_csv(partfilecsv, encoding='iso-8859-1', dtype={'insee': 'object',
       'ref_document': 'object', 'ref_interne_sourcename': 'object',
       'sexe': 'object', 'profession': 'object', 'date_naissance': 'object', 'date_mariage': 'object', 'date_deces': 'object', 'lon': 'float', 'IDTAG': 'float'})
# get a list of columns
    cols = list(part_df)
# move the column to head of list
    cols.insert(0, cols.pop(cols.index('IDTAG')))
# reorder
    part_df = part_df.loc[:, cols]
# group by new column
    for idtag, group in part_df.groupby('IDTAG'):
        group.to_csv((os.path.splitext(partfilecsv)[0]) + '_source_{}.csv'.format(idtag), index=False)
# remove part_files
    os.remove(partfilecsv)
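
The merge script mentioned above is not included here; a minimal sketch of it, assuming the *_source_<idtag>.csv naming produced by the loop above, would be:

import glob, os
import pandas as pd

# gather the small per-part files by their IDTAG suffix and concatenate them
merged = {}
for small in glob.glob('**/*_source_*.csv', recursive=True):
    idtag = os.path.splitext(small)[0].rsplit('_source_', 1)[1]
    merged.setdefault(idtag, []).append(small)
for idtag, files in merged.items():
    frames = [pd.read_csv(f, encoding='iso-8859-1', dtype='object') for f in files]
    pd.concat(frames).to_csv('merged_source_{}.csv'.format(idtag), index=False)

If parquet output is acceptable, I believe Dask can also do the partitioning directly with ddf.to_parquet(..., partition_on=['IDTAG']), which would avoid the rename-and-regroup steps, but the above is what I got working.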