Im very confused.... we are using multiprocessing python lib to just copy some files in parallel processes, using map.async. Obviously i am doing something wrong, because sometimes (yes... sometimes), i´m getting a "connection refused error" when doing the pool.join.
Seems like for some reason i´m trying to join the processes but the Manager is still doing its stuff and crashes.
I would be very grateful if someone knows what can be happening.
Thank you a lot in advance! .:ismael:.
And this is the callstack i´m getting:
Error: ConnectionRefusedError : [Errno 111] Connection refused (Python.Runtime.PythonException)
File "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx", line 475, in generate_copy_jobs
pool.join()
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 623, in __exit__
self.terminate()
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 548, in terminate
self._terminate()
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/util.py", line 224, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 581, in _terminate_pool
cls._help_stuff_finish(inqueue, task_handler, len(pool))
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/pool.py", line 568, in _help_stuff_finish
inqueue._reader.recv()
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 943, in RebuildProxy
return func(token, serializer, incref=incref, **kwds)
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 793, in __init__
self._incref()
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/managers.py", line 847, in _incref
conn = self._Client(self._token.address, authkey=self._authkey)
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/opt/custom/lib/python3/lib/python3.7/multiprocessing/connection.py", line 620, in SocketClient
s.connect(address)
Here is the snippet of the code:
def copyfile(data_tuple):
copy_info, shared_data_dict = data_tuple
src, dst, size, file_id = copy_info
copy_op(src, dst)
shared_data_dict[file_id] = dict(size=size, src=src, dst=dst)
manager = Manager()
shared_dict = manager.dict()
total_amount_of_data = 0
transfer_list_parameters = []
for item in transfer_list:
total_amount_of_data += item[2]
transfer_list_parameters.append((item, shared_dict))
total_data_mb = round(total_amount_of_data / (1024 * 1024), 2)
with Pool(processes=copy_workers) as pool:
result = pool.map_async(copyfile, transfer_list_parameters, chunksize=chunk_size,
error_callback=copy_error_callback)
while not result.ready():
print_progress()
sleep(10)
pool.close()
pool.join()