I read a lot of posts about parallelization using the multiprocessing
module but none of them quite answered my question.
I have a very long generator giving me parameter values, and for each one I want to compute some function value. However, I only want to save the best n results, since I am only interested in the best ones and saving all of the results would blow up the RAM.
The way I see it, there are two ways to do this: 1) use common shared memory between the processes where the best values are saved, or 2) keep separate lists of the best results for each core/process and later manually merge these lists together.
I think the second method would be better, but I am not sure how to implement it. This is what I have so far:
import numpy as np
import multiprocessing
from functools import partial

def get_generator(length: int):
    # Stands in for the very long generator of parameter values:
    for i in range(length):
        yield [i, i + 1]

def some_func(x, other_stuff):
    y = np.sum(x)
    return y

def task(other_stuff, x: np.ndarray):
    val = some_func(x, other_stuff)
    if val > task.some_dict['min']:
        task.l.append(val)
        task.some_dict['min'] = val
    return

def task_init(l, some_dict):
    # Runs once in each worker process and attaches the shared
    # (managed) list and dict to the task function:
    task.l = l
    task.some_dict = some_dict
    task.some_dict['min'] = -np.inf

n = 20
generator = get_generator(n)
other_stuff = np.nan
func = partial(task, other_stuff)
l = multiprocessing.Manager().list()
some_dict = multiprocessing.Manager().dict()
p = multiprocessing.Pool(None, task_init, [l, some_dict])
p.imap(func, generator, chunksize=10000)
p.close()
p.join()
This is somewhat similar to what I want to do. But I really care about performance, and in the actual code the comparison/saving of the best values will be more complex, so I think the shared-memory approach would be really slow.
My question boils down to this: if I have e.g. 8 cores, how could I have 8 lists of the best results, one for each core, that are returned at the end, so that the cores work completely independently and as fast as possible?
Thank you very much!
These are my comments put into action. I hope your actual task is a more complicated computation, or it would hardly be worth using multiprocessing.
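A minimal sketch of the idea, assuming the worker just returns its function value and the main process, which has a processor to itself (see the update below), keeps the best n results on a heap (the names worker_process and main and the constants used here are assumptions):

import heapq
import multiprocessing as mp

import numpy as np

def get_generator(length: int):
    for i in range(length):
        yield [i, i + 1]

def worker_process(x):
    # Stand-in for the real, more expensive computation:
    return int(np.sum(x))

def main():
    n = 20        # number of best results to keep
    N = 100_000   # number of parameter values produced by the generator
    best = []     # min-heap: best[0] is the worst of the current best n
    with mp.Pool(mp.cpu_count() - 1) as pool:
        for val in pool.imap_unordered(worker_process, get_generator(N),
                                       chunksize=1000):
            if len(best) < n:
                heapq.heappush(best, val)
            elif val > best[0]:
                # val displaces the current worst of the best n:
                heapq.heappushpop(best, val)
    print(sorted(best, reverse=True))

if __name__ == '__main__':
    main()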
Update
I found it best with the following experiment to allocate to the pool a number of processes equal to mp.cpu_count() - 1, to leave the main process a free processor to handle the results returned by the workers. I also experimented with the chunksize parameter.

On my desktop (running other processes, such as streaming music), the above code did better with a pool size of mp.cpu_count() - 1 than with mp.cpu_count() (2.4 seconds vs. 2.5 seconds). Here are other timings (rounded to one decimal place):

The result for a chunksize value of 1000 is a bit of an anomaly. I would suggest trying different values; otherwise, use N // (mp.cpu_count() - 1). This assumes you can compute N, the number of items in the iterable. When you have a generator as the iterable, you would, in the general case, have to convert it to a list first to get its length. Even a chunksize value of 1 did not do that much worse in this particular benchmark. But this is what I have learned from varying the amount of work worker_process has to do: the more work (i.e. CPU) your worker process has to do to complete its task, the less sensitive it is to the chunksize parameter. If it returns after using very little CPU, then the overhead of transferring the next chunk becomes significant and you want to keep the number of chunk transfers small (i.e. you want a large chunksize value). But if the process is long running, the overhead of transferring the next chunk will not be as impactful.

In the following code the worker process's CPU requirements are trivial:
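A sketch of what such a benchmark might look like (the harness, N and the particular chunksize values tried here are assumptions):

import multiprocessing as mp
import time

def worker_process(x):
    # Trivial CPU requirements: just hand the value back.
    return x

if __name__ == '__main__':
    N = 1_000_000
    for chunksize in (1, 100, 1000, 10_000, N // (mp.cpu_count() - 1)):
        t = time.time()
        with mp.Pool(mp.cpu_count() - 1) as pool:
            for _ in pool.imap_unordered(worker_process, range(N),
                                         chunksize=chunksize):
                pass
        print(f'chunksize {chunksize}: {time.time() - t:.1f} seconds')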
In the following code the worker process's CPU requirements are more substantial:
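The same harness as a sketch, with worker_process changed so that each task burns noticeably more CPU (the exact workload is an assumption):

import multiprocessing as mp
import time

def worker_process(x):
    # Substantial CPU per task:
    total = 0
    for i in range(1, 2_000):
        total += x % i
    return total

if __name__ == '__main__':
    N = 100_000
    for chunksize in (1, 100, 1000, 10_000, N // (mp.cpu_count() - 1)):
        t = time.time()
        with mp.Pool(mp.cpu_count() - 1) as pool:
            for _ in pool.imap_unordered(worker_process, range(N),
                                         chunksize=chunksize):
                pass
        print(f'chunksize {chunksize}: {time.time() - t:.1f} seconds')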
Update 2
According to Python multiprocessing: understanding logic behind chunksize, when the methods map, starmap or map_async are called with chunksize=None, a specific algorithm is used to compute a chunksize, and I have used it in the code below. I don't know why the default value for the methods imap and imap_unordered is 1 and does not use this same algorithm; perhaps because that wouldn't be "lazy", as implied by the description of these methods. In the following code, which repeats the previous benchmark, I use a redefinition of the same algorithm for computing the default chunksize:
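That algorithm mirrors the chunksize computation in CPython's Pool._map_async; a redefinition (compute_chunksize is my name for it) looks like this:

import multiprocessing as mp

def compute_chunksize(pool_size, iterable_size):
    # What map/starmap/map_async do when called with chunksize=None:
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    pool_size = mp.cpu_count() - 1
    N = 1_000_000
    # imap/imap_unordered default to chunksize=1, so pass the
    # computed value explicitly:
    print(compute_chunksize(pool_size, N))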