Imagine you have a replay observable, created through one of the Replay overloads.
Is there a variant of Replay that produces a replay observable with a method that lets me remove an item/message/element from the replay buffer?
I need the replay observable in order to avoid a race condition: two observables A and B produce related messages. If A produces a message m, then B may follow suit with a corresponding message m' (but that's optional).
In order to filter out m' from all of B's messages, the subscriber to A registers a subscriber with B (filtered for the exact m' message), since only when m is received will the program be able to predict what a potentially incoming m' will look like.
However, sometimes m' arrives even before the subscriber to A was scheduled to run, meaning there is no subscriber for m' registered yet. I can solve this by wrapping B as a replay observable, but I want to be able to clean up processed messages from the replay buffer, because I know each of B's messages will be processed by exactly one subscriber.
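
To make the behaviour I am after concrete, here is a minimal, library-independent sketch in Python. It is not an Rx API: the class ConsumingBuffer and its methods publish, claim and pending are hypothetical names invented for illustration. The idea is that every message from B is either handed to a waiting one-shot claimant or buffered, and a buffered message is removed as soon as exactly one claimant has taken it.

```python
import threading
from typing import Callable, Generic, List, Tuple, TypeVar

T = TypeVar("T")


class ConsumingBuffer(Generic[T]):
    """Hypothetical stand-in for a replay buffer whose items can be removed.

    Each message goes to at most one claimant. Messages that nobody has
    claimed yet stay buffered until a matching claimant registers.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: List[T] = []  # messages from B not yet claimed
        self._claims: List[Tuple[Callable[[T], bool], Callable[[T], None]]] = []

    def publish(self, message: T) -> None:
        """Feed one message from B into the buffer."""
        with self._lock:
            for i, (predicate, handler) in enumerate(self._claims):
                if predicate(message):
                    del self._claims[i]  # one-shot claim is used up
                    break
            else:
                # No claimant is waiting yet: keep the message for later.
                self._pending.append(message)
                return
        handler(message)  # invoked outside the lock

    def claim(self, predicate: Callable[[T], bool],
              handler: Callable[[T], None]) -> None:
        """Register interest in one message; called once m has been seen."""
        with self._lock:
            for i, message in enumerate(self._pending):
                if predicate(message):
                    del self._pending[i]  # processed, so drop it from the buffer
                    break
            else:
                # Nothing buffered matches: wait for a future message.
                self._claims.append((predicate, handler))
                return
        handler(message)  # invoked outside the lock

    def pending(self) -> List[T]:
        """Snapshot of the messages still held in the buffer."""
        with self._lock:
            return list(self._pending)


# Usage: m' arrives before the subscriber to A has run, and is still delivered.
buffer: ConsumingBuffer[str] = ConsumingBuffer()
received: List[str] = []

buffer.publish("m'")                                # early arrival is buffered
buffer.claim(lambda msg: msg == "m'", received.append)  # claimed, then removed

buffer.claim(lambda msg: msg == "n'", received.append)  # registered first
buffer.publish("n'")                                # delivered immediately
```

In both orderings the message reaches its handler once and nothing is left in the buffer afterwards, which is the clean-up that a plain replay buffer does not give me.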