I have an apparently quiet usual problem of consolidation of results after a multiprocessing with concurrent.futures.
I have found a related question Efficiently consolidate results of concurrent.futures parallel execution? and How to append dataframe to an empty dataframe using concurrent but I can't make it work for me.
I have a list of positions, for each of them I build a timeserie based on cashflows and financial statements (other dataframes). Each timeserie is a process and at the end I need to consolidate the results in a big dataframe. Order of processing and in final dataframe is irrelevant.
My code is:
with concurrent.futures.ProcessPoolExecutor() as executor:
# results_list = [executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(n_flujos_light_pos == DB_Posiciones['Id. Pos.'].values[i])], TS_Ind_Fin_light.loc[(n_indfin_light_fondo == DB_Posiciones['Id. Pos.'].values[i][-15:-3])]) for i in range(len(DB_Posiciones['Id. Pos.']))]
results_list = []
for i in range(len(DB_Posiciones['Id. Pos.'])):
pos = DB_Posiciones['Id. Pos.'].values[i]
fondo = pos[-15:-3]
filt_n_flujos = (n_flujos_light_pos == pos)
filt_n_indfin = (n_indfin_light_fondo == fondo)
results_list.append(executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(filt_n_flujos)], TS_Ind_Fin_light.loc[(filt_n_indfin)]))
# Very slow solution:
for f in results_list:
df = df.append(f.result())
When I remove the append loop and replace the slow solution by:
df = pd.concat(results_list, ignore_index=True)
I have the following error:
TypeError: cannot concatenate object of type '<class 'generator'>'; only Series and DataFrame objs are valid
I also tried to change the result in the first loop by :
results = pd.concat(executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(filt_n_flujos)], TS_Ind_Fin_light.loc[(filt_n_indfin)]))
Then the error is:
TypeError: 'Future' object is not iterable
And when I change executor.submit by executor.map I have the following error regarding the 1st variable extracted from the dataframe received as 2nd argument of the function "DB_Posiciones.iloc[I]:
TypeError: 'Timestamp' object is not subscriptable
I tried to change the variable to a fixed date but the problem continues, so I think it more related to how map works. Because of this error and the fact that the output order is irrelevant I think it is better for me to work with submit but I am open to suggestions.
Finally found a simple way... List comprehensions! I take the results of the future object with a list comprehension and then use pd.concat! I post it here if it can help anyone: