Efficient Creation of Pandas DataFrames from Parallel Queries


I have a script where I need to create a large DataFrame from a database query. I have created a class that splits the query across separate processes by partitioning the primary key in the WHERE clause and pushes the fetched batches of records into a queue. The helper functions that do this work are below.

# Imports assumed by the helpers below (not shown in the original post)
import time
import traceback
from queue import Empty
from multiprocessing import Manager, Process, Queue as MpQueue  # MpQueue alias assumed

import pandas as pd


def query_generator(conn, query, batch_size, reduce_info=None):
    """Creates a generator for a query which yields fetched batches
       from a query

       inputs:
       conn --  database connection
       query -- query to run
       batch_size -- size of batches to pull
       reduce_info -- optional shared dict; if given, its 'columns' key is
                      filled with the result columns from the first fetch

       yields:
       list of records fetched from the query"""
    print(query)
    conn.execute('DECLARE c SCROLL CURSOR FOR ' + query)
    print('Cursor Created')
    vals = True
    need_columns = reduce_info is not None
    while vals:
        res = conn.execute('FETCH {} FROM c'.format(batch_size))
        vals = res.fetchall()
        if need_columns:
            reduce_info['columns'] = res.keys()
            need_columns = False
        yield vals
    conn.execute('Close c')


def query_to_queue(query_tuple, batch_size, fetch_queue, reduce_info=None):
    """Yields batchees from the query generator and puts them in
       the multiprocessing queue until the full queries ahve run

       inputs:
       query_tuple -- tuple consisting of a connection object and a query
       batch_size -- the size of the batches which should be fetched
       fetch_queue -- multiprocessing queue to put the batches into"""
    conn, query = query_tuple
    qg = query_generator(conn, query, batch_size, reduce_info)
    while True:
        try:
            fetch_queue.put(next(qg))
        except StopIteration:
            break

def reduce_from_queue(fetch_queue, reduce_func, combine_func, reducer,
                      status_dict, reduce_info={}, reduce_list=None, timeout=1):
    """Pulls batches out of the multiprocessing queue to create one list of
       dictionaries with all of the records, then creates a dataframe of all
       of the records.

       inputs:
       fetch_queue -- multiprocessing queue where workers are putting fetched records
       reduce_func -- Function for processing each batch yielded from DB
       combine_func -- Function for combining each batch to the current data
       reducer -- process number for reducer for recording the status
       status_dict -- dictionary recording the status of each reducer
       reduce_info -- kwargs needed for reduce_func
       reduce_list -- list to append the results of combine to (if none return results)
       timeout -- timeout for returning empty from fetch_queue

       outputs:
       comb_data -- only if reduce_list is None"""
    
    comb_data = None  # initialized so the final append/return cannot raise if no data arrives
    loop = True
    has_returned = False
    first_val = True
    while loop:
        try:
            val = fetch_queue.get(timeout=timeout)        
            if (type(val) == str) and (val == 'STOP'):
                loop = False
                break
            while 'columns' not in reduce_info:
                print('waiting')
                time.sleep(2)
            new_data = reduce_func(val, **reduce_info)
            has_returned = True
            if (first_val and new_data is not None):
                comb_data = new_data
                first_val = False
            elif (new_data is not None):
                comb_data = combine_func(new_data, comb_data)

        except Empty:
            if has_returned:
                loop = False
        except Exception as e:
            print(e)
            loop = False
            raise(e)
    if reduce_list is not None:
        reduce_list.append(comb_data)
        status_dict[reducer] = True
    else:
        status_dict[reducer] = True
        return comb_data

# Note: query_parallel comes from a class, but it only uses class attributes to get
# a connection pool.  Trying to limit this to the relevant code.
    def query_parallel(self, query, filter_list, reduce_func=None,
                       combine_func=None, final_combine_func=None,
                       n_reducers=1, batch_size=2000, reduce_timeout=1000):
        """Run a query in parallel from multiple connections each querying
           a different partition or part of the primary key.

           inputs:
           query -- query to run, with one formatting field in the
                    filters clause for inserting the filters
           filter_list -- list of filters which call different partitions or divide up
                          the primary key
           reduce_func -- function applied to each fetched batch
           combine_func -- function combining a batch result with the running result
           final_combine_func -- function combining the per-reducer results
           n_reducers -- number of reducer processes to start
           batch_size -- size of each batch returned from the database
           reduce_timeout -- approximate seconds to wait for the reducers to finish

           outputs:
           df -- dataframe with the results of the query"""
        print('Starting Query Parallel')
        conn_list = self.dial_pool(len(filter_list))
        try:
            query_list = [query.format(f) for f in filter_list]
            fetch_queue = MpQueue()

            fetch_process_list = list()
            manager = Manager()
            reduce_list = manager.list()
            status_dict = manager.dict()
            reduce_info = manager.dict()
            process_dict = dict()
            for i, conn in enumerate(conn_list):
                query_tuple = (conn, query_list[i])
                arg_dict = {'query_tuple': query_tuple,
                            'batch_size': batch_size,
                            'fetch_queue': fetch_queue}
                if i == 0:
                    arg_dict['reduce_info'] = reduce_info
                p = Process(target=query_to_queue,
                            kwargs=arg_dict)
                p.start()
                fetch_process_list.append(p)
          
                    
            print('NReducers: ', n_reducers)
            for i in range(n_reducers):
                print('Reducer Started')
                status_dict[i] = False
                p = Process(target=reduce_from_queue,
                            kwargs={'fetch_queue': fetch_queue,
                                    'reduce_func': reduce_func,
                                    'status_dict': status_dict,
                                    'combine_func': combine_func,
                                    'reduce_list': reduce_list,
                                    'reduce_info': reduce_info,
                                    'reducer': i})
                
                p.start()
                process_dict[i] = p
            for t in fetch_process_list:
                t.join()
                print('Fetch Process Joined')
            print('All Fetch Joined')
            for i in range(n_reducers):
                fetch_queue.put('STOP')
            print('Stop Message Sent')
            for pid, p in process_dict.items():
                for i in range(reduce_timeout // 2):
                    if not p.is_alive():
                        break
                    if status_dict[pid]:
                        p.terminate()
                        break
                    time.sleep(2)
                    
                p.join(timeout=10)
            print('starting final reduce')
            def null_func(res):
                return res
            res = combine_from_list(fetch_list=reduce_list,
                                    combine_func=final_combine_func,
                                    reduce_queue=None)
        except Exception as e:
            print(e)
            traceback.print_exc()
        finally:
            for c in conn_list:
                c.close()
        return res


# Input functions used for creating a dataframe, could be changed to do inline analysis or other reduction
def records_to_df_list(records, columns):
    return([pd.DataFrame(records, columns=columns)])

def combine_df_list(comb_data, new_data):
    return(comb_data + new_data)

def concat_df_lists(comb_list):
    t = time.time()
    df_list = list()
    for sublist in comb_list:
        df_list += sublist
    print(time.time() - t)  # elapsed time to flatten the lists
    return pd.concat(df_list)  
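
For context, this is roughly how I call query_parallel with the helpers above. The call below is only a sketch: db stands in for my class instance that owns dial_pool and query_parallel, and the table name and id ranges are placeholders.

filters = ['id BETWEEN 0 AND 999999',
           'id BETWEEN 1000000 AND 1999999',
           'id BETWEEN 2000000 AND 2999999']

df = db.query_parallel('SELECT * FROM my_table WHERE {}',
                       filter_list=filters,
                       reduce_func=records_to_df_list,
                       combine_func=combine_df_list,
                       final_combine_func=concat_df_lists,
                       n_reducers=2,
                       batch_size=2000)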

My current implementation combines all of the batch lists from my fetch_queue and converts them into a pandas DataFrame once all the data has been queried. However, this often takes just as long as, or longer than, pulling all of the data from the database.

My original thought was to have a "reducer process" that creates smaller DataFrames from the fetch_queue while the fetch is still running, appends them to a list, and combines them with pd.concat once all the data has been fetched. However, because of the copying pd.concat has to do, this ends up slower than just waiting until the end to build the DataFrame from the raw records. (I do have the option of reducing from the batches if necessary.)
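
To illustrate the copying cost I mean, here is a small standalone timing sketch (separate from my actual code): growing one DataFrame with repeated pd.concat copies everything accumulated so far on each call, while collecting the pieces in a list and concatenating once copies each batch a single time.

import time
import numpy as np
import pandas as pd

batches = [pd.DataFrame(np.random.rand(2000, 10)) for _ in range(500)]

# Incremental: the growing frame is copied on every iteration.
t = time.time()
df = batches[0]
for b in batches[1:]:
    df = pd.concat([df, b], ignore_index=True)
print('incremental concat:', time.time() - t)

# Single concat at the end: each batch is copied exactly once.
t = time.time()
df = pd.concat(batches, ignore_index=True)
print('single concat:', time.time() - t)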

It seems like I am missing some way to start turning the data that is already sitting in memory into DataFrames before all of the data has finished loading.

Update

Running

# Imports assumed by this snippet (not shown in the original post):
#   from functools import partial
#   from multiprocessing import Pool
#   import pandas as pd
#   import sqlalchemy as sa
#   import traceback

    def query_pool(self, query, filter_list):
        """Run a query in parallel from multiple connections each querying
           a different partition or part of the primary key.

           inputs:
           query -- query to run, with one formatting field in the
                    filters clause for inserting the filters
           filter_list -- list of filters which call different partitions or divide up
                        the primary key

           outputs:
           df -- dataframe with the results of the query"""
        try:
            query_list = [query.format(f) for f in filter_list]
            p = Pool(len(query_list))
            partial_get_df = partial(get_df, connection_string = self.con_str)
            df_list = p.map(partial_get_df, query_list)
        except Exception as e:
            print(e)
            traceback.print_exc()
        finally:
            p.close()
        return pd.concat(df_list)

def get_df(query, connection_string):
    e = sa.create_engine(connection_string)
    conn = e.connect()
    df = pd.read_sql(con=conn, sql=query)
    conn.close()
    return df

This ran in 13 seconds, approximately the same time it took to pull the data into the parallel processes, making the pandas processing negligible. I am not sure exactly how Pool.map() passes data back to the main process, but it appears to do so with much less overhead than passing batches through queues.
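
To sanity-check that observation, here is a rough, hypothetical benchmark sketch (separate from my actual code) comparing the two transfer paths: returning a whole DataFrame from a worker via Pool.map versus pushing many small batches through a multiprocessing.Queue. The sizes and column names are made up.

import time
import numpy as np
import pandas as pd
from multiprocessing import Pool, Process, Queue


def make_df(n_rows):
    # Pool.map path: build the frame in the worker and return it in one pickle
    return pd.DataFrame({'a': np.arange(n_rows), 'b': np.random.rand(n_rows)})


def queue_batches(q, n_rows, batch_size):
    # Queue path: the same data crosses the process boundary in many small pickles
    for start in range(0, n_rows, batch_size):
        size = min(batch_size, n_rows - start)
        q.put(pd.DataFrame({'a': np.arange(start, start + size),
                            'b': np.random.rand(size)}))
    q.put('STOP')


if __name__ == '__main__':
    n_rows = 2_000_000

    t = time.time()
    with Pool(4) as pool:
        dfs = pool.map(make_df, [n_rows // 4] * 4)
    df = pd.concat(dfs, ignore_index=True)
    print('Pool.map path:', time.time() - t)

    t = time.time()
    q = Queue()
    p = Process(target=queue_batches, args=(q, n_rows, 2000))
    p.start()
    parts = []
    while True:
        item = q.get()
        if isinstance(item, str) and item == 'STOP':
            break
        parts.append(item)
    p.join()
    df = pd.concat(parts, ignore_index=True)
    print('queue path:', time.time() - t)

In the Pool.map case each worker's result is pickled and sent back to the parent once, while the queue path pays the pickling and synchronization cost once per batch, which seems consistent with the timings I am seeing.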
