How to write a NWProtocolFramer for Network.framework that splits streams into frames using a delimiter?

970 Views Asked by At

I tried the following code to create a framer that splits a stream of ASCII bytes into frames separated by the pipe ascii character: "|".

import Network

fileprivate let pipe = Character("|").asciiValue!

class PipeFramer: NWProtocolFramerImplementation {
    static let label = "Pipe framer"
    static let definition = NWProtocolFramer.Definition(implementation: PipeFramer.self)

    var minLengthUntilNextMessage = 1 {
        didSet { print("client: minLength set to", minLengthUntilNextMessage) }
    }

    required init(framer: NWProtocolFramer.Instance) {}

    func start(framer: NWProtocolFramer.Instance) -> NWProtocolFramer.StartResult { .ready }

    func handleInput(framer: NWProtocolFramer.Instance) -> Int {
        while true {
            var delimiterPosition: Int?
            _ = framer.parseInput(minimumIncompleteLength: minLengthUntilNextMessage, maximumLength: 65535) { buffer, endOfMessage in
                if let buffer = buffer {
                    print("client: parsing buffer: \"\(String(bytes: buffer, encoding: .utf8) ?? buffer.debugDescription)\"")
                    if let indexOfDelimiter = buffer.firstIndex(of: pipe) {
                        minLengthUntilNextMessage = 1
                        delimiterPosition = indexOfDelimiter
                    } else {
                        minLengthUntilNextMessage = buffer.count + 1
                    }
                } else {
                    print("client: no buffer")
                }
                return 0
            }

            if let length = delimiterPosition {
                guard framer.deliverInputNoCopy(length: length, message: .init(instance: framer), isComplete: true) else {
                    return 0
                }
                _ = framer.parseInput(minimumIncompleteLength: 1, maximumLength: 65535) { _,_ in 1 }
            } else {
                return minLengthUntilNextMessage
            }
        }
    }

    func handleOutput(framer: NWProtocolFramer.Instance, message: NWProtocolFramer.Message, messageLength: Int, isComplete: Bool) {
        try! framer.writeOutputNoCopy(length: messageLength)
        framer.writeOutput(data: [pipe])
    }

    func wakeup(framer: NWProtocolFramer.Instance) {}

    func stop(framer: NWProtocolFramer.Instance) -> Bool { return true }

    func cleanup(framer: NWProtocolFramer.Instance) { }
}

The problem is that from the moment I get a chunk that does not end with "|", the framer gets stuck on that chunk. So the other chunks that come after this incomplete chunk never fully arrive in the framer.parseInput(...) call. Because it always parses chunks of minimumIncompleteLength and hence never arrives to the point where the next "|" is.

Here is a simple reproduction of this problem:

  1. Create a TCP server
  2. Setup the server so that it sends chunks of messages when a client connects.
  3. Connect to the server (created in 1.) using the framer from above.
  4. Start receiving messages.

Swift Code:

import Network

let client = DispatchQueue(label: "Server")
let server = DispatchQueue(label: "Client")
let networkParameters = NWParameters.tcp
networkParameters.defaultProtocolStack.applicationProtocols.insert(NWProtocolFramer.Options(definition: PipeFramer.definition), at: 0)
let server = try! NWListener(using: .tcp)

server.newConnectionHandler = { connection in
    print("server: new connection from", connection.endpoint)

    print("server (client \(connection.endpoint)): state", connection.state)

    connection.viabilityUpdateHandler = { viable in
        print("server (client \(connection.endpoint)): state", connection.state)
        if viable {
            print("server: sending")
            connection.send(content: "A|Be||Sea".data(using: .utf8)!, isComplete: false, completion: .idempotent)

            serverQueue.asyncAfter(deadline: .now() + 5) {
                print("server: sending second part")
                connection.send(content: " is longer than expected|0|".data(using: .utf8)!, isComplete: true, completion: .idempotent)
            }
            serverQueue.asyncAfter(deadline: .now() + 8) {
                print("server: sending last part")
                connection.send(content: "Done|".data(using: .utf8)!, isComplete: true, completion: .idempotent)
            }
        }
    }

    connection.start(queue: serverQueue)
}

