ReadOnlySequence – Slice to given SequencePosition + 1

1.4k Views Asked by At

I try to read some data from a ReadOnlySequence. The data are formatted as frames. Each frame is terminated by a NULL byte (octet 0).

My code searches for the end of a frame using ReadOnlySequence.PositionOf. When it finds a NULL byte, it will process all bytes up to the position of the NULL byte. After processing I would like to process the next frame by slicing the input and repeat the previous steps. Since the frame ended before the NULL byte, the NULL byte would be part of the next sequence of bytes if I wouldn’t slice the input data again (start = 1).

Is there a way to slice a ReadOnlySequence with a SequencePosition + 1 item/byte as start value?

I tried to use SequencePosition.GetInteger + 1 as start value, but that does not work since GetInteger sometimes returns values larger then the length of the ReadOnlySequence. Slicing to the value returned by GetInteger results in the following exception: System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values. (Parameter 'start')

Minimal Reproducible Example

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp1
{
    public class Program
    {
        private static IDuplexPipe _pipe;

        public static async Task Main( String[] args )
        {
            var pipe = new Pipe();
            _pipe = new DuplexPipe( pipe.Reader, pipe.Writer );

            var firstMessage = Encoding.UTF8.GetBytes( "CONNECTED\nversion:1.1\nsession:2a840965\nserver:ActiveMQ-Artemis/2.8.0 ActiveMQ Artemis Messaging Engine\nheart-beat:10000,10000\n\n\0\n" );
            await _pipe.Output.WriteAsync( firstMessage );
            await _pipe.Output.FlushAsync();

            var secondMessage =
                Encoding.UTF8.GetBytes(
                    "\n\nMESSAGE\nsubscription:test-839c7766-0f38-4579-a3fc-74de35408536Sub1\ncontent-length:4\nmessage-id:2147486350\ndestination:/queue/TestQ\nexpires:1572278642017\nredelivered:false\npriority:5\npersistent:true\ntimestamp:1572278582050\ndestination-type:ANYCAST\nreceipt:2\ntest:test\nNMSXDeliveryMode:true\ntransformation:jms-byte\ntimestamp:1572278582017\n\nHello World\0\n" );
            await _pipe.Output.WriteAsync( secondMessage );
            await _pipe.Output.FlushAsync();

            var readResult = await _pipe.Input.ReadAsync();
            var buffer = readResult.Buffer;
            while ( TryParseFrame( ref buffer ) )
            {
                // ...
            }

            _pipe.Input.AdvanceTo( buffer.Start, buffer.End );

            Console.ReadLine();
        }

        private static Boolean TryParseFrame( ref ReadOnlySequence<Byte> inputBuffer )
        {
            var endOfFrame = inputBuffer.PositionOf( ByteConstants.Null );
            if ( endOfFrame == null )
                return false;

            var frameBuffer = inputBuffer.Slice( 0, endOfFrame.Value );
            // parse and process the frame...

            // This works....
            //inputBuffer = inputBuffer.Slice( frameBuffer.End );
            //inputBuffer = inputBuffer.Slice( 1 );

            // This does NOT.
            try
            {
                var end = frameBuffer.End.GetInteger();
                var length = inputBuffer.Length;
                Console.WriteLine( $" END: {end}, LENGTH: {length} " );
                inputBuffer = inputBuffer.Slice( end + 1 );
            }
            catch ( Exception ex )
            {
                Console.WriteLine( ex );
                // Make sure we can read the next frame...
                inputBuffer = inputBuffer.Slice( frameBuffer.End );
                inputBuffer = inputBuffer.Slice( 1 );
            }

            return true;
        }
    }

    public class DuplexPipe : IDuplexPipe
    {
        public DuplexPipe( PipeReader input, PipeWriter output )
        {
            Input = input;
            Output = output;
        }

        public PipeReader Input { get; }
        public PipeWriter Output { get; }
    }

    public static class ByteConstants
    {
        public const Byte HeaderDelimiter = 58;
        public const Byte LineFeed = 10;
        public const Byte Null = 0;
    }
}
2

There are 2 best solutions below

0
Jonathan On

I had a play with pipes recently, and quickly gave up on manipulating the SequencePosition constructs.

ReadOnlySequence exposes a GetPosition(Int64, SequencePosition) method, where the first argument is an offset (how far to read along) starting at the provided SequencePosition (in your case the position returned for the terminating null-byte).

Consider the following:

private static bool TryParseFrame(ref ReadOnlySequence<byte> inputBuffer)
{
    var endOfFrame = inputBuffer.PositionOf(ByteConstants.Null);
    if (endOfFrame == null)
        return false;

    // Get SequencePosition 1 place after endOfFrame
    var sliceEnd = inputBuffer.GetPosition(1, endOfFrame.Value);

    inputBuffer = inputBuffer.Slice(0, sliceEnd);

    return true;
}

Not sure if the above is exactly what you're looking for, but hopefully GetPosition() is will help you out.

Disclaimer: I don't know what happens if your offset falls outside of the ReadOnlySequence, but I imagine you'd have problems when trying to slice the buffer. In your specific case, however, you already know that the byte is there.

0
davidfowl On

PS: SequenceReader<T> is your best friend for parsing a ReadOnlySequence<T>