Stop creating unnecessary threads once I have all the data


I am trying to parallelize a function that gets data from an API. Depending on the arguments I pass, the API returns the information in blocks: in some cases I receive 5 blocks, in others 100.

When I request blocks, a block that returns None means there is no more data. For example, if block 40 returns None but block 39 returns data, then every block up to 39 contains data and every block from 40 on is empty. Is it possible for the code to stop creating threads for the higher-numbered blocks as soon as it detects an empty block, while still extracting all the blocks with data as efficiently as possible?
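For reference, the stopping rule described above is trivial to express sequentially. This is a minimal sketch, assuming a hypothetical fetch_block(step) wrapper around the API that returns a list for a block with data and None for an empty block. It never makes an unnecessary call, but it also gets no parallelism, which is exactly the trade-off the threaded versions try to improve on.

```python
from typing import Callable, List, Optional


def fetch_sequential(fetch_block: Callable[[int], Optional[list]], max_blocks: int) -> List:
    """Fetch blocks 0, 1, 2, ... one at a time and stop at the first empty one."""
    data = []
    for step in range(max_blocks):
        block = fetch_block(step)  # hypothetical API wrapper
        if block is None:
            break  # first empty block: every later block is empty too
        data += block
    return data


# Usage with a fake API that has 3 blocks of data
def fake_fetch(step):
    return [f"item-{step}"] if step < 3 else None


print(fetch_sequential(fake_fetch, 100))
```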

This is my current code:

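import concurrent.futures


# Hypothetical wrapper function so that `return` is inside a function
def fetch_all_blocks(maxBlocks):
    blocksData = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        # Map each future to the block index it was submitted for
        futures = {executor.submit(func, i): i for i in range(maxBlocks)}
        for future in concurrent.futures.as_completed(futures):
            block = future.result()
            if block is not None:
                blocksData += block
    return blocksData


def func(step):
    # Logic: fetch block number `step` from the API into `block`
    if len(block) == 0:
        return None
    else:
        return block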
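One minimal change to the code above, sketched here as an alternative rather than as the accepted answer's method, is to keep submitting every block up front but cancel the queued futures once any block comes back empty. Future.cancel() only succeeds for work that has not started yet, so with max_workers=10 at most a handful of extra calls still run. The names fetch_with_cancel and fake_api are hypothetical; fetch_block is assumed to behave like the question's func (a list, or None when empty).

```python
import concurrent.futures
import time


def fetch_with_cancel(fetch_block, max_blocks, max_workers=10):
    """Submit every block up front, then cancel queued work once an empty block appears."""
    results = {}              # step -> block data
    first_empty = max_blocks  # smallest step seen so far that returned None
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_block, i): i for i in range(max_blocks)}
        for future in concurrent.futures.as_completed(futures):
            if future.cancelled():
                continue  # cancelled below, never ran
            step = futures[future]
            block = future.result()
            if block is not None:
                results[step] = block
            elif step < first_empty:
                first_empty = step
                # cancel() only succeeds for futures that have not started yet;
                # calls that are already running still finish.
                for other, other_step in futures.items():
                    if other_step > step:
                        other.cancel()
    # as_completed() yields in completion order, so rebuild block order here
    data = []
    for step in sorted(results):
        if step < first_empty:
            data += results[step]
    return data


# Usage with a fake API: 7 blocks of data, each call takes about 50 ms
def fake_api(step):
    time.sleep(0.05)
    return [f"item-{step}"] if step < 7 else None


print(fetch_with_cancel(fake_api, 100))
```

Sorting by step also fixes a side effect of the original loop: as_completed() returns futures in completion order, so blocksData there is not guaranteed to be in block order.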

Answer by Hai Vu

Your code submits all maxBlocks tasks at once, so many API calls are wasted whenever the data ends early. It is better to submit the tasks in batches, with a pause in between.

The batch size determines the balance between wasted calls and performance: the smaller the batch, the less waste, but performance might suffer because the system's resources are not fully used.
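To make that trade-off concrete, here is a small back-of-the-envelope model. It assumes an idealized scheme that submits one batch, waits for all of it, and stops after the first batch that contains an empty block; first_empty is the index of the first empty block (24 in the sample run below). "Wasted" counts every call at or after first_empty, including the one call needed to detect the end. The function name batch_cost is hypothetical.

```python
from typing import Tuple


def batch_cost(first_empty: int, batch_size: int) -> Tuple[int, int, int]:
    """Model of an idealized batch-and-wait fetch.

    Returns (rounds, calls, wasted):
      rounds - batches needed; wall-clock time is roughly rounds x one API call
               (assuming batch_size <= max_workers)
      calls  - total API calls submitted
      wasted - calls for blocks at or after first_empty
    """
    rounds = first_empty // batch_size + 1  # the batch holding first_empty is the last one
    calls = rounds * batch_size
    wasted = calls - first_empty
    return rounds, calls, wasted


for size in (1, 5, 10, 25, 50):
    print(size, batch_cost(24, size))
```

With first_empty = 24, a batch size of 10 needs 3 rounds and 30 calls (6 wasted), while a batch size of 50 finishes in a single round but wastes 26 calls. In the worst case (first_empty a multiple of the batch size) a whole batch is wasted.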

Here is some sample code:

import logging
import random
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Event

logging.basicConfig(
    level=logging.DEBUG,
    format="%(levelname)-8s | %(funcName)-18s | %(message)s",
)

SIMULATED_BLOCKS_COUNT = random.randint(10, 30)
MAX_BLOCKS = 1000


def func(step: int, the_end: Event):
    # Simulate the API call, which takes time
    time.sleep(random.randint(1, 3))

    # Simulate the no-more-blocks condition
    if step >= SIMULATED_BLOCKS_COUNT:
        logging.debug("step=%d, No more blocks", step)
        the_end.set()
        return None

    # Simulate the success condition
    return f"Block-{step}"


def main():
    """Entry"""
    the_end = Event()  # Signals that there are no more blocks
    futures = {}  # {step: Future}
    batch_size = 10

    with ThreadPoolExecutor(max_workers=10) as executor:
        for step in range(MAX_BLOCKS):
            if the_end.is_set():
                break

            if step > 0 and step % batch_size == 0:
                logging.debug("step=%d, wait for next batch", step)
                the_end.wait(timeout=5)

            futures[step] = executor.submit(func, step, the_end)

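    # The index of the first None block equals the number of blocks with data.
    # default=len(futures) avoids a ValueError if no block ever returned None.
    blocks_count = min(
        (i for i, future in futures.items() if future.result() is None),
        default=len(futures),
    )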
    blocks_data = [futures[i].result() for i in range(blocks_count)]

    # Report
    logging.info("---- %d BLOCKS ----", blocks_count)
    for data in blocks_data:
        logging.info("Data: %s", data)


if __name__ == "__main__":
    main()

Sample output:

DEBUG    | main               | step=10, wait for next batch
DEBUG    | main               | step=20, wait for next batch
DEBUG    | main               | step=30, wait for next batch
DEBUG    | func               | step=24, No more blocks
DEBUG    | func               | step=26, No more blocks
DEBUG    | func               | step=27, No more blocks
DEBUG    | func               | step=30, No more blocks
DEBUG    | func               | step=28, No more blocks
DEBUG    | func               | step=29, No more blocks
DEBUG    | func               | step=25, No more blocks
INFO     | main               | ---- 24 BLOCKS ----
INFO     | main               | Data: Block-0
INFO     | main               | Data: Block-1
INFO     | main               | Data: Block-2
INFO     | main               | Data: Block-3
INFO     | main               | Data: Block-4
INFO     | main               | Data: Block-5
INFO     | main               | Data: Block-6
INFO     | main               | Data: Block-7
INFO     | main               | Data: Block-8
INFO     | main               | Data: Block-9
INFO     | main               | Data: Block-10
INFO     | main               | Data: Block-11
INFO     | main               | Data: Block-12
INFO     | main               | Data: Block-13
INFO     | main               | Data: Block-14
INFO     | main               | Data: Block-15
INFO     | main               | Data: Block-16
INFO     | main               | Data: Block-17
INFO     | main               | Data: Block-18
INFO     | main               | Data: Block-19
INFO     | main               | Data: Block-20
INFO     | main               | Data: Block-21
INFO     | main               | Data: Block-22
INFO     | main               | Data: Block-23

Notes

  • In the code, I used SIMULATED_BLOCKS_COUNT to simulate the number of blocks the actual API returns
  • func now takes in an additional Event object called the_end. If the API returns None, then func will set this event.
  • In the loop in main, we monitor the_end: If it is set, then we know we are done
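  • Looking at the output, for a blocks count of 24 and a batch size of 10, the code submitted 31 tasks (steps 0 to 30), so 7 of them (steps 24 to 30) were wasted. Because the loop submits the first step of the next batch right after the pause, without checking the_end again, a batch size of 10 can waste up to 11 calls.
  • The pause time is also crucial: a shorter pause may start the next batch before the current one has finished, which wastes more calls, while a longer pause makes the job take longer, because the_end.wait() runs for the full timeout whenever the batch still contained data.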
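If tuning the pause is a concern, one alternative (a variation, not part of the answer above) is to drop the timed pause and simply wait for each batch to finish before deciding whether to submit the next one. This is a sketch under the same simulation assumptions as the answer; fetch_in_batches is a hypothetical name, and max_workers is set equal to batch_size so a whole batch runs in parallel.

```python
import concurrent.futures
import random
import time

# Stand-in for the real API, mirroring the answer's simulation
SIMULATED_BLOCKS_COUNT = random.randint(10, 30)


def func(step):
    time.sleep(random.uniform(0.01, 0.05))  # simulate API latency
    if step >= SIMULATED_BLOCKS_COUNT:
        return None  # no more blocks
    return f"Block-{step}"


def fetch_in_batches(fetch_block, max_blocks=1000, batch_size=10):
    """Fetch blocks batch by batch, stopping at the first empty block."""
    blocks = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
        for start in range(0, max_blocks, batch_size):
            steps = range(start, min(start + batch_size, max_blocks))
            batch = [executor.submit(fetch_block, step) for step in steps]
            # result() blocks until that call finishes, so the next batch is
            # submitted only after the whole current batch is done
            for future in batch:
                block = future.result()
                if block is None:
                    return blocks  # first empty block: submit nothing more
                blocks.append(block)
    return blocks


blocks = fetch_in_batches(func)
print(len(blocks) == SIMULATED_BLOCKS_COUNT)
```

Compared with the fixed the_end.wait(timeout=5) pause, this never starts a batch early and never waits longer than the slowest call in the batch; the price is that idle workers sit unused while that slowest call finishes.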