Asynchronous processing of data but sequential file save in multiprocessing


I'm processing a really large log file, e.g. 300 GB. I have a script which reads the file in chunks, processes the data asynchronously in a pool of processes (I need to extract some key:value pairs with regexes), and saves the results to a parquet file.

import lzma
import multiprocessing
import os


def process_line(line: bytes):
    """Parse values with regex from the line."""
    data = ...  # regex parsing of the line goes here
    return data


def process_and_save(chunk: list[bytes], lock: multiprocessing.Lock):
    """Process chunk of lines and save the result to parquet file."""
    result = [process_line(line) for line in chunk]
    lock.acquire()
    try:
        write(...)  # append the processed chunk to the parquet file
    finally:
        lock.release()
    return


def main():
    """Process log file, parse data, save to parquet file."""
    # manager provides a lock that can be shared with the pool workers
    manager = multiprocessing.Manager()
    # to ensure only one process writes to the parquet file at a time
    lock = manager.Lock()
    # create a pool of processes
    pool = multiprocessing.Pool(processes=NUM_PROCESSES)
    # delete file if exists
    if os.path.exists(PARQUET_FILE_PATH):
        os.remove(PARQUET_FILE_PATH)
    with lzma.open(LOG_FILE_PATH, 'rb') as file:
        while True:
            # readlines won't cut a line in half!
            chunk = file.readlines(BYTES_PER_CHUNK)
            if not chunk:
                break
            pool.apply_async(process_and_save, (chunk, lock))

    # close the pool of processes
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

The problem is that the blocks of processed data sometimes overtake each other and are not saved sequentially. That is what I need to solve: process the data asynchronously, but make sure the results are saved to the file in the order they were read.
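One idea I have not tried yet is to keep all of the writing in the parent process and use pool.imap, which yields results in the order the chunks were submitted even though the workers run in parallel. A rough sketch of what I mean (process_chunk, read_chunks and main_ordered are just names for this sketch; process_line, write(...) and the constants are the same placeholders as above):

def process_chunk(chunk: list[bytes]):
    """Parse every line of a chunk; the workers do no file I/O at all."""
    return [process_line(line) for line in chunk]


def read_chunks(path):
    """Yield lists of whole lines, roughly BYTES_PER_CHUNK bytes each."""
    with lzma.open(path, 'rb') as file:
        while chunk := file.readlines(BYTES_PER_CHUNK):
            yield chunk


def main_ordered():
    """Process chunks in parallel but write them in the order they were read."""
    if os.path.exists(PARQUET_FILE_PATH):
        os.remove(PARQUET_FILE_PATH)
    with multiprocessing.Pool(processes=NUM_PROCESSES) as pool:
        # imap hands chunks to whichever worker is free, but yields the
        # results back in submission order, so appending here keeps the
        # parquet file sequential and no lock is needed
        for result in pool.imap(process_chunk, read_chunks(LOG_FILE_PATH)):
            write(...)  # same placeholder writer as above, appending result

That would also remove the need for the Manager lock, since only the parent process ever touches the file, but I have not tested it at this scale.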

If there is a better approach to doing this kind of task efficiently, I'm open to it. Thanks.


There is 1 best solution below

Answer from SIGHUP:

Looks like you need to pseudo-randomly process the chunks you've read from the log file.

You could build a stack (of finite size) then randomly select items from that stack for processing.

Something like this:

from multiprocessing import Pool, Manager
from os import remove
from random import randrange

RSTACK = 5  # size of the holding stack of chunks
PARQUET_FILE_PATH = 'foo.parquet'
LOG_FILE_PATH = 'foo.log'
NUM_PROCESSES = 10
BYTES_PER_CHUNK = 16*1024

def write_parquet(data, lock):
    with lock:
        ... # process the data and write the parquet file here

def main():
    try:
        remove(PARQUET_FILE_PATH)
    except FileNotFoundError:
        ...

    with Manager() as manager:
        lock = manager.Lock()
        # create the pool before the try so the finally block always has it
        pool = Pool(NUM_PROCESSES)
        try:
            with open(LOG_FILE_PATH, 'rb') as log:
                stack = []
                while chunk := log.readlines(BYTES_PER_CHUNK):
                    # check if stack is full
                    if len(stack) == RSTACK:
                        # stack is full so pop an item at random for processing
                        ba = stack.pop(randrange(0, RSTACK))
                        pool.apply_async(write_parquet, (ba, lock))
                    # append chunk to the stack
                    stack.append(chunk)
                # drain the stack
                while stack:
                    ba = stack.pop(randrange(0, len(stack)))
                    pool.apply_async(write_parquet, (ba, lock))
        finally:
            # wait for all the async subprocesses to complete
            pool.close()
            pool.join()

if __name__ == '__main__':
    main()