HTTP echo server using System.IO.Pipelines and Span<T>

605 Views Asked by At

I'm messing around trying to learn pipelines in .NET 6.0. What I'm trying to do is simple implementation of HTTP/1.1 server. Nothing fancy, probably wouldn't work for most real-life cases, but that doesn't matter anyway.

To make things easy, let's say I'm implementing echo server.

Parsing aside, what really bothers me is reading data from NetworkStream.

I have processing splitted into few phases:

  1. Read all lines with HTTP request start line and headers (basically reading all lines as long as I find out empty newline).
  2. Parsing request to get what's the size of body.
  3. Get body from stream based on provided length and encoding headers.
  4. Echo it back as correct HTTP response.

I'm doing something wrong while I'm filling PipeWriter. At some point it hangs the whole reading from network stream and returns data after exactly 5 seconds (not including parsing time, but that's a matter of few ms).

Everything was fine up to the point when I was just reading and parsing all lines from client (headers etc), but It bite me back when I got to the point when I needed to load body and echo it back. That's where the 5s started to appear.

I'm using Stream because it was easier to do few unit tests by providing MemoryStream into my class and test it separately from networking stack.

internal class PipelineStreamReader : IDisposable
{
    private readonly ILogger<PipelineStreamReader> logger;
    private readonly Pipe pipe;
    private readonly Task writing;
    private bool disposed;

    /// <summary>
    /// Create (possibly reusable) pipeline reader.
    /// </summary>
    /// <param name="stream">Most probably duplex NetworkStream, but could be MemoryStream in tests.</param>
    /// <param name="logger">Just logger.</param>
    public PipelineStreamReader(Stream stream, ILogger<PipelineStreamReader> logger)
    {
        this.logger = logger;

        pipe = new Pipe();
        writing = FillPipelineAsync(pipe.Writer, stream);
    }

    public Task<ReadOnlyMemory<char>[]> ReadLines(CancellationToken token)
    {
        return ReadLinesFromPipelineAsync(pipe.Reader, token);
    }
    
    public async ValueTask<ReadOnlyMemory<byte>> ReadBody(int length, ContentType contentType)
    {
        try
        {
            if (contentType.Charset != null)
            {
                return await ReadBodyFromPipelineAsync(pipe.Reader, length, contentType.Charset);
            }

            throw new NotImplementedException();
        }
        catch (ArgumentException e)
        {
            logger.LogError(e, "Incorrect length of body!");
            await pipe.Reader.CompleteAsync();
            await pipe.Writer.CompleteAsync();
            throw;
        }
    }

    private async Task FillPipelineAsync(PipeWriter writer, Stream input)
    {
        while (true)
        {
            // Allocate at least 16 bytes from the PipeWriter, could be more
            var memory = writer.GetMemory(16);
            try
            {
                var bytesRead = await input.ReadAsync(memory);
                if (bytesRead == 0)
                {
                    break;
                }

                // Tell the PipeWriter how much was read from the Socket
                writer.Advance(bytesRead);
            }
            catch (IOException fuck) when (disposed)
            {
                // TODO RK: Something doesn't dispose correctly
                // Not the brightest solution, but hey, it works...
            }
            catch (Exception ex)
            {
                Debug.Fail("Something failed!");
                logger.LogError(ex, "TODO af");
                throw;
            }

            // Make the data available to the PipeReader
            var result = await writer.FlushAsync();

            if (result.IsCompleted)
            {
                break;
            }
        }

        // Tell the PipeReader that there's no more data coming
        await writer.CompleteAsync();
    }

    private static async ValueTask<ReadOnlyMemory<byte>> ReadBodyFromPipelineAsync(PipeReader reader, int length, Encoding encoding)
    {
        var body = new ArrayBufferWriter<byte>();
        var charsRead = 0L;

        while (true)
        {
            var result = await reader.ReadAsync();
            var buffer = result.Buffer;

            ReadBufferUsingEncoding(buffer, body, encoding, ref charsRead);

            // Tell the PipeReader how much of the buffer we have consumed
            reader.AdvanceTo(buffer.Start, buffer.End);

            // There shouldn't be more bytes to read than what was provided in headers
            if (charsRead > length)
                throw new ArgumentException("Too much data");

            // Stop reading if there's no more data coming
            if (charsRead == length)
            {
                break;
            }

            if (result.IsCompleted)
            {
                break;
            }
        }

        // Check characters count
        if (charsRead < length)
            throw new ArgumentException("Wrong length");

        return body.WrittenMemory;
    }

    private static void ReadBufferUsingEncoding(
        ReadOnlySequence<byte> seq,
        IBufferWriter<byte> buffer,
        Encoding encoding,
        ref long charactersCount)
    {
        var segment = seq.Start;

        while (seq.TryGet(ref segment, out var memory))
        {
            buffer.Write(memory.Span);
            charactersCount += encoding.GetCharCount(memory.Span);
        }
    }

    private static async Task<ReadOnlyMemory<char>[]> ReadLinesFromPipelineAsync(PipeReader reader, CancellationToken token)
    {
        var list = new List<ReadOnlyMemory<char>>(16);
        var endOfLines = false;

        while (!endOfLines)
        {
            // Wait for data
            var read = await reader.ReadAsync(token);

            // Save to temporary buffer for reading lines
            var currentBuffer = read.Buffer;

            // check for end or completed stream (error in this case)
            if (read.Buffer.IsEmpty && read.IsCompleted)
                throw new InvalidOperationException("That shouldn't happen");

            SequencePosition? consumed = null;

            while (true)
            {
                // check if there is newline
                var newLine = currentBuffer.PositionOf((byte) '\n');

                // No newline, so let writer drop more data into buffers
                if (newLine == null) break;

                // we have newline, but let's check for windows carriage
                var carriage = currentBuffer.PositionOf((byte) '\r');
                var pos = carriage ?? newLine.Value;
                var line = ProcessLine(currentBuffer.Slice(0, pos));

                // advance current buffer for next line read and update consumed data
                var offset = carriage is null ? 1 : 2;
                var newPos = currentBuffer.GetPosition(offset, pos);
                currentBuffer = currentBuffer.Slice(newPos);

                // check if it's last line and update position to remove last newline
                if (line.Length == 0)
                {
                    consumed = read.Buffer.GetPosition(0, newPos);
                    endOfLines = true;
                    break;
                }

                // otherwise add line and update consumed data normally
                list.Add(line);
                consumed = read.Buffer.GetPosition(0, pos);
            }

            // clear consumed data
            reader.AdvanceTo(consumed ?? read.Buffer.Start, read.Buffer.End);
        }

        return list.ToArray();
    }

    private static ReadOnlyMemory<char> ProcessLine(ReadOnlySequence<byte> seq)
    {
        var buffer = new ArrayBufferWriter<char>();
        var segment = seq.Start;

        while (seq.TryGet(ref segment, out var memory))
        {
            Encoding.ASCII.GetChars(memory.Span, buffer);
        }

        return buffer.WrittenMemory;
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed) return;

        if (!disposing) return;

        pipe.Reader.Complete();
        pipe.Writer.Complete();

        disposed = true;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}

Update

Sorry, I forgot to actually ask question. Like I said, the problem is that my code is getting 5 seconds delay every time it's requested. It's exactly 5 seconds delay + few milliseconds for parsing.

I don't know what's the reason for that and what I'm doing wrong. This seems like something on networking stack is biting me, but I don't know.

1

There are 1 best solutions below

0
On

Okay, I found out how to solve it after looking at my other codes.

I still don't know what is the exact reason for pooling data for this long, but adding simple check in FillPipeline method for data availability solved my problem:

private async Task FillPipelineAsync(PipeWriter writer, Stream input)
{
    while (true)
    {
        // Allocate at least 16 bytes from the PipeWriter, could be more
        var memory = writer.GetMemory(16);
        try
        {
            // escape if no data is available
            if (!FirstRead && input is NetworkStream {DataAvailable: false})
                break;

            var bytesRead = await input.ReadAsync(memory);
            if (bytesRead == 0)
                break;

            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);
        }
        catch (IOException fuck) when (disposed)
        {
            logger.LogError(fuck, "fuck");
            // TODO RK: Something doesn't dispose correctly
        }
        catch (Exception ex)
        {
            Debug.Fail("Something failed!");
            logger.LogError(ex, "TODO af");
            throw;
        }

        // Make the data available to the PipeReader
        var result = await writer.FlushAsync();

        if (result.IsCompleted)
        {
            break;
        }
    }

    // Tell the PipeReader that there's no more data coming
    await writer.CompleteAsync();
}