I have a script where I need to create a large data frame by querying a database. I have written a class which splits the query across separate processes by partitioning the primary key in the WHERE clause, with each process pushing its batches of records into a shared queue. The helper functions that do this work are below.
def query_generator(conn, query, batch_size, reduce_info=None):
    """Generator over a server-side SCROLL cursor, yielding fetched batches.

    inputs:
    conn -- database connection (must support .execute())
    query -- query to run
    batch_size -- number of rows to pull per FETCH
    reduce_info -- optional shared dict; if given, its 'columns' key is set
        to the result's column names after the first fetch
    yields:
    list of record rows for each non-empty batch"""
    print(query)
    # A SCROLL cursor lets us pull the result set incrementally instead of
    # materializing it all in one round trip.
    conn.execute('DECLARE c SCROLL CURSOR FOR ' + query)
    print('Cursor Created')
    need_columns = reduce_info is not None
    while True:
        res = conn.execute('FETCH {} FROM c'.format(batch_size))
        vals = res.fetchall()
        if need_columns:
            # Publish the column names exactly once so consumers can build
            # DataFrames without re-querying metadata.
            reduce_info['columns'] = res.keys()
            need_columns = False
        if not vals:
            # Fix: stop here instead of yielding a spurious empty final batch.
            break
        yield vals
    conn.execute('Close c')
def query_to_queue(query_tuple, batch_size, fetch_queue, reduce_info=None):
    """Drain batches from the query generator into the multiprocessing
    queue until the full query has run.

    inputs:
    query_tuple -- tuple consisting of a connection object and a query
    batch_size -- the size of the batches which should be fetched
    fetch_queue -- multiprocessing queue to put the batches into
    reduce_info -- optional shared dict forwarded to query_generator so it
        can publish the result's column names"""
    conn, query = query_tuple
    # Iterating the generator directly is the idiomatic replacement for the
    # manual next()/StopIteration loop.
    for batch in query_generator(conn, query, batch_size, reduce_info):
        fetch_queue.put(batch)
def reduce_from_queue(fetch_queue, reduce_func, combine_func, reducer,
                      status_dict, reduce_info=None, reduce_list=None,
                      timeout=1):
    """Pull batches off the multiprocessing queue and fold them into one
    combined result (e.g. a list of records / DataFrames).

    inputs:
    fetch_queue -- multiprocessing queue where workers put fetched records
    reduce_func -- function for processing each batch yielded from the DB
    combine_func -- function combining a new batch with the running result
    reducer -- process number of this reducer (key into status_dict)
    status_dict -- shared dict recording whether each reducer has finished
    reduce_info -- kwargs needed for reduce_func (a producer must add a
        'columns' key before batches can be reduced)
    reduce_list -- shared list to append the result to (if None, return it)
    timeout -- seconds to wait on an empty fetch_queue
    outputs:
    comb_data -- only if reduce_list is None"""
    # Fix: a mutable default argument ({}) is shared across calls; use the
    # None-sentinel idiom instead.
    if reduce_info is None:
        reduce_info = {}
    comb_data = None  # Fix: was unbound (NameError) if no batch ever arrived.
    loop = True
    has_returned = False
    first_val = True
    while loop:
        try:
            val = fetch_queue.get(timeout=timeout)
            # 'STOP' is the producer's sentinel meaning no more batches.
            if isinstance(val, str) and val == 'STOP':
                break
            # Block until a producer has published the column names.
            while 'columns' not in reduce_info:
                print('waiting')
                time.sleep(2)
            new_data = reduce_func(val, **reduce_info)
            has_returned = True
            if first_val and new_data is not None:
                comb_data = new_data
                first_val = False
            elif new_data is not None:
                comb_data = combine_func(new_data, comb_data)
        except Empty:
            # Only give up on an empty queue once at least one batch was seen.
            if has_returned:
                loop = False
        except Exception as e:
            print(e)
            # Bare raise preserves the original traceback (raise(e) did not).
            raise
    if reduce_list is not None:
        reduce_list.append(comb_data)
        status_dict[reducer] = True
    else:
        status_dict[reducer] = True
        return comb_data
# Note: query_parallel is a method of a class, but it only uses class attributes to obtain a connection pool. I have trimmed this to the relevant code.
def query_parallel(self, query, filter_list, reduce_func=None, combine_func=None, final_combine_func=None, n_reducers=1, batch_size=2000, reduce_timeout=1000):
    """Run a query in parallel from multiple connections, each querying
    a different partition or part of the primary key.

    inputs:
    query -- query to run, with one formatting field at the beginning of the
        filters clause for inserting the filters
    filter_list -- list of filters which select different partitions or
        divide up the primary key
    reduce_func -- function applied to each fetched batch in the reducers
    combine_func -- function combining a new batch with the running result
    final_combine_func -- function combining the per-reducer results
    n_reducers -- number of reducer processes to start
    batch_size -- size of each batch returned from the database
    reduce_timeout -- seconds to wait for reducers before terminating them
    outputs:
    res -- combined result of the query (None if an error occurred)"""
    print('Starting Query Parallel')
    # Fix: res was unbound, so `return res` raised NameError (masking the
    # real error) whenever an exception was caught below.
    res = None
    conn_list = self.dial_pool(len(filter_list))
    try:
        query_list = [query.format(f) for f in filter_list]
        fetch_queue = MpQueue()
        fetch_process_list = list()
        manager = Manager()
        reduce_list = manager.list()
        status_dict = manager.dict()
        reduce_info = manager.dict()
        process_dict = dict()
        # One fetch process per connection/filter pair.
        for i, conn in enumerate(conn_list):
            query_tuple = (conn, query_list[i])
            arg_dict = {'query_tuple': query_tuple,
                        'batch_size': batch_size,
                        'fetch_queue': fetch_queue}
            if i == 0:
                # Only the first fetcher publishes the column names.
                arg_dict['reduce_info'] = reduce_info
            p = Process(target=query_to_queue,
                        kwargs=arg_dict)
            p.start()
            fetch_process_list.append(p)
        print('NReducers: ', n_reducers)
        for i in range(n_reducers):
            print('Reducer Started')
            status_dict[i] = False
            p = Process(target=reduce_from_queue,
                        kwargs={'fetch_queue': fetch_queue,
                                'reduce_func': reduce_func,
                                'status_dict': status_dict,
                                'combine_func': combine_func,
                                'reduce_list': reduce_list,
                                'reduce_info': reduce_info,
                                'reducer': i})
            p.start()
            process_dict[i] = p
        for t in fetch_process_list:
            t.join()
            print('Fetch Process Joined')
        print('All Fetch Joined')
        # One sentinel per reducer so every reducer wakes up and exits.
        for i in range(n_reducers):
            fetch_queue.put('STOP')
        print('Stop Message Sent')
        # Poll each reducer for up to reduce_timeout seconds; terminate it
        # once it reports done via status_dict (or it has already exited).
        for pid, p in process_dict.items():
            for i in range(reduce_timeout // 2):
                if not p.is_alive():
                    break
                if status_dict[pid]:
                    p.terminate()
                    break
                time.sleep(2)
            p.join(timeout=10)
        print('starting final reduce')
        res = combine_from_list(fetch_list=reduce_list,
                                combine_func=final_combine_func,
                                reduce_queue=None)
    except Exception as e:
        print(e)
        traceback.print_exc()
    finally:
        for c in conn_list:
            c.close()
    return res
# Input functions used for creating a dataframe, could be changed to do inline analysis or other reduction
def records_to_df_list(records, columns):
    """Wrap a batch of records in a one-element list holding a DataFrame."""
    frame = pd.DataFrame(records, columns=columns)
    return [frame]
def combine_df_list(comb_data, new_data):
    """Merge two lists of batch DataFrames into one, preserving order."""
    merged = list(comb_data)
    merged.extend(new_data)
    return merged
def concat_df_lists(comb_list):
    """Flatten a list of DataFrame lists and concatenate into one DataFrame.

    inputs:
    comb_list -- list of lists of DataFrames (one sublist per reducer)
    outputs:
    single DataFrame with all of the rows"""
    t = time.time()
    # Flatten the per-reducer sublists into one flat list of frames.
    df_list = [df for sublist in comb_list for df in sublist]
    # Fix: elapsed time is now - start; the original printed the negation.
    print(time.time() - t)
    return pd.concat(df_list)
My current implementation combines all of the batch lists in my fetch_queue and converts them into a pandas data frame once all the data has been queried. However, this often takes just as long or longer than pulling all of the data from the db.
My original thought was to have a "reducer process" which created smaller dataframes from the fetch_queue during the fetch process, appended them to a list, and combined them with pandas concat once all the data was fetched. However, due to the quadratic copying caused by repeatedly concatenating, this is slower than just waiting until the end to build the data frame from the records. (I do have the option of reducing from the batches if this is necessary.)
It seems like I am missing some efficiency to be able to begin the work of processing the data frame for the data which is sitting in memory before all of the data is loaded.
Update
Running
def query_pool(self, query, filter_list):
    """Run a query in parallel from multiple connections, each querying
    a different partition or part of the primary key.

    inputs:
    query -- query to run, with one formatting field at the beginning of the
        filters clause for inserting the filters
    filter_list -- list of filters which select different partitions or
        divide up the primary key
    outputs:
    df -- dataframe with the results of the query"""
    # Fix: p (and df_list) were unbound if setup failed before Pool creation,
    # so the finally/return raised NameError and hid the real error.
    p = None
    try:
        query_list = [query.format(f) for f in filter_list]
        p = Pool(len(query_list))
        # Bind the connection string so workers only need the query text.
        partial_get_df = partial(get_df, connection_string=self.con_str)
        df_list = p.map(partial_get_df, query_list)
    except Exception as e:
        print(e)
        traceback.print_exc()
        # Fix: re-raise instead of falling through to a confusing NameError.
        raise
    finally:
        if p is not None:
            p.close()
    return pd.concat(df_list)
def get_df(query, connection_string):
    """Fetch one query into a DataFrame over a fresh, short-lived connection.

    inputs:
    query -- SQL query to run
    connection_string -- SQLAlchemy URL passed to create_engine
    outputs:
    df -- DataFrame with the query results"""
    e = sa.create_engine(connection_string)
    conn = e.connect()
    try:
        # Fix: the connection is now closed even if read_sql raises,
        # instead of leaking on error.
        return pd.read_sql(con=conn, sql=query)
    finally:
        conn.close()
Ran in 13 seconds, approximately the same time it took to pull the data into the parallel processes, making the pandas processing negligible. I am not sure how the implementation of Pool.map() passes data back to the main process, but it appears to do so with much less overhead than trying to use queues.