I have a multiprocessing script that uses "pool.imap_unordered".
I ran into a problem where the script gets stuck, but when I check CPU usage (with the "top" command on Ubuntu) nothing is happening.
Going into the screen session, I can see the process is stuck mid-execution.
As I understand it, I am not using the problematic fork() start method.
Also, when I run the script on a small amount of returned data the freezes do not occur (but they do occur on relatively small amounts: a table under 5 MB as CSV).
Can someone suggest what exactly might help: a semaphore, a lock, or something else? I tried more processes with less data, which didn't help. It may be easier to switch to a different Python parallel library...
import multiprocessing

import pandas as pd


def my_func(df):
    # modify df here
    # df = df.head(1)
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
        final_df = pd.concat(out)
I also tried:
import multiprocessing

import pandas as pd


def my_func(df):
    # modify df here
    # df = df.head(1)
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.get_context("spawn").Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
        final_df = pd.concat(out)
You say your code doesn't work for big data. I don't have much to go on from that description, but I will work on the assumption that lack of memory is the cause of your problem. If so, how large is your dataframe and how much memory do you have available for running user applications? There may be no solution other than getting more memory. But before you do that, we can ask: what can be done to limit memory utilization in your current processing, and will it be enough so that the program now runs? The following is a rather lengthy analysis of the situation, and I propose a couple of changes you could make that might help, assuming my analysis is correct.

One of the issues with using the method imap_unordered is that you have little control over how many tasks initially get submitted to the pool's task queue and continue to be submitted as tasks complete. If you had (1) a large dataframe, (2) a large number of tasks to submit, and (3) the entire dataframe being passed to your worker function for each task, then that dataframe would initially be replicated on the task queue a number of times over which you have very little control, and you could quickly run out of memory. The solution then would be to somehow limit how many tasks can be sitting on the task queue waiting to be processed at any point in time.

But that is not exactly your situation, because you are not passing the entire df dataframe to my_func for each submitted task but rather a subset. Even if all the subsets created by the groupby method were sitting on the task queue together, the storage requirements of the task queue would be approximately equal to the size of the entire dataframe. Then, as these groups are processed and results are returned, the storage taken up by the task queue decreases while the storage required for your out list increases. The total storage requirements would probably not change all that much as the tasks are being processed: the task queue shrinks as out grows. But when you create final_df you simultaneously have storage requirements for df, out, and final_df. So you currently need to be able to hold essentially three instances of your dataframe in memory.

The simplest thing you can do is to delete the initial df dataframe before you perform the concatenation of your out list. Will this resolve your memory problem? I don't know, but now we only need enough memory to hold two copies of your large dataframe.
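As a minimal sketch based on your first version above, the only new line is the del df before pd.concat (and it only helps if nothing else still references df):

import multiprocessing

import pandas as pd


def my_func(df):
    # modify df here
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3],
                       'b': [4, 5, 6, 4, 5, 6],
                       'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        groups = (g for _, g in df.groupby("a"))
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    del df                     # free the original (assuming no other references to it)
    final_df = pd.concat(out)  # now roughly two copies coexist: out and final_df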
The additional thing we could do is to control the rate at which tasks are queued up and thus limit the storage required by the task queue. We can do this by replacing the method imap_unordered with apply_async, specifying a callback. We use a multiprocessing.BoundedSemaphore initialized to N, where N is the maximum number of tasks we want queued up at any time. Before apply_async can be called to submit the next task, the main process must first acquire the semaphore. It will be able to do this N times without blocking. As tasks complete, our callback function does a release on the semaphore, allowing a new task to be submitted. But even with this change you would still need adequate storage to hold two copies of your dataframe, so it will probably not help on its own. However, if your actual my_func returns something smaller than the passed df, then this should help. If your code now works, you can simply comment out the semaphore.acquire() and semaphore.release() calls to remove this change and see if it still continues to work.
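A minimal sketch of that apply_async/BoundedSemaphore approach, based on the posted example (the value MAX_QUEUED, i.e. N, and the callback name on_result are illustrative choices, not part of your original code):

import multiprocessing

import pandas as pd


def my_func(df):
    # modify df here
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3],
                       'b': [4, 5, 6, 4, 5, 6],
                       'c': [4, 5, 6, 4, 5, 6]})

    MAX_QUEUED = 10                      # N: cap on tasks waiting in the queue
    semaphore = multiprocessing.BoundedSemaphore(MAX_QUEUED)
    out = []

    def on_result(result):
        # apply_async callbacks run in a thread of the main process, so it is
        # safe to collect the result here and free one slot for the next task.
        out.append(result)
        semaphore.release()

    with multiprocessing.Pool(processes=multiprocessing.cpu_count() - 1) as pool:
        for _, group in df.groupby("a"):
            semaphore.acquire()          # blocks once MAX_QUEUED tasks are pending
            # With real data you would also want an error_callback that releases
            # the semaphore, so a worker exception cannot deadlock this loop.
            pool.apply_async(my_func, args=(group,), callback=on_result)
        pool.close()                     # no more submissions
        pool.join()                      # wait for all tasks and callbacks to finish

    del df                               # as above: drop the original before concatenating
    final_df = pd.concat(out)
    print(final_df)

Since the callback runs in the parent process, collecting results in out this way is equivalent to your imap_unordered loop; the only difference is that the submission rate is capped by the semaphore.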