Python pathos.multiprocessing ProcessingPool. Why do the pools exchange information?

Intro

I have a large amount of data on which specific calculations need to be performed. Unfortunately, if I run the calculations one after another, the whole process takes about a day. This is why I use pathos.multiprocessing's ProcessingPool on a machine with 32 CPUs. That way the process takes about 30 minutes.

Issue

The expected behaviour is for all calculations to run in parallel and to be completely independent of each other. What I noticed is that this holds for a small number of calculations (e.g. 40), but if I increase the number to 90, the data gets mixed up. It seems as if the processes "communicate" with each other... in other words, somehow they have access to the same variables.

Question

Any idea what is going on?

Helpful link: https://pathos.readthedocs.io/en/latest/pathos.html

Code

The code below is a simplified version of what I have on my machine, but the idea is the same:

from pathos.multiprocessing import ProcessingPool, cpu_count

class TestMultiprocessing:
    def __init__(self):
        self.number = 0

    def do_something(self, args):
        # Note: this accumulates into instance state (self.number),
        # so the result depends on what this process has already run.
        for arg in args:
            self.number += arg
        return self.number

    def run(self):
        # Generate a big list
        l = []
        for i in range(0, 100):
            l.append([1,2,3])

        pool = ProcessingPool(cpu_count())
        results = pool.imap(self.do_something, l)
        pool.close()
        pool.join()
        pool.clear()
        results = list(results)

        for result in results:
            print(result)

if __name__ == '__main__':
    tm = TestMultiprocessing()
    tm.run()
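
Aside: a minimal sketch of what I believe is going on in this reproduction. Each worker process receives its own copy of the TestMultiprocessing instance and reuses it across tasks, so self.number keeps growing within a worker. A stateless variant of the method (hypothetical, would live in the same class as above) avoids that:

    def do_something_stateless(self, args):
        # Use a local accumulator instead of self.number, so repeated
        # calls inside the same worker process cannot see earlier results.
        total = 0
        for arg in args:
            total += arg
        return total

With this variant, every task should print 6 no matter how many tasks are submitted, because nothing survives between calls.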

Update

I can't show the real code because it is confidential, but I can give a short explanation of how it works.

There is a CSV file containing roughly 80,000 lines of data. The code I wrote has to run 800 tasks, each performing specific calculations on data taken from the CSV file.

The machine has N CPUs, so Pathos breaks the 800 tasks down into M groups of N tasks each (if 800 divides evenly by the number of CPUs, then M * N = 800). Pathos creates N worker processes that perform the required calculations in parallel. When a group completes, each worker produces a list of dictionaries and continues with the next group of tasks, repeating that process M times.
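
For example, on the 32-CPU machine from the intro, the 800 tasks would be split into M = 800 / 32 = 25 consecutive groups of 32 tasks each.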

Each dictionary in the list contains a unique key corresponding to the number of the task, which can be from 1 to 800. That way I can check whether worker X generates dictionaries related to task X (e.g. 103). The expectation is that the results generated by worker X contain only key X.

It is important to mention that the above process consumes most of the memory of the machine!

After detailed observation of the generated results I noticed the following: if I run N tasks (e.g. 4 tasks on a machine with 4 CPUs), each list of dictionaries contains only keys corresponding to its particular task. For instance, worker X generates a list of dictionaries with key X.

BUT if I run more than N tasks, the dictionaries corresponding to a task numbered higher than N contain data from previous tasks. For instance, worker Y generates a list of dictionaries with keys Y, X, and so on.

My conclusion is that a worker does not clear the data (memory) from its previous calculations, and the next task inherits the data from the previous one.
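
To make the reuse visible, here is a minimal sketch (a hypothetical StatefulWorkerDemo class mirroring the simplified code above, not my confidential code). The pid in the output groups tasks by worker process, and tasks handled by the same pid show a growing accumulator:

import os
from pathos.multiprocessing import ProcessingPool, cpu_count

class StatefulWorkerDemo:
    def __init__(self):
        self.number = 0

    def do_something(self, args):
        for arg in args:
            self.number += arg
        # Return the worker's pid next to the accumulated value;
        # tasks handled by the same pid should show 6, 12, 18, ...
        return os.getpid(), self.number

if __name__ == '__main__':
    demo = StatefulWorkerDemo()
    pool = ProcessingPool(cpu_count())
    for pid, number in pool.map(demo.do_something, [[1, 2, 3]] * 100):
        print(pid, number)
    pool.close()
    pool.join()
    pool.clear()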

Solution

Even though I use pool.clear(), that didn't help me resolve the issue. The temporary solution I found is the following.

I break the tasks down into chunks "manually" and process one chunk at a time:

import math

import numpy as np
from pathos.multiprocessing import ProcessingPool, cpu_count

# Get the number of CPUs
cpus = cpu_count()

# Check how many groups/chunks of tasks we have to run
# (l is the full list of tasks, as in the Code section above)
chunks = math.ceil(len(l) / cpus)

# Break the list of tasks down into chunks
l_chunks = np.array_split(l, chunks)

# Loop over the chunks, creating a fresh pool for each one
# (this loop lives inside the class's run() method, hence self.do_something)
for l_chunk in l_chunks:
    pool = ProcessingPool(cpus)
    results = pool.imap(self.do_something, l_chunk.tolist())
    pool.close()
    pool.join()
    pool.clear()
    results = list(results)

    for result in results:
        print(result)

That fixed the issue for me.
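
A note on why this seems to work: with pool.clear() called at the end of each iteration, every chunk gets a fresh pool of worker processes, each starting from a clean copy of the object, so no state can leak from one chunk into the next, and memory is released in between. If the root cause is indeed the instance state described above, making the task method stateless (as in the sketch under Code) should achieve the same result without the manual chunking.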
