I'm messing around trying to learn pipelines in .NET 6.0. What I'm trying to do is simple implementation of HTTP/1.1 server. Nothing fancy, probably wouldn't work for most real-life cases, but that doesn't matter anyway.
To make things easy, let's say I'm implementing echo server.
Parsing aside, what really bothers me is reading data from NetworkStream
.
I have processing splitted into few phases:
- Read all lines with HTTP request start line and headers (basically reading all lines as long as I find out empty newline).
- Parsing request to get what's the size of body.
- Get body from stream based on provided length and encoding headers.
- Echo it back as correct HTTP response.
I'm doing something wrong while I'm filling PipeWriter
. At some point it hangs the whole reading from network stream and returns data after exactly 5 seconds (not including parsing time, but that's a matter of few ms).
Everything was fine up to the point when I was just reading and parsing all lines from client (headers etc), but It bite me back when I got to the point when I needed to load body and echo it back. That's where the 5s started to appear.
I'm using Stream
because it was easier to do few unit tests by providing MemoryStream
into my class and test it separately from networking stack.
internal class PipelineStreamReader : IDisposable
{
private readonly ILogger<PipelineStreamReader> logger;
private readonly Pipe pipe;
private readonly Task writing;
private bool disposed;
/// <summary>
/// Create (possibly reusable) pipeline reader.
/// </summary>
/// <param name="stream">Most probably duplex NetworkStream, but could be MemoryStream in tests.</param>
/// <param name="logger">Just logger.</param>
public PipelineStreamReader(Stream stream, ILogger<PipelineStreamReader> logger)
{
this.logger = logger;
pipe = new Pipe();
writing = FillPipelineAsync(pipe.Writer, stream);
}
public Task<ReadOnlyMemory<char>[]> ReadLines(CancellationToken token)
{
return ReadLinesFromPipelineAsync(pipe.Reader, token);
}
public async ValueTask<ReadOnlyMemory<byte>> ReadBody(int length, ContentType contentType)
{
try
{
if (contentType.Charset != null)
{
return await ReadBodyFromPipelineAsync(pipe.Reader, length, contentType.Charset);
}
throw new NotImplementedException();
}
catch (ArgumentException e)
{
logger.LogError(e, "Incorrect length of body!");
await pipe.Reader.CompleteAsync();
await pipe.Writer.CompleteAsync();
throw;
}
}
private async Task FillPipelineAsync(PipeWriter writer, Stream input)
{
while (true)
{
// Allocate at least 16 bytes from the PipeWriter, could be more
var memory = writer.GetMemory(16);
try
{
var bytesRead = await input.ReadAsync(memory);
if (bytesRead == 0)
{
break;
}
// Tell the PipeWriter how much was read from the Socket
writer.Advance(bytesRead);
}
catch (IOException fuck) when (disposed)
{
// TODO RK: Something doesn't dispose correctly
// Not the brightest solution, but hey, it works...
}
catch (Exception ex)
{
Debug.Fail("Something failed!");
logger.LogError(ex, "TODO af");
throw;
}
// Make the data available to the PipeReader
var result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
// Tell the PipeReader that there's no more data coming
await writer.CompleteAsync();
}
private static async ValueTask<ReadOnlyMemory<byte>> ReadBodyFromPipelineAsync(PipeReader reader, int length, Encoding encoding)
{
var body = new ArrayBufferWriter<byte>();
var charsRead = 0L;
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
ReadBufferUsingEncoding(buffer, body, encoding, ref charsRead);
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.Start, buffer.End);
// There shouldn't be more bytes to read than what was provided in headers
if (charsRead > length)
throw new ArgumentException("Too much data");
// Stop reading if there's no more data coming
if (charsRead == length)
{
break;
}
if (result.IsCompleted)
{
break;
}
}
// Check characters count
if (charsRead < length)
throw new ArgumentException("Wrong length");
return body.WrittenMemory;
}
private static void ReadBufferUsingEncoding(
ReadOnlySequence<byte> seq,
IBufferWriter<byte> buffer,
Encoding encoding,
ref long charactersCount)
{
var segment = seq.Start;
while (seq.TryGet(ref segment, out var memory))
{
buffer.Write(memory.Span);
charactersCount += encoding.GetCharCount(memory.Span);
}
}
private static async Task<ReadOnlyMemory<char>[]> ReadLinesFromPipelineAsync(PipeReader reader, CancellationToken token)
{
var list = new List<ReadOnlyMemory<char>>(16);
var endOfLines = false;
while (!endOfLines)
{
// Wait for data
var read = await reader.ReadAsync(token);
// Save to temporary buffer for reading lines
var currentBuffer = read.Buffer;
// check for end or completed stream (error in this case)
if (read.Buffer.IsEmpty && read.IsCompleted)
throw new InvalidOperationException("That shouldn't happen");
SequencePosition? consumed = null;
while (true)
{
// check if there is newline
var newLine = currentBuffer.PositionOf((byte) '\n');
// No newline, so let writer drop more data into buffers
if (newLine == null) break;
// we have newline, but let's check for windows carriage
var carriage = currentBuffer.PositionOf((byte) '\r');
var pos = carriage ?? newLine.Value;
var line = ProcessLine(currentBuffer.Slice(0, pos));
// advance current buffer for next line read and update consumed data
var offset = carriage is null ? 1 : 2;
var newPos = currentBuffer.GetPosition(offset, pos);
currentBuffer = currentBuffer.Slice(newPos);
// check if it's last line and update position to remove last newline
if (line.Length == 0)
{
consumed = read.Buffer.GetPosition(0, newPos);
endOfLines = true;
break;
}
// otherwise add line and update consumed data normally
list.Add(line);
consumed = read.Buffer.GetPosition(0, pos);
}
// clear consumed data
reader.AdvanceTo(consumed ?? read.Buffer.Start, read.Buffer.End);
}
return list.ToArray();
}
private static ReadOnlyMemory<char> ProcessLine(ReadOnlySequence<byte> seq)
{
var buffer = new ArrayBufferWriter<char>();
var segment = seq.Start;
while (seq.TryGet(ref segment, out var memory))
{
Encoding.ASCII.GetChars(memory.Span, buffer);
}
return buffer.WrittenMemory;
}
protected virtual void Dispose(bool disposing)
{
if (disposed) return;
if (!disposing) return;
pipe.Reader.Complete();
pipe.Writer.Complete();
disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
Update
Sorry, I forgot to actually ask question. Like I said, the problem is that my code is getting 5 seconds delay every time it's requested. It's exactly 5 seconds delay + few milliseconds for parsing.
I don't know what's the reason for that and what I'm doing wrong. This seems like something on networking stack is biting me, but I don't know.
Okay, I found out how to solve it after looking at my other codes.
I still don't know what is the exact reason for pooling data for this long, but adding simple check in
FillPipeline
method for data availability solved my problem: