Why python multiprocessing pool.join is raising a "[Errno 111] Connection refused (Python.Runtime.PythonException)"?

70 Views Asked by At

Im very confused.... we are using multiprocessing python lib to just copy some files in parallel processes, using map.async. Obviously i am doing something wrong, because sometimes (yes... sometimes), i´m getting a "connection refused error" when doing the pool.join.

Seems like for some reason i´m trying to join the processes but the Manager is still doing its stuff and crashes.

I would be very grateful if someone knows what can be happening.

Thank you a lot in advance! .:ismael:.

And this is the callstack i´m getting:

Error: ConnectionRefusedError : [Errno 111] Connection refused (Python.Runtime.PythonException)
File "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", line 475, in generate_copy_jobs
    pool.join()
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 623, in __exit__
    self.terminate()
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 548, in terminate
    self._terminate()
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/util.py", line 224, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 581, in _terminate_pool
    cls._help_stuff_finish(inqueue, task_handler, len(pool))
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 568, in _help_stuff_finish
    inqueue._reader.recv()
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 943, in RebuildProxy
    return func(token, serializer, incref=incref, **kwds)
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 793, in __init__
    self._incref()
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 847, in _incref
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 620, in SocketClient
    s.connect(address)

Here is the snippet of the code:

def copyfile(data_tuple):
 
    copy_info, shared_data_dict = data_tuple
    src, dst, size, file_id = copy_info
    copy_op(src, dst)
 
    shared_data_dict[file_id] = dict(size=size, src=src, dst=dst)
 
 
manager = Manager()
shared_dict = manager.dict()
total_amount_of_data = 0
transfer_list_parameters = []
for item in transfer_list:
    total_amount_of_data += item[2]
    transfer_list_parameters.append((item, shared_dict))
total_data_mb = round(total_amount_of_data / (1024 * 1024), 2)
with Pool(processes=copy_workers) as pool:
    result = pool.map_async(copyfile, transfer_list_parameters, chunksize=chunk_size,
                            error_callback=copy_error_callback)
    while not result.ready():
        print_progress()
 
        sleep(10)
 
    pool.close()
    pool.join()
0

There are 0 best solutions below