Repeated Calls to ProcessPoolExecutor context get slower (Python)


I am developing an MCTS algorithm and attempting to parallelize some of the work by rolling out multiple leaves in parallel. After a batch of rollouts I go back and add the results to the tree (undoing my virtual losses) before selecting another batch of leaves to roll out. This works fine except for speed: I am finding that successive loops around a ProcessPoolExecutor context get slower. The code section is as follows:

for _ in range(8):
    tick = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        value_estimates = executor.map(NeuralNet.evaluate, leaves, chunksize=round(batch_size/8)+1)
    tock = time.time()

I then get the following timing results.

Took 2.3520002365112305 sec to run 500 times

Took 2.5691237449645996 sec to run 500 times

Took 2.8875749111175537 sec to run 500 times

Took 3.2885916233062744 sec to run 500 times

Took 3.43363618850708 sec to run 500 times

Took 3.6769683361053467 sec to run 500 times

Took 3.948704719543457 sec to run 500 times

Took 4.299146890640259 sec to run 500 times

The same pattern occurs for larger loops: the time taken just continues to grow and grow.

What is the reason for this? Is there any way of getting each loop to perform at the same pace?

Thanks in Advance

There is 1 answer below.


I suggest using the multiprocessing library instead. The comparison below may be interesting.

import numpy as np
import concurrent.futures
from concurrent.futures import as_completed
from multiprocessing import Pool, cpu_count
import time

data = np.random.randint(0, 1000000, 100000000)
N = 10000


def label_data(i):
    print(f'Started executing iteration {i} for data of length {len(data)}')


# concurrent.futures version
# (time.perf_counter replaces time.clock, which was removed in Python 3.8)
t0 = time.perf_counter()
futures = {}
with concurrent.futures.ProcessPoolExecutor() as executor:
    for i in range(N):
        futures[executor.submit(label_data, i)] = i

for future in as_completed(futures):
    iteration = futures[future]
t1 = time.perf_counter() - t0
print("Time elapsed: ", t1)
time_concurrent = t1

# multiprocessing version, using all but one CPU
t0 = time.perf_counter()
with Pool(cpu_count() - 1) as p:
    p.map(label_data, range(N))
t1 = time.perf_counter() - t0
print("Time elapsed: ", t1)
time_multiprocessing = t1

print("------------------------------")
print("Concurrent library time: ", time_concurrent)
print("Multiprocessing library time: ", time_multiprocessing)

Here are some of the results from that example:

Concurrent library time:  4.678165
Multiprocessing library time:  0.017128999999999728
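
As a side note on the original loop: part of the slowdown may simply be the process start-up and teardown cost paid on every iteration, because the `with ProcessPoolExecutor()` block creates a fresh pool each time around. A minimal sketch of reusing one pool across all batches, with a stand-in `evaluate` function and `leaves` list in place of the question's `NeuralNet.evaluate` and actual tree leaves (both hypothetical here):

```python
import concurrent.futures
import math

# Stand-in for the question's NeuralNet.evaluate (hypothetical).
def evaluate(leaf):
    return math.sqrt(leaf)

leaves = list(range(500))       # stand-in for the question's leaves
batch_size = len(leaves)

results = []
# Create the pool once; the worker processes are reused for every batch,
# so the start-up cost is paid only on the first iteration.
with concurrent.futures.ProcessPoolExecutor() as executor:
    for _ in range(8):
        value_estimates = list(
            executor.map(evaluate, leaves, chunksize=round(batch_size / 8) + 1)
        )
        results.append(value_estimates)
```

In a real MCTS loop you would add the batch results to the tree inside the `for` loop before selecting the next batch, while the executor stays alive outside it.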