I've tried with this, but it seems to have concurrency issues.
I don't fully understand what is wrong.
public static IObservable<byte> ToObservable(this Stream stream, int bufferSize = 4096)
{
var buffer = new byte[bufferSize];
return Observable
.FromAsync(async ct => (bytesRead: await stream.ReadAsync(buffer, 0, buffer.Length, ct).ConfigureAwait(false), buffer))
.Repeat()
.TakeWhile(x => x.bytesRead != 0)
.Select(x => x.buffer)
.SelectMany(x => x);
}
Your likely concurrency issue is that by the time that the
.SelectMany(x => x)is executing over the buffer, the call tostream.ReadAsyncis overwriting the buffer.You need to ensure that a copy of the buffer is returned int the
FromAsync.This version covers off on those issues:
I tested your original code and my version with this:
Yours failed every time and my succeeded.
One thing, with your function, that you might consider, is that the observable cannot be run concurrently and it it cannot be repeated as it doesn't manage the lifecycle of the stream and you have a single shared buffer.
You can fix this by changing the signature to this:
Now the code is much more robust.