server.stateUpdateHandler = { state in
    print("server:", state)
    if state == .ready, let port = server.port {
        print("server: listening on", port)
    }
}

server.start(queue: serverQueue)

let client = NWConnection(to: .hostPort(host: "localhost", port: server.port!), using: networkParameters)

func receiveNext() {
    client.receiveMessage { (data, context, complete, error) in
        let content: String
        if let data = data {
            content = String(data: data, encoding: .utf8) ?? data.description
        } else {
            content = data?.debugDescription ?? "<no data>"
        }
        print("client: received \"\(content)\"", context.debugDescription, complete, error?.localizedDescription ?? "No error")

        receiveNext()
    }
}

client.stateUpdateHandler = { state in
    print("client:", state)

    if state == .ready {
        print("client: receiving")
        receiveNext()
    }
}

client.start(queue: clientQueue)

Results in:

server: waiting(POSIXErrorCode: Network is down)
server: ready
server: listening on 54894
client: preparing
client: ready
client: receiving
server: new connection from ::1.53179
server (client ::1.53179): state setup
server (client ::1.53179): state ready
server: sending
client: parsing buffer: "A|Be||Sea"
client: minLength set to 1
client: parsing buffer: "Be||Sea"
client: minLength set to 1
client: parsing buffer: "|Sea"
client: minLength set to 1
client: parsing buffer: "Sea"
client: minLength set to 4
client: parsing buffer: ""
client: minLength set to 1
client: received "A" Optional(Network.NWConnection.ContentContext) true No error
client: received "Be" Optional(Network.NWConnection.ContentContext) true No error
client: received "<no data>" Optional(Network.NWConnection.ContentContext) true No error
client: parsing buffer: "Sea"
client: minLength set to 4
server: sending second part
client: parsing buffer: "Sea "
client: minLength set to 5
client: parsing buffer: "Sea i"
client: minLength set to 6
server: sending last part
client: parsing buffer: "Sea is"
client: minLength set to 7
client: parsing buffer: "Sea is "
client: minLength set to 8

Notice that the fourth and fifth message are never received by the client. How should I write the Framer so that it receives messages after an incoming incomplete chunk?


References

1

There are 1 best solutions below

0
On

I had exactly the same problem... The network protocol that I was working with also had a simple delimiter that separated each 'message' and the protocol had no header that told me what to expect. Often at the end of the buffer, there was only a partial message with no delimiter and needed to read more bytes to get the remainder of the message. Something like this:

|              PACKET A              |              PACKET B             |
|<message>|<message>|<message><mess...age>|<message><message><message><m...essage>
     1         2         4      5a    5b      6        7        8      9a    9b

Note:
delimiter = | - single character
lhsMessage = message 5a 
rhsMessage = message 5b

Even after watching WWDC and looking at the other examples from Apple, I still do not completely understand how handleInput and parseInput are supposed to function.

I assumed I could simply return from handleInput with the (lhsMessage.count + 1) and it would keep the partial message in the current buffer AND add additional bytes into the buffer (ie from PACKET B) that parseInput could inspect.

However, it does appear to work that way. Instead I ended up storing the value of lhsMessage in a class var and then returned lhsMessage.count from parseInput, which I believe moves the ‘cursor’ in the buffer to end and forces handleInput to get the new packet (ie packet B).

As part of parseInput, I then check if I have a lhsMessage and then assume if I find a delimiter that it is in fact rhsMessage. I then join LHS and RHS to create a completeMessage. At this point, I also return from parseInput the value of (rhsMessage.count + 1) to move the cursor along again.

Now to send this completeMessage I could not use deliverInputNoCopy as the bytes that make up completeMessage were no longer in the buffer :-)

Instead handleInput sent the message back using deliverInput.