How to read a CSV and process rows using Dask?


I want to read a 28 GB CSV file and print the contents. However, my code:

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import *


class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.row)] = self.row

    def index_row(self, row):
        self.row = row
        DB.process(self.dbproc, "index.kch")

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
df = dd.read_csv("/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv", blocksize=1000000)
df = df.compute(scheduler='processes')     # convert to pandas
df = df.to_dict(orient='records')
for row in df:
    ob.index_row(row)
print("Total time:")
print(datetime.utcnow() - start_time)

is not working. When I run htop I can see dask running, but there is no output whatsoever, nor is any index.kch file created. I ran the same thing without dask and it worked fine; I was using the Pandas streaming API (chunksize), but it was too slow, hence I want to use dask.
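For reference, the chunked Pandas approach I was using looks roughly like this. It is a minimal sketch: the tiny in-memory CSV stands in for the real 28 GB file path, and the chunk size is illustrative.

```python
import io
import pandas as pd

# In practice this would be the path to the 28 GB file; a tiny
# in-memory CSV stands in here so the sketch is runnable.
csv_source = io.StringIO("a,b\n1,2\n3,4\n5,6\n")

# chunksize streams the file in pieces instead of loading it whole
for chunk in pd.read_csv(csv_source, chunksize=2):
    for row in chunk.itertuples(index=False):
        print(row)
```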

Best answer:
df = df.compute(scheduler='processes')     # convert to pandas

Do not do this!

You are loading the pieces in separate processes, and then transferring all the data to be stitched into a single data-frame in the main process. This will only add overhead to your processing, and create copies of the data in memory.

If all you want to do is (for some reason) print every row to the console, then you would be perfectly well served by Pandas' streaming CSV reader (pd.read_csv(chunksize=..)). You could instead use Dask's chunking and maybe get a speedup if you do the printing in the workers which read the data:

import dask
import dask.dataframe as dd

df = dd.read_csv(..)

# function to apply to each sub-dataframe (each d is a pandas DataFrame)
@dask.delayed
def print_a_block(d):
    for row in d:
        print(row)

dask.compute(*[print_a_block(d) for d in df.to_delayed()])

Note that for row in d actually gets you the column names; maybe you wanted d.iterrows(), or maybe you actually wanted to process your data somehow rather than print it.