I'm using ThreadPoolExecutor to create multiple workers in order to download several tables from a database simultaneously through an API. The API limit is 200 requests per minute (rate_limit = 60/200), and since I'm downloading 5 tables I use 5 workers (num_workers = 5). Each worker then sleeps time_sleep = rate_limit * num_workers between its requests.
I would like to use a variable like active_num_workers to adapt the sleep time, so that the remaining tables download faster once the other workers have finished their tasks.
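To make the numbers concrete, this is the arithmetic behind the current fixed sleep and what I would like it to become (values taken from the code below):

rate_limit = 60 / 200                    # 0.3 s between requests (200 requests per minute)
num_workers = 5
time_sleep = rate_limit * num_workers    # 1.5 s between requests for each worker
# 5 workers * 1 request / 1.5 s = 3.33 requests/s = 200 requests per minute in total
# once 2 workers are done, 3 * 0.3 = 0.9 s per worker would already respect the limit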
Main code:
import concurrent.futures
import os

import pandas as pd

# create list of tables
list_of_travelperks_tables = ['users', 'trips', 'invoices', 'bookings', 'suppliers']
# initialize empty list for dataframes
dfs = []
num_workers = 5
rate_limit = 60 / 200  # rate limit is 200 requests per minute

def load_and_convert_table(i):
    # Load the JSON data
    data = load_travelperk_table(i, os.environ['TP_API_KEY'], rate_limit * num_workers)
    # Convert the list of dictionaries to a DataFrame
    df = pd.json_normalize(data)
    return i, df

# Create a dictionary to store the dataframes
dfs_dict = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    futures = executor.map(load_and_convert_table, list_of_travelperks_tables)
    for i, df in futures:
        df_name = f'df_{i}'
        dfs_dict[df_name] = df  # Store the dataframe in the dictionary
        dfs.append((df_name, df))
load_travelperk_table function:
import time

def load_travelperk_table(table_name: str, ApiKey: str, time_sleep: float, parameters=None):
    # code to download
    time.sleep(time_sleep)
    return data
How should I proceed so that, once a worker has finished its task, the sleep parameter used by the remaining workers is updated?
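Something along these lines is what I'm imagining: a shared counter of active workers, protected by a lock, that each worker decrements when it finishes, with the sleep recomputed from the current count before every request. This is only a sketch with a dummy download function standing in for load_travelperk_table, and the names (active_workers, current_sleep, fake_download_table) are just for illustration; I'm not sure it's the right or thread-safe way to do it:

import concurrent.futures
import threading
import time

rate_limit = 60 / 200           # 0.3 s per request to stay under 200 requests/minute
num_workers = 5

active_workers = num_workers    # workers that are still downloading
active_lock = threading.Lock()  # protects the shared counter

def current_sleep() -> float:
    # Re-evaluated before every request: fewer active workers -> shorter sleep.
    with active_lock:
        return rate_limit * active_workers

def fake_download_table(table_name: str, sleep_fn) -> list:
    # Stand-in for load_travelperk_table: it would receive the sleep *function*
    # instead of a fixed time_sleep value and call it before each request.
    rows = []
    for page in range(3):
        time.sleep(sleep_fn())
        rows.append({'table': table_name, 'page': page})
    return rows

def worker(table_name: str):
    global active_workers
    try:
        return table_name, fake_download_table(table_name, current_sleep)
    finally:
        with active_lock:
            active_workers -= 1  # tell the remaining workers one of us is done

tables = ['users', 'trips', 'invoices', 'bookings', 'suppliers']
with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
    for name, rows in executor.map(worker, tables):
        print(name, len(rows), 'pages')

In my real code I would replace fake_download_table with load_travelperk_table, changing its time_sleep parameter to accept a callable and calling it before each request. Is this the right way to share the counter between workers, or is there a better pattern for this?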
