I am looking for something similar to the exhaustMap operator from RxJS, but Rx.NET does not seem to have such an operator.
What I need to achieve is that, upon every element of the source stream, I need to start an async handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.
What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.
I also suspect I need to cleverly use the defer operator here?
Thank you!
Here is an implementation of the `ExhaustMap` operator. The source observable is projected to an `IObservable<(Task<TResult>, int)>`, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the `DistinctUntilChanged` operator, and finally the observable is flattened with the `Concat` operator.

The tasks returned by the `function` are not guaranteed to be distinct. For example, the method `async Task<T> Return<T>(T result) => result;` always returns the same `Task` for `result = 1` or `result = false`. Hence the need for the incremented `Id` in this implementation, which individualizes the tasks so that `DistinctUntilChanged` doesn't filter out tasks from separate `function` invocations.

Usage example:
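A minimal sketch of how the operator described above might be written and called, assuming the System.Reactive NuGet package is referenced. The class name `ExhaustMapExtensions`, the delays, and the item counts are illustrative assumptions, not taken from the original post.

```csharp
using System;
using System.Reactive.Linq;   // assumes the System.Reactive NuGet package
using System.Threading.Tasks;

public static class ExhaustMapExtensions // hypothetical class name
{
    // Starts a task for an item only if the previously started task has completed;
    // items that arrive while a task is still running are dropped.
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, Task<TResult>> function)
    {
        return source
            // Carry the current task together with an Id that makes every invocation unique.
            .Scan((Task: Task.FromResult<TResult>(default), Id: 0), (previous, item) =>
                !previous.Task.IsCompleted
                    ? previous // still running: repeat the previous entry
                    : (function(item), unchecked(previous.Id + 1)))
            .DistinctUntilChanged()      // remove the repeated entries
            .Select(entry => entry.Task)
            .Concat();                   // emit each task's result, in order
    }
}

public static class Program
{
    public static void Main()
    {
        Observable
            .Interval(TimeSpan.FromMilliseconds(200))
            .Select(x => (int)x + 1)
            .Take(10)
            // Every third item gets a slow handler (500 ms); the others take 100 ms.
            .ExhaustMap(async x =>
            {
                await Task.Delay(x % 3 == 0 ? 500 : 100);
                return x;
            })
            .Do(x => Console.WriteLine($"Received: {x}"))
            .Wait(); // block until the sequence completes
    }
}
```

With these delays, items 4, 5, 7, 8 and 10 should arrive while a slow handler is still running, so one would expect only 1, 2, 3, 6 and 9 to be received, although the exact result depends on timer scheduling.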
Output:
Online demo.
Here is an alternative implementation of `ExhaustMap`, where the `function` produces an `IObservable<TResult>` instead of a `Task<TResult>`:
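One way such a variant could be sketched, assuming the System.Reactive package: a per-subscription flag records whether an inner sequence is active, and items arriving in the meantime are projected to an empty sequence. The flag-based approach and the class name `ExhaustMapObservableExtensions` are assumptions made for illustration and may differ from the original code.

```csharp
using System;
using System.Reactive.Linq;   // assumes the System.Reactive NuGet package
using System.Threading;

public static class ExhaustMapObservableExtensions // hypothetical class name
{
    public static IObservable<TResult> ExhaustMap<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, IObservable<TResult>> function)
    {
        // Defer gives every subscriber its own "busy" flag.
        return Observable.Defer(() =>
        {
            int busy = 0; // 1 while an inner sequence is active, otherwise 0
            return source.SelectMany(item =>
                // Try to flip the flag from 0 to 1; if it was already 1, drop the item.
                Interlocked.CompareExchange(ref busy, 1, 0) != 0
                    ? Observable.Empty<TResult>()
                    : Observable
                        .Defer(() => function(item))
                        // Clear the flag when the inner sequence terminates or is unsubscribed.
                        .Finally(() => Volatile.Write(ref busy, 0)));
        });
    }
}

public static class Program
{
    public static void Main()
    {
        Observable
            .Interval(TimeSpan.FromMilliseconds(200))
            .Take(10)
            // Each accepted item is echoed back after a 500 ms delay.
            .ExhaustMap(x => Observable
                .Timer(TimeSpan.FromMilliseconds(500))
                .Select(_ => x))
            .Do(x => Console.WriteLine($"Received: {x}"))
            .Wait(); // block until the sequence completes
    }
}
```

Because the inner sequence is wrapped in `Defer`, an exception thrown by `function` surfaces as an error notification, and `Finally` still clears the flag.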