Use the number of active workers as a variable in a function


I'm using ThreadPoolExecutor to create multiple workers in order to download several tables from a database simultaneously through an API. The API limit is 200 requests per minute (rate_limit = 60/200, i.e. 0.3 s per request), and since I'm downloading 5 tables I use 5 workers (num_workers = 5). Each worker then sleeps time_sleep = rate_limit * num_workers between requests (5 × 0.3 s = 1.5 s), so together the workers stay at 200 requests per minute.

I would like to use a variable like active_num_workers to adapt the sleep time, so that the remaining tables download faster once the other workers have finished their tasks.
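What I have in mind, as a rough sketch only, is a counter of still-active workers shared between the threads and protected by a lock. The class name ActiveWorkerCounter is just a placeholder I made up for illustration:

import threading

# Placeholder helper: a thread-safe counter of workers that are still downloading.
class ActiveWorkerCounter:
    def __init__(self, initial: int):
        self._count = initial
        self._lock = threading.Lock()

    def decrement(self):
        # Called by a worker when it has finished its table.
        with self._lock:
            self._count -= 1

    @property
    def value(self) -> int:
        # Current number of active workers, never less than 1.
        with self._lock:
            return max(self._count, 1)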


Main code:

import concurrent.futures
import os
import time

import pandas as pd

# create list of tables to download
list_of_travelperks_tables = ['users', 'trips', 'invoices', 'bookings', 'suppliers']

# initialize empty list for dataframes
dfs = []
num_workers = 5
rate_limit = 60 / 200   # rate limit is 200 requests per minute -> 0.3 s per request

def load_and_convert_table(i):
    # Load the JSON data
    data = load_travelperk_table(i, os.environ['TP_API_KEY'], rate_limit*num_workers)
    # Convert the list of dictionaries to a DataFrame
    df = pd.json_normalize(data)
    return i, df


# Create a dictionary to store the dataframes
dfs_dict = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = executor.map(load_and_convert_table, list_of_travelperks_tables)
    for i, df in futures:
        df_name = f'df_{i}'
        dfs_dict[df_name] = df  # Store the dataframe in the dictionary
        dfs.append((df_name, df))

load_travelperk_table function:

def load_travelperk_table(table_name: str, ApiKey: str, time_sleep: float, parameters=None):
    # ... code to download the table via the API ...
    time.sleep(time_sleep)
    return data

How should I proceed so that, once a worker has finished its task, the sleep parameter in the other workers is updated?
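What I'm currently considering, again only as a sketch: pass the shared counter into the workers instead of a fixed time_sleep, and recompute the sleep before every API request from the live count. The names active_workers and load_travelperk_table_adaptive are placeholders, and the body of the download loop is elided as in my function above:

active_workers = ActiveWorkerCounter(num_workers)

def load_travelperk_table_adaptive(table_name: str, ApiKey: str, counter, parameters=None):
    data = []
    # ... inside the existing download loop, before each API request:
    # sleep just long enough that all currently active workers together
    # stay under the 200 requests per minute limit
    time.sleep(rate_limit * counter.value)
    # ... append the downloaded records to data ...
    return data

def load_and_convert_table(i):
    try:
        data = load_travelperk_table_adaptive(i, os.environ['TP_API_KEY'], active_workers)
        return i, pd.json_normalize(data)
    finally:
        # signal the remaining workers that one slot has freed up,
        # so their per-request sleep shrinks from the next request on
        active_workers.decrement()

Is this a reasonable approach, or is there a better way to share the active-worker count between threads?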
