Repeatedly process a big list of images with changing parameters using multiple cores in Python


I have a big list of images, list_img (say 20k), that I need to process multiple times with changing arguments taken from a list params = [arg1, arg2, ...]. Ideally, I want to use multiple processes to do so. But I need all processes to first use arg1 and then arg2 on chunks of list_img. The processing time for each arg in params varies greatly. So if I distributed the list params over my processes instead of the list of images (core 1: arg1, core 2: arg2, ...), most of the processes would be idle (finished) after a while, with very few still crunching data.

My current (working) solution looks like this:

from multiprocessing import Pool
import numpy as np

def calc_image(argument, image):
    val = argument * image    # not the real process, just demo
    return val

if __name__ == "__main__":
    pool = Pool(processes=8)
    list_img = [np.ones((100, 100))] * 20000    # for demo only
    params = list(range(100))    # for demo only
    for par in params:
        par_list = [par] * len(list_img)
        return_vals = pool.starmap(calc_image, zip(par_list, list_img))
    pool.close()

How can I avoid copying the list list_img every time the variable par changes in the for-loop? I would also like to avoid using global variables, if possible.


There are 2 best solutions below

2
RaJa On BEST ANSWER

This is my current workaround for the problem. I'm still interested in a better solution - maybe more elegant.

I've switched from using Pool to a collection of Process objects:

from multiprocessing import Queue, Process
import numpy as np

def process_image(list_images, queue_in, queue_out):
    for arg in iter(queue_in.get, "STOP"):
        processed_images = []
        for img in list_images:
            result = arg * img
            processed_images.append(result)
        queue_out.put(processed_images)

if __name__ == "__main__":
    list_img = [np.ones((100, 100))] * 20000    # for demo only
    splits = np.split(list_img, 4)   # split into 4 chunks
    my_pool = []
    queue_in = Queue()
    queue_out = Queue()
    # start a bunch of processes, each owning a part of the list of images,
    # so the list is only copied once
    for n in range(4):
        proc = Process(target=process_image, args=(splits[n], queue_in, queue_out))
        proc.start()
        my_pool.append(proc)
    params = list(range(100))    # for demo only
    for par in params:
        for n in my_pool:
            queue_in.put(par)    # each process gets the same element and starts crunching
        return_vals = []
        for n in my_pool:
            return_vals.append(queue_out.get(block=True)) # wait for results
    for element in my_pool:
        queue_in.put("STOP")   # one "STOP" per process tells it to shut down
    for element in my_pool:
        element.join()         # wait for every worker to exit

The trick is that I copy the list of images only once, during the creation of the processes. During initialization, each worker gets its own sub-list of the total list, which has been split beforehand. Later, in a small loop, I provide the argument that should be used to process the images. As the processes block until queue_in contains elements, I just have to put the respective argument into the queue as many times as I have processes. This way the images are not copied again.

Copying the results back (from the processes to the main-process) can't be avoided.
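One caveat with a single shared queue_in: nothing guarantees that each worker takes exactly one copy of the argument per round. A worker that finishes early can fetch a second copy before a slower worker has fetched its first, so one chunk gets processed twice and another not at all. The results also arrive in completion order, not chunk order. A minimal sketch of one way around both issues, assuming one input queue per worker and results tagged with a worker index. The names worker, split_evenly and queues_in are hypothetical, and plain floats stand in for the image arrays:

```python
from multiprocessing import Process, Queue

def worker(worker_id, images, queue_in, queue_out):
    # `images` is handed over once, when the process is created.
    for arg in iter(queue_in.get, "STOP"):
        results = [arg * img for img in images]   # demo processing only
        # Tag the result so the parent can restore the chunk order.
        queue_out.put((worker_id, results))

def split_evenly(items, n):
    # Hypothetical helper: n contiguous chunks whose sizes differ by at most 1.
    size, extra = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        stop = start + size + (1 if i < extra else 0)
        chunks.append(items[start:stop])
        start = stop
    return chunks

if __name__ == "__main__":
    list_img = [1.0] * 20      # stand-ins for images (assumption)
    params = [2, 3]            # for demo only
    n_workers = 4
    chunks = split_evenly(list_img, n_workers)
    queue_out = Queue()
    queues_in = [Queue() for _ in range(n_workers)]   # one private queue per worker
    procs = []
    for i in range(n_workers):
        proc = Process(target=worker, args=(i, chunks[i], queues_in[i], queue_out))
        proc.start()
        procs.append(proc)
    for par in params:
        for q in queues_in:
            q.put(par)         # every worker receives exactly one copy
        by_worker = {}
        for _ in procs:
            worker_id, results = queue_out.get()      # blocks until a result arrives
            by_worker[worker_id] = results
        # Reassemble the results in the original image order.
        return_vals = [r for i in range(n_workers) for r in by_worker[i]]
    for q in queues_in:
        q.put("STOP")          # tell each worker to exit
    for proc in procs:
        proc.join()
```

The private queues cost a few extra objects but make the hand-off deterministic: a worker can only ever see the arguments meant for its own chunk.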

1
VeeyesPlus On

Try using partial functions from the functools module: https://docs.python.org/3/library/functools.html#functools.partial

Create a new function, for example partial_calc_image, by calling partial() with the calc_image function as its first argument and the value you want to fix (in this case the image list list_img) as a keyword argument.

Refer to this answer: https://stackoverflow.com/a/72524256/22072290

Furthermore, partial_calc_image can be passed to starmap():

return_vals = pool.starmap(partial_calc_image, zip(par_list, list_img))
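One possible reading of this suggestion is to bind the current parameter, not the image list, with partial(). That removes the need to build par_list on every iteration, so map() can be used in place of starmap(). A minimal sketch under that assumption, with the hypothetical name calc_with_par and plain floats standing in for the image arrays. Note that the images are still pickled and sent to the workers on every round, so this tidies the call but does not by itself avoid the copying:

```python
from functools import partial
from multiprocessing import Pool

def calc_image(argument, image):
    return argument * image    # not the real process, just demo

if __name__ == "__main__":
    list_img = [1.0] * 1000    # stand-ins for images (assumption)
    params = list(range(5))    # for demo only
    with Pool(processes=4) as pool:
        for par in params:
            # Fix the first positional argument; map() supplies each image.
            calc_with_par = partial(calc_image, par)
            return_vals = pool.map(calc_with_par, list_img)
```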