Python progress bar in conjunction with fork processes


I'm trying to incorporate a progress bar into a piece of code that parallelizes operations using os.fork. I tried a rich progress bar, but it does not work because of the way progress updates are handled across processes: an update made inside a child process is not reflected in the parent process.

My code looks something like this:

from rich.progress import Progress
import os

with Progress() as progress:
    task = progress.add_task("Processing...", total=len(args_list))
    for balanced_batch in even_batches_it:
        if os.fork():
            pass  # parent: do something
        else:
            try:
                for my_tuple in balanced_batch:
                    do_something(my_tuple)
                # runs in the child, so the parent's bar never sees this update
                progress.advance(task, advance=len(balanced_batch))
            except Exception as exception:
                pass  # do something

Any feedback/advice is highly appreciated!


There are 2 best solutions below

5rod On

In my experience, it's best to create a Python file dedicated to this task. It makes things a lot easier whenever I want to add a progress bar to my code, since threading doesn't really work.

I wrote a helper that takes a function and runs it alongside a progress bar:

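from rich.progress import Progress
import time
import inspect
import textwrap

def run(func, sleep=0):
    update = lambda progress, task: progress.update(task, advance=1)

    # Grab the function's source, drop the "def" line and dedent the body
    # (instead of assuming the body is indented by exactly 8 spaces)
    source = inspect.getsource(func)
    body = textwrap.dedent(source.split('\n', 1)[1])
    mylist = [line for line in body.split('\n') if line.strip()]
    length = len(mylist)
    for num, line in enumerate(mylist):
        # after every line, update the progress bar and optionally sleep
        mylist[num] = line + "\nupdate(progress, task)\ntime.sleep(sleep)"

    myexec = '\n'.join(mylist) + '\n'

    with Progress() as progress:
        task = progress.add_task("Working...", total=length)
        exec(myexec)  # runs with update, progress, task and sleep in scope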

if __name__ == '__main__':
    def myfunc():
        #chose to import these modules for progress bar example since they take a LONG time to import on my computer
        import pandas
        import openai

    run(myfunc)

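This code is a little inefficient since it uses exec(). However, it is very powerful! By appending update(progress, task) after each line of the function you want to run, the progress bar advances once per line of that function. Note that the appended calls are not indented, so as written this only works when the function body is a flat sequence of single-line statements. Here's how you would implement your code: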

Save this example code as a file under any name (e.g. progress_bar.py), then import the function run() from that file. Now you can run your code:

from progress_bar import run
import os

def myfunc():
    for balanced_batch in even_batches_it:
        if os.fork():
            pass  # do something
        else:
            try:
                for my_tuple in balanced_batch:
                    do_something(my_tuple)
                progress.advance(task, advance=len(balanced_batch))
            except Exception as exception:
                pass  # do something

run(myfunc)  # runs this function so the progress bar updates alongside it, after every line of code in the function

One last thing to note: your code is already structured as a loop, so you don't need my function to run it (as per the documentation). Still, this code may be useful to you in the future, given that your current progress bar isn't working properly.

aviso On

With anything that is outputting to the terminal in a stateful way, you should only have a single process handling it. This means you need to relay the progress to the main process and let it handle the progress bar.
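As a rough illustration of relaying progress to the main process while still using os.fork(), here is a minimal sketch that assumes a Unix system. The helper names fork_workers, relay_progress and demo are hypothetical, and the work callable stands in for the question's do_something. Each child writes one byte to a shared pipe per finished item; only the parent reads the pipe and advances the rich bar.

```python
import os
import time


def fork_workers(batches, work):
    """Fork one child per batch; return the pipe's read end and the child PIDs."""
    read_fd, write_fd = os.pipe()
    pids = []
    for batch in batches:
        pid = os.fork()
        if pid == 0:
            # Child: never touches the progress bar, only reports through the pipe.
            os.close(read_fd)
            status = 0
            try:
                for item in batch:
                    work(item)
                    os.write(write_fd, b".")  # one byte per finished item
            except Exception:
                status = 1
            finally:
                os._exit(status)  # leave without running the parent's cleanup code
        pids.append(pid)
    # Parent: close its copy of the write end so reads hit EOF once every child exits.
    os.close(write_fd)
    return read_fd, pids


def relay_progress(read_fd, pids, on_progress):
    """Read progress bytes in the parent and pass each increment to on_progress."""
    done = 0
    while True:
        chunk = os.read(read_fd, 4096)
        if not chunk:  # EOF: all children have closed the pipe
            break
        done += len(chunk)
        on_progress(len(chunk))
    os.close(read_fd)
    for pid in pids:
        os.waitpid(pid, 0)  # reap the children
    return done


def demo():
    """Hypothetical demo: two batches of ten items, each item sleeping briefly."""
    from rich.progress import Progress  # third-party package

    batches = [range(0, 10), range(10, 20)]
    # Fork before the bar starts, so children are not forked while rich's
    # refresh thread is running.
    read_fd, pids = fork_workers(batches, lambda item: time.sleep(0.1))
    with Progress() as progress:
        task = progress.add_task("Working...", total=sum(len(b) for b in batches))
        relay_progress(read_fd, pids, lambda n: progress.advance(task, advance=n))
```

Calling demo() should show a single bar driven entirely by the parent. The callback keeps the pipe logic independent of rich, so any other progress library could be plugged in the same way.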

There is an example of this using queues in the Enlighten GitHub repo. However, it uses multiprocessing rather than fork().
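For comparison, a queue-based variant with multiprocessing and rich might look roughly like the sketch below. This is not the Enlighten example itself; the names start_workers, drain and demo_queue are hypothetical, and the "fork" start method is an assumption (Unix only) chosen to stay close to the question. With other start methods the work callable would need to be picklable.

```python
import multiprocessing as mp
import time


def _worker(batch, work, queue):
    """Runs in a child process: report each finished item, then a sentinel."""
    try:
        for item in batch:
            work(item)
            queue.put(1)  # one finished item
    finally:
        queue.put(None)  # sentinel: this worker is finished, even after an error


def start_workers(batches, work):
    """Start one process per batch; return the shared queue and the processes."""
    ctx = mp.get_context("fork")  # assumption: Unix, mirrors the question's use of fork
    queue = ctx.Queue()
    procs = [ctx.Process(target=_worker, args=(batch, work, queue)) for batch in batches]
    for proc in procs:
        proc.start()
    return queue, procs


def drain(queue, procs, on_progress):
    """Parent side: consume messages until every worker has sent its sentinel."""
    done = 0
    finished = 0
    while finished < len(procs):
        message = queue.get()
        if message is None:
            finished += 1
        else:
            done += message
            on_progress(message)
    for proc in procs:
        proc.join()
    return done


def demo_queue():
    """Hypothetical demo: the parent owns the bar, workers only send counts."""
    from rich.progress import Progress  # third-party package

    batches = [range(0, 10), range(10, 20)]
    queue, procs = start_workers(batches, lambda item: time.sleep(0.1))
    with Progress() as progress:
        task = progress.add_task("Working...", total=sum(len(b) for b in batches))
        drain(queue, procs, lambda n: progress.advance(task, advance=n))
```

The sentinel per worker lets the parent stop waiting even if a worker fails partway through its batch, at the cost of the bar finishing short of its total in that case.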