I'm looking for the Rx method that will take an observable and put the latest item on a 'cooldown', so that when items are coming in slower than the cooldown they're just forwarded but when they're coming in faster you just get the latest value after each cooldown period.
Said a different way, I want to switch to sampling with period t when items are separated by less than t (and switch back when they're spread out).
This is really similar to what Observable.Throttle does, except that the timer is not reset whenever a new item arrives.
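To make the behaviour described above concrete, here is a minimal sketch written as a plain simulation over timestamped items rather than as an Rx operator. The names CooldownSketch and Simulate are made up for illustration, times are arbitrary integer ticks, and the input is assumed to be sorted by time. It does not filter out unchanged values.

```csharp
using System;
using System.Collections.Generic;

public static class CooldownSketch
{
    // Hypothetical helper: simulates the desired pacing over (time, value)
    // pairs sorted by time. No Rx is involved; this only illustrates timing.
    public static List<(long Time, T Value)> Simulate<T>(
        IEnumerable<(long Time, T Value)> items, long cooldown)
    {
        var output = new List<(long Time, T Value)>();
        long cooldownEnds = long.MinValue; // no cooldown is active at the start
        bool hasPending = false;
        T pending = default(T);

        foreach (var (time, value) in items)
        {
            // A held value is released the moment its cooldown expires,
            // and releasing it starts a fresh cooldown.
            if (hasPending && cooldownEnds <= time)
            {
                output.Add((cooldownEnds, pending));
                cooldownEnds += cooldown;
                hasPending = false;
            }

            if (time >= cooldownEnds)
            {
                // Outside any cooldown: forward immediately.
                output.Add((time, value));
                cooldownEnds = time + cooldown;
            }
            else
            {
                // Inside a cooldown: remember only the latest value.
                pending = value;
                hasPending = true;
            }
        }

        // Release whatever is still held when the input ends.
        if (hasPending)
        {
            output.Add((cooldownEnds, pending));
        }
        return output;
    }
}
```

For items a, b, c, d at ticks 0, 1, 2, 3 and e at tick 20, with a cooldown of 5, this yields a at 0, d at 5 and e at 20. Throttle with the same period would instead hold back a, b and c entirely and emit d only at tick 8, because its timer restarts on every item.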
The application I have in mind is for sending 'latest value' updates across the network. I don't want to communicate a value unless it has changed, and I don't want to spam a rapidly changing value so much that I swamp out other data.
Is there a standard method that does what I need?
Strilanc, given your concern about unwanted activity when the source stream is quiet, you might be interested in this method of pacing events - I wasn't going to add this otherwise, as I think J. Lennon's implementation is perfectly reasonable (and much simpler), and the performance of the timer isn't going to hurt.
There is one other interesting difference in this implementation - it differs from the Sample approach because it emits events occurring outside the cooldown period immediately rather than at the next sampling interval. It maintains no timer outside the cooldown.
EDIT - Here is v3 solving the issue Chris mentioned in the comments - it ensures that changes occurring during the cool-down themselves trigger a new cool-down period.
This works by initially using a GroupByUntil to pack all events into the same group for the duration of the cool-down period. It watches for changes and emits the final change (if any) as the group expires.
Then the resulting events are projected into streams whose OnCompleted is delayed by the cool-down period. These streams are then concatenated together. This prevents events being any closer together than the cool-down, but otherwise they are emitted as soon as possible.
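A rough sketch of the pipeline just described, assuming the System.Reactive package. The class name PacingSketch, the method name LimitRate and the exact operator choices are illustrative assumptions, not necessarily the code this answer originally contained.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class PacingSketch
{
    // Hypothetical extension method; the name and signature are assumptions.
    public static IObservable<T> LimitRate<T>(
        this IObservable<T> source, TimeSpan cooldown, IScheduler scheduler)
    {
        return source
            // Only changes are of interest.
            .DistinctUntilChanged()
            // Every event gets the same key, so a group lasts exactly one
            // cool-down period, measured from the group's first event.
            .GroupByUntil(_ => 0, _ => Observable.Timer(cooldown, scheduler))
            // Emit the group's first event at once, plus the last later
            // change (if any) when the group expires.
            .SelectMany(group => group.FirstAsync()
                .Merge(group.Skip(1).TakeLast(1)))
            // Wrap each event in a stream whose completion is delayed by
            // the cool-down period.
            .Select(value => Observable.Return(value)
                .Concat(Observable.Empty<T>().Delay(cooldown, scheduler)))
            // Concatenating keeps consecutive events at least one
            // cool-down apart.
            .Concat();
    }
}
```

It could be used as source.LimitRate(TimeSpan.FromSeconds(1), Scheduler.Default). Taking the scheduler as a parameter means a TestScheduler can be passed in to drive the timing with virtual time.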
Here are the unit tests (updated for v3 edit), which you can run using the NuGet packages rx-testing and nunit: