I have what seems to be quite a common problem: consolidating the results of a multiprocessing run with concurrent.futures.

I found two related questions, "Efficiently consolidate results of concurrent.futures parallel execution?" and "How to append dataframe to an empty dataframe using concurrent", but I can't get either approach to work for me.

I have a list of positions, and for each of them I build a time series based on cash flows and financial statements (stored in other DataFrames). Each time series is computed as a separate task in the process pool, and at the end I need to consolidate the results into one big DataFrame. Neither the processing order nor the row order in the final DataFrame matters.

My code is:

with concurrent.futures.ProcessPoolExecutor() as executor:
    # results_list = [executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(n_flujos_light_pos == DB_Posiciones['Id. Pos.'].values[i])], TS_Ind_Fin_light.loc[(n_indfin_light_fondo == DB_Posiciones['Id. Pos.'].values[i][-15:-3])]) for i in range(len(DB_Posiciones['Id. Pos.']))]
    results_list = []
    for i in range(len(DB_Posiciones['Id. Pos.'])):
        pos = DB_Posiciones['Id. Pos.'].values[i]
        fondo = pos[-15:-3]
        filt_n_flujos = (n_flujos_light_pos == pos)
        filt_n_indfin = (n_indfin_light_fondo == fondo)
        results_list.append(executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(filt_n_flujos)], TS_Ind_Fin_light.loc[(filt_n_indfin)]))

    # Very slow solution:
    for f in results_list:
        df = df.append(f.result())
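Part of why the loop above is slow: each `df.append` builds a brand-new DataFrame that copies everything accumulated so far, so the total work grows roughly quadratically with the number of positions. (`DataFrame.append` was also deprecated in pandas 1.4 and removed in 2.0.) A minimal sketch of the usual alternative, using hypothetical toy frames in place of the real `f.result()` values: collect the pieces in a plain Python list and call `pd.concat` once.

```python
import pandas as pd

# Hypothetical per-position results, standing in for the f.result() values
frames = [pd.DataFrame({"pos": [f"P{i}"] * 2, "value": [i, i * 10]}) for i in range(3)]

# Slow pattern (one full copy per iteration; DataFrame.append is gone in pandas 2.x):
#     df = pd.DataFrame()
#     for part in frames:
#         df = df.append(part)

# Linear pattern: accumulate in a plain list, then concatenate exactly once
parts = []
for part in frames:
    parts.append(part)  # list.append is cheap; no DataFrame is copied here
df = pd.concat(parts, ignore_index=True)  # ignore_index gives a clean 0..n-1 index
```

The single `pd.concat` call allocates the final frame once, which is what removes the quadratic copying.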

When I remove the append loop and replace the slow solution with:

    df = pd.concat(results_list, ignore_index=True)

I get the following error:

TypeError: cannot concatenate object of type '<class 'generator'>'; only Series and DataFrame objs are valid
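This error is consistent with `pd.concat` receiving `Future` objects (or a generator over them) instead of DataFrames: `executor.submit` returns a `Future`, and `pd.concat` only accepts Series and DataFrame objects. A minimal sketch of the fix, assuming a hypothetical `perf_calc` that stands in for `PerfCalc`: call `.result()` on each Future, then concatenate. It uses `ThreadPoolExecutor` only so the demo runs anywhere without process-spawning concerns; `ProcessPoolExecutor` implements the same `submit`/`Future` interface.

```python
import concurrent.futures

import pandas as pd


def perf_calc(pos: str, n: int) -> pd.DataFrame:
    """Hypothetical stand-in for PerfCalc: returns one small DataFrame per position."""
    return pd.DataFrame({"pos": [pos] * n, "step": list(range(n))})


positions = ["P1", "P2", "P3"]

# ThreadPoolExecutor is swapped in for the demo; ProcessPoolExecutor has the same API.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(perf_calc, pos, 2) for pos in positions]

# `futures` holds Future objects, so pd.concat(futures) would raise TypeError.
# .result() blocks until the task is done and returns the DataFrame it produced.
frames = [f.result() for f in futures]
combined = pd.concat(frames, ignore_index=True)
```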

I also tried changing the call in the first loop to:

    results = pd.concat(executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(filt_n_flujos)], TS_Ind_Fin_light.loc[(filt_n_indfin)]))

Then the error is:

TypeError: 'Future' object is not iterable
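This attempt fails for a related reason: `pd.concat` expects an iterable of DataFrames, but here it is handed a single `Future`, which cannot be iterated. Since row order does not matter, one option is `concurrent.futures.as_completed`, which yields each Future as soon as its task finishes. A minimal sketch, again with a hypothetical `perf_calc` and with `ThreadPoolExecutor` substituted for `ProcessPoolExecutor` purely so the demo is self-contained:

```python
import concurrent.futures

import pandas as pd


def perf_calc(pos: str) -> pd.DataFrame:
    """Hypothetical stand-in for PerfCalc."""
    return pd.DataFrame({"pos": [pos], "value": [len(pos)]})


positions = ["A", "BB", "CCC"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(perf_calc, pos) for pos in positions]
    # as_completed yields Futures in completion order, not submission order,
    # which is acceptable here because the final row order is irrelevant.
    frames = [f.result() for f in concurrent.futures.as_completed(futures)]

# One concat over the list of DataFrames, never over a single Future
combined = pd.concat(frames, ignore_index=True)
```

Because completion order can vary between runs, any check on the result should not depend on row order.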

When I replace executor.submit with executor.map, I get the following error on the first value my function reads from its second argument, DB_Posiciones.iloc[i]:

TypeError: 'Timestamp' object is not subscriptable

I tried replacing that variable with a fixed date, but the problem persists, so I think it is more related to how map works. Because of this error, and since the output order is irrelevant, I think submit is the better fit for me, but I am open to suggestions.
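One likely explanation: `Executor.map(fn, *iterables)` does not pass its arguments through unchanged. It zips the iterables and calls `fn` once per resulting tuple. If the arguments were given to `map` the same way as to `submit`, then `FX_Rates` would be iterated over its column labels and `DB_Posiciones.iloc[i]` (a Series) over its values, so the function would receive scalars such as a `Timestamp` where it expects a row. A sketch under that assumption, with hypothetical `perf_calc`, `fx_rates`, and `positions` standing in for the real objects: pass one row per call and repeat the shared table with `itertools.repeat`. `ThreadPoolExecutor` is used only to keep the demo self-contained.

```python
import concurrent.futures
import itertools

import pandas as pd


def perf_calc(fx_rates: pd.DataFrame, row: pd.Series) -> pd.DataFrame:
    """Hypothetical stand-in for PerfCalc: a shared table plus one position row."""
    rate = fx_rates.loc[row["ccy"], "rate"]
    return pd.DataFrame({"pos": [row["pos"]], "value_eur": [row["amount"] * rate]})


# Hypothetical toy inputs
fx_rates = pd.DataFrame({"rate": [1.0, 0.5]}, index=["EUR", "USD"])
positions = pd.DataFrame(
    {"pos": ["P1", "P2"], "ccy": ["EUR", "USD"], "amount": [10.0, 10.0]}
)

# One element per call from each iterable: the same fx_rates every time
# (repeat is infinite, so zip stops at the end of `rows`) and one row Series per call.
rows = [positions.iloc[i] for i in range(len(positions))]
with concurrent.futures.ThreadPoolExecutor() as executor:
    frames = list(executor.map(perf_calc, itertools.repeat(fx_rates), rows))

combined = pd.concat(frames, ignore_index=True)
```

Unlike `as_completed`, `map` returns results in input order. With `ProcessPoolExecutor`, its `chunksize` parameter can reduce per-task overhead when there are many positions.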

Answer:

I finally found a simple way: a list comprehension. I collect the result of each Future object with a list comprehension and then pass that list to pd.concat. I'm posting it here in case it helps anyone:

import concurrent.futures
import pandas as pd

with concurrent.futures.ProcessPoolExecutor() as executor:
    results_list = [executor.submit(PerfCalc, FX_Rates, DB_Posiciones.iloc[i], TS_Flujos_light.loc[(n_flujos_light_pos == DB_Posiciones['Id. Pos.'].values[i])], TS_Ind_Fin_light.loc[(n_indfin_light_fondo == DB_Posiciones['Id. Pos.'].values[i][-15:-3])]) for i in range(len(DB_Posiciones['Id. Pos.']))]
    # Wait for every task and collect the DataFrame each one returns
    results = [f.result() for f in results_list]
    TS_Performance = pd.concat(results)