How to create a Stream out of IObservable<byte>?

63 Views Asked by At

I've been thinking how to create a bridge between IObservable<byte> and Stream, but I'm completely lost.

The use case for this is when a function needs you to provide a Stream, but you have your data in an IObservable<Byte>.

Since the Stream is pull-based and IObservable is push-based, I don't even know if it's feasible to do such a thing without trying calling byteSequence.ToEnumerable().

The problem is that, in order to create a Stream out of it, I'd need to do

new MemoryStream(byteSequence.ToEnumerable().ToArray());

This will work, but will need to allocate ALL the bytes in memory and that is a deal breaker for many use cases.

UPDATE

I've been inspired by Marc Gravell's comment and I tried my luck up with this code that is using Pipes. Apparently, it works! I don't know if it's completely OK, but I've leave here for reference.

public static Stream ToStream(this IObservable<byte> observable, int bufferSize = 4096)
{
    var pipe = new System.IO.Pipelines.Pipe();

    var reader = pipe.Reader;
    var writer = pipe.Writer;

    observable
        .Buffer(bufferSize)
        .Select(buffer => Observable.FromAsync(async () => await writer.WriteAsync(buffer.ToArray())))
        .Concat()
        .Subscribe(_ => { }, onCompleted: () => { writer.Complete(); }, onError: exception => writer.Complete(exception));

    return reader.AsStream();
}

Feel free to improve it.

0

There are 0 best solutions below