I've recently discovered the System.IO.Pipelines namespace and it looks interesting. I've been trying to implement the IDuplexPipe interface in the context of a simple TCP server which accepts connections and then communicates back and forth with the connected client.
However, I'm struggling to make it stable. It feels like I've misunderstood something fundamental. I've also been googling for reference implementations of the interface to guide me in the right direction.
This is to my knowledge the most complete document on System.IO.Pipelines out there. The exmple I've provided below is heavily borrowing from it.
- https://github.com/davidfowl/DocsStaging/blob/master/Pipelines.md
- https://devblogs.microsoft.com/dotnet/system-io-pipelines-high-performance-io-in-net/
My question: what would a typical implementation of the IDuplexPipe interface look like in the context of a TCP server?
Btw, this is what I have currently. The idea is to setup a new "duplex communication" by providing an established SslStream:
public class DuplexCommunication : IDuplexPipe
{
public PipeReader Input => _receivePipe.Reader;
public PipeWriter Output => _transmitPipe.Writer;
private readonly SslStream _stream;
// Data received from the SslStream will end up on this pipe
private readonly Pipe _receivePipe = new Pipe();
// Data that is to be transmitted over the SslStream ends up on this pipe
private readonly Pipe _transmitPipe = new Pipe();
private readonly CancellationToken _cts;
private Task _receive;
private Task _transmit;
public DuplexCommunication(SslStream stream, CancellationToken cts)
{
_stream = stream;
_cts = cts;
_receive = Receive();
_transmit = Transmit();
}
private async Task Receive()
{
Exception error = null;
try
{
while (!_cts.IsCancellationRequested)
{
var buffer = _receivePipe.Writer.GetMemory(1);
var bytes = await _stream.ReadAsync(buffer, _cts);
_receivePipe.Writer.Advance(bytes);
if (bytes == 0) {
break;
}
var flush = await _receivePipe.Writer.FlushAsync(_cts);
if (flush.IsCompleted || flush.IsCanceled)
{
break;
}
}
}
catch (Exception ex)
{
// This might be "stream is closed" or similar, from when trying to read from the stream
Console.WriteLine($"DuplexPipe ReceiveTask caugth an exception: {ex.Message}");
error = ex;
}
finally
{
await _receivePipe.Writer.CompleteAsync(error);
}
}
private async Task Transmit()
{
Exception error = null;
try
{
while (!_cts.IsCancellationRequested)
{
var read = await _transmitPipe.Reader.ReadAsync(_cts);
var buffer = read.Buffer;
if (buffer.IsEmpty && read.IsCompleted)
{
break;
}
foreach (var segment in buffer)
{
await _stream.WriteAsync(segment, _cts);
}
_transmitPipe.Reader.AdvanceTo(buffer.End);
await _stream.FlushAsync(_cts);
}
}
catch (Exception e)
{
Console.WriteLine($"DuplexPipe Transmit caught an exception: {e.Message}");
error = e;
}
finally
{
await _transmitPipe.Reader.CompleteAsync(error);
}
}
}
So, I spent some more time searching around the internet and found something that sort of solves my problem. I found, among some other things, this question: How to handle incoming TCP messages with a Kestrel ConnectionHandler?
Here it seems like the ConnectionHandler class provides exactly what I need, including some very handy plumbing for handling SSL certificates, port listening, etc that comes for free when building an ASP.NET application.