Corrupted network write in kj AsyncIoStream


I'm implementing a networked application on the excellent kj-async library from Cap'n Proto.

The example code below serves a (large, preloaded) file (argv[1]) over TCP. If I redirect it directly to a file with nc localhost 9004 > test, the file is retrieved correctly.

If I let nc print to the terminal instead of redirecting to a file, I see intermittent corruption. I suspect the underlying call to ::write is failing with EAGAIN/EWOULDBLOCK when the terminal can't consume the output fast enough, and the next call is somehow resuming from an incorrect offset. The problem was much more reliably reproducible when I fetched the file over Wi-Fi.
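To make that suspicion concrete, here is a minimal sketch of the offset bookkeeping I would expect at the POSIX level for a non-blocking descriptor. It is not kj's actual implementation: writeSome, WriteStatus and sendAndReceive are hypothetical names made up for this illustration, and the socketpair only stands in for a slow TCP peer.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <string>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical status type for this sketch (not part of kj).
enum class WriteStatus { Done, WouldBlock, Error };

// Write as much of data[offset..size) as the kernel accepts right now.
// `offset` is advanced by exactly the number of bytes written, so a later
// call resumes where this one stopped.
WriteStatus writeSome(int fd, const char* data, size_t size, size_t& offset) {
    assert(offset <= size);
    while (offset < size) {
        ssize_t n = ::write(fd, data + offset, size - offset);
        if (n > 0) {
            offset += static_cast<size_t>(n);  // short write: keep going
        } else if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
            return WriteStatus::WouldBlock;    // buffer full: retry later
        } else if (n < 0 && errno == EINTR) {
            continue;                          // interrupted: retry now
        } else {
            return WriteStatus::Error;
        }
    }
    return WriteStatus::Done;
}

// Demo: push `payload` through a non-blocking socketpair, draining the
// receiving end whenever the sender would block. Returns the bytes received,
// or an empty string if setup or writing failed.
std::string sendAndReceive(const std::string& payload) {
    int fds[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) return {};
    for (int fd : fds) {
        ::fcntl(fd, F_SETFL, ::fcntl(fd, F_GETFL) | O_NONBLOCK);
    }

    std::string received;
    size_t offset = 0;
    char buf[4096];
    WriteStatus status = WriteStatus::WouldBlock;
    while (status == WriteStatus::WouldBlock) {
        status = writeSome(fds[0], payload.data(), payload.size(), offset);
        ssize_t n;
        while ((n = ::read(fds[1], buf, sizeof buf)) > 0) {
            received.append(buf, static_cast<size_t>(n));
        }
    }
    ::close(fds[0]);
    ::close(fds[1]);
    return status == WriteStatus::Done ? received : std::string();
}
```

If the offset were not advanced after a short write, or were advanced by the wrong amount after EAGAIN, the receiver would see repeated or missing chunks, which is what the corruption in my terminal looks like.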

I think I must have some issues with object lifetimes, but there is no hint from valgrind.

I used angular.min.js, but any not-too-small file reproduced the problem for me.

I'm on 64-bit Linux 4.1.6 (Arch) with libkj from git.

#include <fstream>
#include <string>
#include <kj/async-io.h>

class Server : public kj::TaskSet::ErrorHandler {
public:
    Server(const char* filename) :
        tasks(*this),
        ctx(kj::setupAsyncIo())
    {
        std::ifstream fstr(filename);
        fstr.seekg(0, std::ios::end);
        size_t sz = fstr.tellg();
        if(fstr.rdstate() == 0) {
            large.resize(sz);
            fstr.seekg(0);
            fstr.read(&large[0], sz);
        }
    }
    // NOTE: errors from failed tasks (including failed writes) are silently discarded here.
    void taskFailed(kj::Exception&& exception) override {}
    void run() {
        tasks.add(ctx.provider->getNetwork().parseAddress("*:9004", 0)
               .then([&](kj::Own<kj::NetworkAddress>&& addr) {
            auto listener = addr->listen();
            accept(kj::mv(listener));
        }));
        kj::NEVER_DONE.wait(ctx.waitScope);
    }
private:
    void accept(kj::Own<kj::ConnectionReceiver>&& listener) {
        auto ptr = listener.get();
        tasks.add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
           [&](kj::Own<kj::ConnectionReceiver>&& listener,
                kj::Own<kj::AsyncIoStream>&& cn) {
            accept(kj::mv(listener));
            tasks.add(cn->write(large.c_str(), large.size()).attach(kj::mv(cn)));
        })));
    }

private:
    kj::TaskSet tasks;
    kj::AsyncIoContext ctx;
    std::string large;
};


int main(int argc, char** argv) {
    Server s(argv[1]);
    s.run();
}

Compiled and run:

g++ -std=c++11 test.cpp -lkj-async -lkj -o test
./test path/to/angular.min.js

Request:

nc localhost 9004