How to parallelize inserts into a key-value database?


My intent is to version large CSV files, so I am using a key-value database where the key is built from one or more columns of a row and the value is the complete row. For example:

Name, Age, Roll.No
Aviral, 22, 1
Apoorv, 19, 2

If I use Roll.No as the key, the key stored in the DB is the roll number (probably its hash) and the value is the complete row: Aviral, 22, 1

I have completed the above implementation, but it is too slow for large CSV files (for example, 20 GB with 534M rows). I tried Dask, but it is slower than plain sequential streaming with pandas. My question is: how can I run parallel inserts into a key-value database?

import json
import sys
from datetime import datetime
from hashlib import md5

import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd

from kyotocabinet import DB

class IndexInKyoto:

    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()

    def dbproc(self, db):
        db[self.hash_string(self.key)] = self.row

    def index_row(self, key, row):
        self.row = row
        self.key = key
        DB.process(self.dbproc, "index.kch")

# destination = "/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv"
destination = "10M_rows.csv"
df = dd.read_csv(destination)
df_for_file_attributes = pd.read_csv(destination, nrows=2)
column_list = list(df_for_file_attributes)

# df = df.compute(scheduler='processes')     # convert to pandas

start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()

# function to apply to each sub-dataframe
def print_a_block(d):
    #for row in d.itertuples(index=False):
    # print(row)
    print("a block called!")
    d = d.to_dict(orient='records')
    for row in d:
        key = str(row["0"])
        row = json.dumps(row, default=str)
        ob.index_row(key, row)

print("Calling compute!")
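# Wrap the function with dask.delayed so each partition is processed lazily by
# dask.compute; calling print_a_block(d) directly would run it on a Delayed object.
dask.compute(*[dask.delayed(print_a_block)(d) for d in df.to_delayed()])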
print(datetime.utcnow() - start_time)

There is 1 solution below


Kyoto Cabinet doesn't allow you to parallelize inserts: each writer blocks until the other writer completes. Redis does accept such inserts from several clients at once. To optimize further, use Redis pipelining, which batches your commands and greatly reduces the round-trip time (RTT) when loading a large amount of data.
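A minimal sketch of the pipelining idea, assuming the redis-py client: `load_rows`, `key_column`, and `batch_size` are hypothetical names chosen for illustration, and the batch size of 10,000 is an arbitrary starting point rather than a tuned value. The function only relies on the client exposing `pipeline()`, and on the pipeline exposing `set()` and `execute()`.

```python
import json
from hashlib import md5


def load_rows(client, rows, key_column, batch_size=10_000):
    """Send rows to Redis in pipelined batches; return the number of rows sent.

    `client` is assumed to behave like redis.Redis (redis-py).
    `rows` is any iterable of dicts, e.g. csv.DictReader output.
    """
    # transaction=False asks for plain command batching without MULTI/EXEC.
    pipe = client.pipeline(transaction=False)
    sent = 0
    for row in rows:
        # Same key scheme as in the question: MD5 of the key column's value.
        key = md5(str(row[key_column]).encode("utf-8")).hexdigest()
        pipe.set(key, json.dumps(row, default=str))
        sent += 1
        if sent % batch_size == 0:
            pipe.execute()  # one network round trip for the whole batch
    pipe.execute()  # flush whatever is left in the last partial batch
    return sent


# Possible usage, assuming a Redis server on localhost and redis-py installed:
#   import csv, redis
#   with open("10M_rows.csv", newline="") as f:
#       reader = csv.DictReader(f, skipinitialspace=True)
#       load_rows(redis.Redis(), reader, key_column="Roll.No")
```

Several processes could each call this on their own slice of the file with their own connection. Whether that helps in your case depends on where the bottleneck is, so measure before committing to it.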

Your task runs slower than sequential processing because of the overhead of managing multiple processes that still end up writing to the database one at a time.
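If you stay with a store that serializes writers, one way to avoid that overhead is to parallelize only the preparation (hashing and JSON encoding) and keep a single writer. The sketch below is an assumption about how that could look, not something taken from the question: `prepare_chunk` and `index_chunks` are hypothetical names, and `store` is anything that supports `store[key] = value`, such as a dict or a Kyoto Cabinet DB that has been opened once up front.

```python
import json
from concurrent.futures import Executor
from hashlib import md5


def prepare_chunk(chunk):
    """CPU-bound step: hash the key and serialize each row.

    `chunk` is a (key_column, rows) tuple so it can be passed through map().
    """
    key_column, rows = chunk
    return [
        (md5(str(row[key_column]).encode("utf-8")).hexdigest(),
         json.dumps(row, default=str))
        for row in rows
    ]


def index_chunks(store, chunks, executor: Executor):
    """Prepare chunks on the executor, but write from the calling thread only."""
    written = 0
    # executor.map yields results in input order; only this loop touches the store.
    for pairs in executor.map(prepare_chunk, chunks):
        for key, value in pairs:
            store[key] = value
            written += 1
    return written


# Possible usage (a ProcessPoolExecutor is assumed for real CPU parallelism):
#   from concurrent.futures import ProcessPoolExecutor
#   if __name__ == "__main__":
#       with ProcessPoolExecutor() as pool:
#           index_chunks(db, chunks, pool)
```

With this layout there is never more than one writer, so nothing blocks on the database lock, and the store is not reopened for every row.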