I'm simulating a calculation over a number of inputs, each of which takes a long time. Whenever a calculation finishes (i.e. on_next is emitted), I want to reactively append the result to a results DataFrame and print the final DataFrame when on_completed is emitted. However, the DataFrame is empty. Why are no values accumulating?
This is with Python 3.9.9 and RxPY 3.2.0.
```python
import time
from random import random, randint
from rx import create
import pandas as pd
import rx

print(rx.__version__)


def accumulate(result, i):
    # Here the values should accumulate !?
    result = result.append(pd.DataFrame({'a': [i]}))


def average_df(observer, scheduler):
    for pid in pids:
        time.sleep(random()*0.8)
        observer.on_next(randint(0, 100))
    observer.on_completed()


def print_result(result):
    print(result)


# Client
if __name__ == "__main__":
    result = pd.DataFrame({'a': []})

    # Observable
    pids = [1, 2, 3, 4]
    source = create(average_df)

    source.subscribe(
        on_next = lambda i: accumulate(result, i),
        on_error = lambda e: print("Error: {0}".format(e)),
        on_completed = lambda: print_result(result)
    )
```
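The pandas `DataFrame.append` method does not modify the DataFrame in place; it returns a new object. In `accumulate`, that new object is assigned to the local name `result`, which only rebinds the function's local variable. The `result` variable in the main block still refers to the original empty DataFrame, so that is what gets printed.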
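The same rebinding behaviour can be reproduced without pandas or RxPY. In this minimal sketch an immutable tuple stands in for the DataFrame, since `tuple + tuple` likewise returns a new object; `accumulate_fixed` is an illustrative name, not part of the question's code.

```python
def accumulate(result, i):
    # `result + (i,)` builds a new tuple; assigning it to `result`
    # only rebinds the local name inside this function.
    result = result + (i,)


def accumulate_fixed(result, i):
    # Returning the new object lets the caller keep it.
    return result + (i,)


result = ()
for i in (1, 2, 3):
    accumulate(result, i)      # the new tuple is discarded
print(result)                  # ()

fixed = ()
for i in (1, 2, 3):
    fixed = accumulate_fixed(fixed, i)  # caller rebinds to the returned object
print(fixed)                   # (1, 2, 3)
```

The fix is to return the new object and let the caller hold on to it, which is the job the scan operator takes over in a reactive pipeline.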
You can get the final DataFrame with the scan operator, which feeds the value your accumulator returns at each step back into it for the next item:
For further simplification, you can use the to_pandas operator of the rxsci library (disclaimer: I am the author). Here is a solution with rxsci, and a more reactive way to create the source observable: