I have a huge list of strings called term_list that I process one-by-one in a function called run_mappers(). One of the args is a csv_writer object. In the function, I append each result to a list called from_mapper and write it to a CSV file using the csv_writer object. In my scouring for help, I read that the multiprocessing module is not recommended for CSV writing because it pickles its arguments and csv_writer objects can't be pickled (I can't find the reference for this now among my billion open tabs). I am not sure multiprocessing is best suited for my task anyway.
def run_mappers(individual_string, other_args, csv_writer):
    # long processing code goes here, ending up with processed_result
    from_mapper.append(processed_result)
    csv_writer.writerow(processed_result)
I want to speed up processing of this huge list, but I'm trying to control memory usage by splitting the list into batches (term_list_batch) and processing one batch at a time, roughly like this:
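    # hypothetical driver loop; batch_size is whatever fits in memory
    for i in range(0, len(term_list), batch_size):
        parallelize_mappers(term_list[i:i + batch_size], other_args, csv_writer)

So for parallelize_mappers() I try: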
def parallelize_mappers(term_list_batch, other_args, csv_writer):
    terms_left = len(term_list_batch)
    with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor:
        future_to_term = {executor.submit(run_mappers, term, other_args, csv_writer): term
                          for term in term_list_batch}
        try:
            for future in concurrent.futures.as_completed(future_to_term, timeout=180):  # timeout after 3 min
                term = future_to_term[future]
                try:
                    result = future.result()
                    # Process result if needed
                except Exception as exc:
                    print(f"Job {term} generated an exception: {exc}")
                finally:
                    terms_left -= 1
                    if terms_left % 10 == 0:
                        gc.collect()
                        time.sleep(2)
        except concurrent.futures.TimeoutError:
            print("Timeout occurred while processing futures")
            for future in future_to_term:
                if not future.done():
                    future.cancel()
When I get a TimeoutError, my process just hangs and I'm not sure what to do to keep moving forward through my huge term_list. I don't want to terminate the program; I just want to keep moving through term_list, or process the next batch. If a thread fails or something, I just want to skip that term (or toss the whole thread) and continue processing term_list, writing as many results to the file as I can.
Among my many attempts to troubleshoot, I tried other variations, but am posting the one above as my best shot since it crunched through a few hundred terms before stalling on me. Other attempts just died, raised RuntimeErrors, had threads deadlocking, etc.
For reference, another attempt is below:
def parallelize_mappers(term_list_batch, other_args, csv_writer):
    timeout = 120
    terminate_flag = threading.Event()
    # Create a thread for each term
    threads = []
    for term in term_list_batch:
        thread = threading.Thread(target=run_mappers, args=(term, other_args, csv_writer, terminate_flag))
        threads.append(thread)
        thread.start()
    # Wait for each thread to complete or time out
    for thread in threads:
        thread.join(timeout)
        # If the thread is still alive, it has timed out
        if thread.is_alive():
            print("Thread {} timed out. Terminating...".format(thread.name))
            terminate_flag.set()  # Set the flag to ask the threads to stop
Then I added a while not terminate_flag.is_set() check to run_mappers() before executing the rest of the processing code, roughly like this (processing elided as before):
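def run_mappers(individual_string, other_args, csv_writer, terminate_flag):
    while not terminate_flag.is_set():
        # long processing code goes here, ending up with processed_result
        from_mapper.append(processed_result)
        csv_writer.writerow(processed_result)
        return  # the body runs once; the while is really just a guard

But this is just unbearably slow. Thank you in advance.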
Mock code to reproduce/term_list to process below:
term_list = ['Dementia',
'HER2-positive Breast Cancer',
'Stroke',
'Hemiplegia',
'Type 1 Diabetes',
'Hypospadias',
'IBD',
'Eating',
'Gastric Cancer',
'Lung Cancer',
'Carcinoid',
'Lymphoma',
'Psoriasis',
'Fallopian Tube Cancer',
'Endstage Renal Disease',
'Healthy',
'HRV',
'Recurrent Small Lymphocytic Lymphoma',
'Gastric Cancer Stage III',
'Amputations',
'Asthma',
'Lymphoma',
'Neuroblastoma',
'Breast Cancer',
'Healthy',
'Asthma',
'Carcinoma, Breast',
'Fractures',
'Psoriatic Arthritis',
'ALS',
'HIV',
'Carcinoma of Unknown Primary',
'Asthma',
'Obesity',
'Anxiety',
'Myeloma',
'Obesity',
'Asthma',
'Nursing',
'Denture, Partial, Removable',
'Dental Prosthesis Retention',
'Obesity',
'Ventricular Tachycardia',
'Panic Disorder',
'Schizophrenia',
'Pain',
'Smallpox',
'Trauma',
'Proteinuria',
'Head and Neck Cancer',
'C14',
'Delirium',
'Paraplegia',
'Sarcoma',
'Favism',
'Cerebral Palsy',
'Pain',
'Signs and Symptoms, Digestive',
'Cancer',
'Obesity',
'FHD',
'Asthma',
'Bipolar Disorder',
'Healthy',
'Ayerza Syndrome',
'Obesity',
'Healthy',
'Focal Dystonia',
'Colonoscopy',
'ART',
'Interstitial Lung Disease',
'Schistosoma Mansoni',
'IBD',
'AIDS',
'COVID-19',
'Vaccines',
'Beliefs',
'SAH',
'Gastroenteritis Escherichia Coli',
'Immunisation',
'Body Weight',
'Nonalcoholic Steatohepatitis',
'Nonalcoholic Fatty Liver Disease',
'Prostate Cancer',
'Covid19',
'Sarcoma',
'Stroke',
'Liver Diseases',
'Stage IV Prostate Cancer',
'Measles',
'Caregiver Burden',
'Adherence, Treatment',
'Fracture of Distal End of Radius',
'Upper Limb Fracture',
'Smallpox',
'Sepsis',
'Gonorrhea',
'Respiratory Syncytial Virus Infections',
'HPV',
'Actinic Keratosis']
The way I see it, you want to parallelize run_mappers(), because that function might take a long time to finish. The CSV writing part does not need to run in parallel because it finishes relatively quickly.
The first step is to redesign run_mappers() NOT to take a CSV writer as a parameter. Instead, the function should return the processed_result. It might raise an exception, in which case we ignore the result for that term. To be useful, I will write the errors out to err.txt.
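Here is a minimal sketch of that design. Since I don't know what your run_mappers() code and other_args look like, the processing is faked with a sleep plus a random failure, and err_file and batch_size are names I made up:

import concurrent.futures
import csv
import random
import time

def run_mappers(term, other_args):
    # Fake stand-in for the real processing: sleep a little and fail
    # roughly 10% of the time so both code paths get exercised.
    time.sleep(random.uniform(0.1, 0.5))
    if random.random() < 0.1:
        raise ValueError(f"no mapping found for {term!r}")
    return [term, len(term)]  # stand-in for processed_result

def parallelize_mappers(term_list_batch, other_args, csv_writer, err_file):
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)
    future_to_term = {executor.submit(run_mappers, term, other_args): term
                      for term in term_list_batch}
    try:
        for future in concurrent.futures.as_completed(future_to_term, timeout=180):
            term = future_to_term[future]
            try:
                result = future.result()
            except Exception as exc:
                err_file.write(f"{term}: {exc}\n")  # ignore this term and keep going
            else:
                csv_writer.writerow(result)  # only the main thread touches the writer
    except concurrent.futures.TimeoutError:
        err_file.write("batch timed out; skipping remaining terms\n")
    finally:
        # Don't block on stragglers: cancel whatever hasn't started yet and
        # move on to the next batch (cancel_futures needs Python 3.9+).
        executor.shutdown(wait=False, cancel_futures=True)

if __name__ == '__main__':
    term_list = ['Dementia', 'Stroke', 'Asthma']  # swap in your full list
    with open('output.csv', 'w', newline='') as out, open('err.txt', 'w') as err:
        writer = csv.writer(out)
        batch_size = 25
        for i in range(0, len(term_list), batch_size):
            parallelize_mappers(term_list[i:i + batch_size], None, writer, err)

The key point is that only the main thread ever touches csv_writer, so there is nothing to pickle and nothing to lock.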
Notes
- I don't know what your run_mappers() code and other_args look like, so I faked them
- You can replace ThreadPoolExecutor with ProcessPoolExecutor and compare the timing to see which solution works more efficiently
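For the ProcessPoolExecutor comparison, the swap is one line; note that run_mappers() must be defined at module top level so it can be pickled, and the csv_writer still stays in the main process:

    # processes instead of threads; everything else in the sketch stays the same
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=6)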