How can I wait for all ProcessPoolExecutor tasks to complete before iterating over them with as_completed?

41 Views Asked by At
def main():
    """Run brute-force attacks of increasing length and report the elapsed time.

    For each length, CHARACTERS is split into ``cpu_cores`` slices and one
    ``brute_force`` task is submitted per slice; the loop stops at the first
    length for which a task returns a truthy password.
    """
    start_time = time()
    cpu_cores = os.cpu_count() - LIMIT
    part_size = len(CHARACTERS)//cpu_cores
    
    for length in range(MAX_LENGTH):
        
        with Manager() as manager:
            event = manager.Event()
            
            with ProcessPoolExecutor() as executor:
                attacks = {}
                
                for part in range(cpu_cores):
                    # The last slice takes whatever remains after integer division.
                    if part == cpu_cores - 1:
                        first_bit = CHARACTERS[part_size * part :]
                    else:
                        first_bit = CHARACTERS[part_size * part : part_size * (part+1)]
                    
                    # NOTE(review): .result() blocks until the task finishes and
                    # returns brute_force's return value, so `future` is not a
                    # Future here. The traceback shows that value was None, which
                    # is why as_completed() below raises AttributeError.
                    future = executor.submit(brute_force, SECRET_STRING, first_bit, event, *([CHARACTERS] * length)).result()
                    attacks[future] =  f"Brute Force {length} - {first_bit}"
                
                for attack in as_completed(attacks):
                    password = attack.result()
                    if password:
                        attack_type = attacks[attack]
                        print(f"Password found by {attack_type} attack: {password}")
                        # Signal the shared event once a password is found.
                        event.set()
                        break
        
        if not password:
            print(f"Password not found on length of {length}")
        elif password:
            break
        
    end_time = time()
    duration = end_time - start_time
    print(f"Total execution time: {duration:.5f} seconds\n")
Traceback (most recent call last):
  File "/home/almus/Documents/BruteForce/crack_4.0.py", line 72, in <module>
    main()
  File "/home/almus/Documents/BruteForce/crack_4.0.py", line 53, in main
    for attack in as_completed(attacks):
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 224, in as_completed
    with _AcquireFutures(fs):
  File "/usr/lib/python3.10/concurrent/futures/_base.py", line 151, in __enter__
    future._condition.acquire()
AttributeError: 'NoneType' object has no attribute '_condition'

The problem seems to come from as_completed checking the dict before the executor has submitted the results.

When I use ProcessPoolExecutor and as_completed on something fast, like multiplying every number in a list, it works well. This time I am using ProcessPoolExecutor and as_completed on tasks that take time. That is the problem: is there a way to wait for all the tasks to complete before the main code reaches as_completed?

1

There is 1 best solution below

0
TheHungryCub On

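The AttributeError comes from calling .result() directly on the object returned by executor.submit(...): that call blocks until the task has finished and returns the task's return value (here None), so the keys of attacks are not Future objects and as_completed cannot work with them. Store the Future itself instead. If you also want to block until every task has finished, you can make use of the concurrent.futures.wait() function, which allows you to wait for a collection of futures to complete.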

import os
from concurrent.futures import ProcessPoolExecutor, as_completed, wait
from time import time
from multiprocessing import Manager

# Placeholder values for the example; replace them with your own definitions.
LIMIT = 1  # subtracted from os.cpu_count() in main() to get the number of slices/tasks
MAX_LENGTH = 10  # main() iterates length over range(MAX_LENGTH), i.e. 0 .. MAX_LENGTH - 1
SECRET_STRING = "secret"  # first argument passed to brute_force()
CHARACTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"  # alphabet: sliced per task and passed `length` more times as extra arguments


def main():
    """Search for the password in parallel, one candidate length at a time.

    CHARACTERS is split into one slice per worker task. For every ``length``
    in ``range(MAX_LENGTH)`` a ``brute_force`` task is submitted per slice,
    receiving SECRET_STRING, its slice, a shared event and ``length`` extra
    copies of CHARACTERS. The search stops at the first length for which a
    task returns a truthy value.

    The Future returned by ``executor.submit`` is stored as-is (calling
    ``.result()`` on it at submit time would serialise the tasks and leave
    non-Future keys in the dict). No explicit ``wait()`` is needed:
    ``as_completed`` already blocks until each future finishes and yields
    them in completion order, so the shared event can be set while the other
    tasks are still running.
    """
    # os.cpu_count() may return None, and subtracting LIMIT may reach zero or
    # below; always keep at least one slice so the division below is safe.
    cpu_cores = max(1, (os.cpu_count() or 1) - LIMIT)
    part_size = len(CHARACTERS) // cpu_cores

    for length in range(MAX_LENGTH):
        # Reset per length so the check after the pool never sees an unbound
        # or stale value.
        password = None

        with Manager() as manager:
            event = manager.Event()

            with ProcessPoolExecutor() as executor:
                attacks = {}

                for part in range(cpu_cores):
                    begin = part_size * part
                    # The last slice absorbs the remainder of the integer division.
                    end = None if part == cpu_cores - 1 else part_size * (part + 1)
                    first_bit = CHARACTERS[begin:end]

                    future = executor.submit(
                        brute_force, SECRET_STRING, first_bit, event, *([CHARACTERS] * length)
                    )
                    attacks[future] = f"Brute Force {length} - {first_bit}"

                for attack in as_completed(attacks):
                    password = attack.result()
                    if password:
                        attack_type = attacks[attack]
                        print(f"Password found by {attack_type} attack: {password}")
                        # Presumably brute_force polls this event to stop early;
                        # leaving the executor block still waits for all tasks.
                        event.set()
                        break

        if password:
            break
        print(f"Password not found on length of {length}")


def brute_force(secret, first_bit, event, *args):
    """Placeholder worker: put your own brute-force logic here.

    As written it ignores all of its arguments and returns None.
    """
    # Your brute force logic goes here
    return None


# Run the search only when this file is executed as a script, not when it is
# imported. With multiprocessing start methods that re-import the main module
# in child processes, this guard keeps the workers from calling main() again.
if __name__ == "__main__":
    main()

Output:

Password not found on length of 0
Password not found on length of 1
Password not found on length of 2
Password not found on length of 3
Password not found on length of 4
Password not found on length of 5
Password not found on length of 6
Password not found on length of 7
Password not found on length of 8
Password not found on length of 9