What makes Python Multiprocessing raise different errors when sharing objects between processes?


Context: I want to create the attributes of an object class in parallel by distributing the work across the available cores. This question was answered in this post by using the Python multiprocessing Pool.
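
For context, the pattern from that post, reduced to a toy example, looks roughly like the snippet below. The class and function names here (Store, worker, set_value, get_values) are my own illustration, not taken from the linked answer: a manager process hosts the object, and the pool workers mutate it through a picklable proxy.

from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class Store:
    """Toy stand-in for the object whose attributes are filled in parallel."""

    def __init__(self):
        self._values = {}

    def set_value(self, key, value):
        self._values[key] = value

    def get_values(self):
        return dict(self._values)


def worker(store, key, value):
    # Runs in a pool process; 'store' is a proxy back to the manager-hosted object.
    store.set_value(key, value)


if __name__ == '__main__':
    BaseManager.register('Store', Store)
    with BaseManager() as manager:
        store = manager.Store()
        with Pool(3) as pool:
            pool.starmap(worker, [(store, f'k{i}', i) for i in range(3)])
        print(store.get_values())  # e.g. {'k0': 0, 'k1': 1, 'k2': 2}

Only the lightweight proxy is pickled and sent to the workers; the actual object stays in the manager process. In my case the hosted object is a Pyomo ConcreteModel, and that is where things seem to go wrong.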

The MRE for my task is the following, using Pyomo 6.4.1:

from pyomo.environ import *
import os
import multiprocessing
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import types


class ObjProxy(NamespaceProxy):
    """Returns a proxy instance for any user defined data-type. The proxy instance will have the namespace and
    functions of the data-type (except private/protected callables/attributes). Furthermore, the proxy will be
    pickable and can its state can be shared among different processes. """

    def __getattr__(self, name):
        result = super().__getattr__(name)
        if isinstance(result, types.MethodType):
            def wrapper(*args, **kwargs):
                return self._callmethod(name, args, kwargs)
            return wrapper
        return result

@classmethod
def create(cls, *args, **kwargs):
    # Register class
    class_str = cls.__name__
    BaseManager.register(class_str, cls, ObjProxy, exposed=tuple(dir(cls)))

    # Start a manager process
    manager = BaseManager()
    manager.start()

    # Create and return this proxy instance. Using this proxy allows sharing of state between processes.
    inst = getattr(manager, class_str)(*args, **kwargs)
    return inst

ConcreteModel.create = create  # attach the factory so ConcreteModel.create() returns a manager-backed proxy

class A:
    def __init__(self):
        self.model = ConcreteModel.create()

    def do_something(self, var):
        if var == 'var1':
            self.model.var1 = var
        elif var == 'var2':
            self.model.var2 = var
        else:
            print('other var.')

    def do_something2(self, model, var_name, var_init):
        model.add_component(var_name, var_init)

    def init_var(self):
        print('Sequentially')
        self.do_something('var1')
        self.do_something('test')
        
        print(self.model.var1)
        print(vars(self.model).keys())

        
        # Trying to create the attributes in parallel
        print('\nParallel')
        self.__sets_list = [(self.model,'time',Set(initialize = [x for x in range(1,13)])),
                            (self.model,'customers',Set(initialize = ['c1','c2','c3'])),
                            (self.model,'finish_bulks',Set(initialize = ['b1','b2','b3','b4'])),
                            (self.model,'fermentation_types',Set(initialize = ['ft1','ft2','ft3','ft4'])),
                            (self.model,'fermenters',Set(initialize = ['f1','f2','f3'])),
                            (self.model,'ferm_plants',Set(initialize = ['fp1','fp2','fp3','fp4'])),
                            (self.model,'plants',Set(initialize = ['p1','p2','p3','p4','p5'])),
                            (self.model,'gran_plants',Set(initialize = ['gp1','gp2','gp3','gp4','gp4']))]

        with Pool(7) as pool:
            pool.starmap(self.do_something2,self.__sets_list)
        
        self.model.time.pprint()
        self.model.customers.pprint()

def main(): # The main part run from another file
    obj = A()

    obj.init_var()
    
    # Call other methods to create other attributes and the solver step.
    # The other methods are similar to do_something2() just changing the var_init to Var() and Constraint().


if __name__ == '__main__':
    multiprocessing.set_start_method("spawn")

    main()

  • Output
Sequentially
other var.
var1
dict_keys(['_tls', '_idset', '_token', '_id', '_manager', '_serializer', '_Client', '_owned_by_manager', '_authkey', '_close'])

Parallel
WARNING: Element gp4 already exists in Set gran_plants; no action taken
time : Size=1, Index=None, Ordered=Insertion
    Key  : Dimen : Domain : Size : Members
    None :     1 :    Any :   12 : {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}
customers : Size=1, Index=None, Ordered=Insertion
    Key  : Dimen : Domain : Size : Members
    None :     1 :    Any :    3 : {'c1', 'c2', 'c3'}

I changed the number of parallel processes for testing, but the script raises different errors, and other times it runs without any error. This is confusing, and I have not been able to figure out the underlying problem. I did not find another post with a similar problem, but I saw some posts discussing that pickle does not handle large data well. The errors that I sometimes get are listed below; after the list I added a small, illustrative picklability check that shows how I have been reasoning about what actually gets pickled.

  • Error 1

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    SystemError: <method 'dump' of '_pickle.Pickler' objects> returned NULL without setting an error
    
  • Error 2

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    RuntimeError: dictionary changed size during iteration
    
  • Error 3

    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    *** Reference count error detected: an attempt was made to deallocate the type 32727 (? ***
    
    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    numpy.core._exceptions._ArrayMemoryError: <unprintble MemoryError object>
    
  • Error 4

    Unserializable message: Traceback (most recent call last):
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/managers.py", line 300, in serve_client
      send(msg)
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/connection.py", line 211, in send
      self._send_bytes(_ForkingPickler.dumps(obj))
    File "/home/.../anaconda3/envs/.../lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
      cls(buf, protocol).dump(obj)
    AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'
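
Since all four tracebacks end inside _ForkingPickler.dumps, my working assumption is that the failures happen while pickling whatever has to travel between the pool and manager processes: the bound method self.do_something2 (which drags self and the model proxy along) and the unconstructed Set objects. The snippet below is a minimal, self-contained check of that kind of object; the idea of testing each piece separately and the names used are my own illustration, and any printed results are examples, not output from the runs above.

import pickle

from pyomo.environ import ConcreteModel, Set

# Illustrative picklability check: try to pickle each candidate on its own and
# report which one fails (results are examples, not from the runs above).
plain_model = ConcreteModel()
plain_model.customers = Set(initialize=['c1', 'c2', 'c3'])

candidates = {
    'unconstructed Set': Set(initialize=[1, 2, 3]),
    'ConcreteModel with one Set': plain_model,
}

for label, obj in candidates.items():
    try:
        pickle.dumps(obj)
        print(label, '-> pickles fine')
    except Exception as exc:
        print(label, '-> fails to pickle:', type(exc).__name__, exc)

In the real MRE the same check could be applied to self.do_something2 and to the proxy returned by ConcreteModel.create(), since those are what pool.starmap actually has to serialize.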
    

So, there are different errors, and the behavior does not look stable. I hope someone has run into and solved this problem. Furthermore, if someone has implemented other strategies for this task, please feel free to post your answer here.

Thanks.
