I am very new to reactive programming and I am stuck at one point.
I am trying to trigger a method when my collection reaches a certain number of elements or when a certain amount of time has passed.
Purpose: I process events coming from the database, and I need to mark where I left off after every 200 events, or save the last processed event every 10 seconds, because there may not be enough events to reach 200 for a long time. However, OnNext does not wait for the SaveCheckpoint method to finish and keeps accepting processed items. For the sake of data consistency, I need to wait for the checkpoint to be saved before the next OnNext.
The code below simulates my design. How can I achieve this?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

Subject<string> events = new Subject<string>();

events
    .Buffer(TimeSpan.FromSeconds(10), 200)
    .SelectMany(async x => await SaveCheckpoint(x))
    .Subscribe();

async Task<bool> SaveCheckpoint(IList<string> i)
{
    var lastProcessedEvent = i.LastOrDefault();
    if (lastProcessedEvent != null)
    {
        // Save checkpoint to db or somewhere else
        await Task.Delay(1000);
        Console.WriteLine($"Saved checkpoint of : {lastProcessedEvent}");
    }
    return true;
}

for (int j = 1; j < 10000; j++)
{
    Console.WriteLine("Event processed");
    // This step must wait until the subscriber completes its task
    events.OnNext($"Event - {j}");
}
I am looking for a way to wait for the Subscribe handler to complete before the next OnNext.
In my opinion, you need to do it without async/await, like this:
This produces the following output:
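The idea of dropping async/await can be sketched roughly as below. This is only a minimal illustration, not the code from the answer above. It assumes the System.Reactive NuGet package is referenced, that a blocking save is acceptable on the producer thread, and that Buffer hands a count-filled batch to the subscriber on the same thread that called OnNext. The `saved` list, the buffer size of 3 and the 100 ms sleep are made-up stand-ins chosen to keep the run short. Only the count-based flush is exercised here; the 10-second flush runs on a scheduler thread and is not covered by this sketch.

```csharp
// Requires the System.Reactive NuGet package (assumption: it is referenced).
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical stand-in for the real checkpoint store.
var saved = new List<string>();

// Synchronous on purpose: the calling thread is held until the save is done.
void SaveCheckpoint(IList<string> batch)
{
    var lastProcessedEvent = batch.LastOrDefault();
    if (lastProcessedEvent == null) return; // an empty time-based buffer

    Thread.Sleep(100); // simulated blocking write to the db
    saved.Add(lastProcessedEvent);
    Console.WriteLine($"Saved checkpoint of: {lastProcessedEvent}");
}

var events = new Subject<string>();

// Small count (3 instead of 200) so the sketch finishes quickly.
using var subscription = events
    .Buffer(TimeSpan.FromSeconds(10), 3)
    .Subscribe(batch => SaveCheckpoint(batch));

for (int j = 1; j <= 7; j++)
{
    Console.WriteLine($"Event processed: {j}");
    // The call that fills a buffer does not return until that buffer is saved.
    events.OnNext($"Event - {j}");
}

// Completing the subject flushes the last, partially filled buffer.
events.OnCompleted();
```

With this shape, each "Saved checkpoint" line should appear before the next "Event processed" line, because the subscriber runs inside the OnNext call that completed the buffer. The trade-off is that the producer is stalled for the duration of every save.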