How to cache items from hot observable into cold observable right after subscription

145 Views Asked by At

I have one global hot observable, which producing values independently. I have transformed global observable to derived hot. I need to create cold observable, which will replay all items from derived hot observable with keeping derived hot observable as hot without replay.

ConsoleOutput output = new();
ConsoleKey? pressedKey = null;
IDisposable? coldSourceSubscription = null;

var globalHotSource = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().AutoConnect();
var derivedHotSource = globalHotSource.DelaySubscription(TimeSpan.FromSeconds(4));
var derivedColdSource = derivedHotSource; // How to make it cold?

globalHotSource.Subscribe(x => output.AddToLine(0, x, "Initial HOT:"));
derivedHotSource.Subscribe(x => output.AddToLine(1, x, "Derived HOT:"));

while ((pressedKey = Console.ReadKey().Key) != ConsoleKey.Escape || coldSourceSubscription is null)
{
    if (pressedKey != ConsoleKey.Enter) continue;
    coldSourceSubscription = derivedColdSource.Subscribe(x => output.AddToLine(2, x, "Derived COLD:"));
}

Console.ReadLine();

ConsoleOutput is a helper class and here is the code for it:

class ConsoleOutput
{
    readonly List<List<string>> _lines = new(Enumerable.Repeat<List<string>>(null, 128));
    public ConsoleOutput()
    {
        Observable.Interval(TimeSpan.FromMilliseconds(500))
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(_ => Refresh());
    }

    public void AddToLine(int lineNumber, object item, string initialItemFormat)
    {
        (_lines[lineNumber] ??= new List<string>() { string.Format(initialItemFormat, lineNumber) }).Add(item.ToString());
    }

    private void Refresh()
    {
        var output = _lines.Where(x => x is { })
            .Select(x => x.Aggregate(string.Empty, (x, y) => x + " " + y))
            .DefaultIfEmpty()
            .Aggregate((x, y) => x + "\n" + y);

        Console.Clear();
        Console.WriteLine(output);
    }
}

And here is the sample output from console:

 Initial HOT: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14
 Derived HOT: 4 5 6 7 8 9 10 11 12 13 14
 Derived COLD: 7 8 9 10 11 12 13 14

First the code will produce items on the first line for global observable, after 4 seconds the derived one will produce it's items on the second line and the last one will be cold (which is not cold at the moment) which will produce values after user presses 'Enter' in console. I need for cold observable to show the same values, as the derived one.

The easiest way I could think of to solve it is to use ReplaySubject and insert .Do(x => replaySubject.OnNext(x)) to derived observable and then assign replaySubject as observable to the target cold observable. However, is it possible to achieve the same without using subject?

0

There are 0 best solutions below