NOTE: I am using the words "client" and "child" interchangeably in this post to refer to the process launched from the "server".

I am using boost::process::async_pipe to write to the STDIN of a process that I launch using boost::process::child. Assume my server program looks something like this:

(This is not a working server-demo)

server.cpp

int main()
{
    using namespace std::chrono_literals;

    boost::process::async_pipe writePipe;
    boost::process::child child { "client", boost::process::std_in < writePipe };

    std::vector<char> buffer;
    buffer.resize(1024u * 1024u);

    while (working)
    {
        auto length = 0u;

        /* 
            do a bunch of work that takes a long time
            and also determines `length`, in this case I'm 
            adding a sleep to simulate the time between 
            calls to `async_write()`
        */

        std::this_thread::sleep_for(5s);

        boost::asio::async_write(writePipe,
            boost::asio::buffer(buffer.data(), length),
            [&writePipe](boost::system::error_code, size_t)
            {
                // writePipe.close();
            });


        /* 
            I know that this code as-is would have issues with
            synchronizing `buffer`, but for the purpose of this
            question please ignore that
        */
    }
}

Basically I have a buffer of memory in which I'm doing some work and every so often I want to send some binary data to the child process. My child process looks something like this:

child.cpp

#include <iostream>
#include <string_view>

void print_hex(const char* p, std::size_t size)
{
    std::string_view input(p, size);

    static const char* const lut = "0123456789ABCDEF";
    size_t len = input.length();

    std::string output;
    output.reserve(2 * len);
    for (size_t i = 0; i < len; ++i)
    {
        const unsigned char c = static_cast<const unsigned char>(input[i]);
        // output.append("0x");
        output.push_back(lut[c >> 4]);
        output.push_back(lut[c & 15]);
        output.append(" ");
    }

    if (output.size() > 0) output.pop_back();
    std::cout << "HEX (" << size<< "): " << output << std::endl;
}

int main()
{
    std::vector<char> buffer;
    buffer.resize(BUFFER_SIZE);

    bool done = false;
    while (!done)
    {
        auto rdbuf = std::cin.rdbuf();
        while (auto count = rdbuf->sgetn(buffer.data(), BUFFER_SIZE))
        {
            print_hex(buffer.data(), count);
        }
    }
}

With the writePipe.close() commented out, I noticed that my child program never got any data until the server process was terminated. If I instead uncommented the call to close the pipe, then I was able to process the data only from the first time boost::asio::async_write() was called.

EDIT:

Unfortunately the original answer from @sehe did not resolve the issue. I updated the server code a little bit to better illustrate the issue (and I resolved the reserve/resize issue).

However while looking around again, I read some language about sgetn() that said:

The default definition of xsgetn in streambuf retrieves characters from the controlled input sequence and stores them in the array pointed by s, until either n characters have been extracted or the end of the sequence is reached.

So, I refactored my client to first ask the stream how many bytes were available, and then read the stream in chunks. Here was my first attempt:

    bool done = false;
    while (!done)
    {
        auto rdbuf = std::cin.rdbuf();
        const auto available = rdbuf->in_avail();
        if (available == 0)
        {
            continue;
        }

        auto bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available));
        auto bytesRead = rdbuf->sgetn(buffer.data(), bytesToRead);
        print_hex(buffer.data(), bytesRead);

        while (bytesRead < available)
        {
            bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available - bytesRead));
            bytesRead += rdbuf->sgetn(buffer.data(), bytesToRead);
            print_hex(buffer.data(), bytesRead);
        }
    }

However, even after adding std::cin.sync_with_stdio(false); (from the answer to "Why does in_avail() output zero even if the stream has some char?"), the call to rdbuf->in_avail() always returns 0. This happens even when I run the client outside of my server, directly on the command line: ls | client

I'd like my client program to read the data as it's coming in without having to (1) close the server process or (2) close the pipe (unless I can reopen the pipe to do a subsequent write()).

Thank you!

There is 1 solution below

Oof. I spent an enormous amount of time tweaking the server until it worked.

It turned out there was... a bug in the client.

Basically, where you write .reserve(...) you ought to have put .resize():

buffer.resize(BUFFER_SIZE);

A similar bug possibly exists in the server as well. Even though you don't show the full code, the reserve there seems odd.
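To make the reserve/resize difference concrete, here is a minimal sketch. The names BufferState, make_reserved and make_resized are hypothetical helpers invented for this illustration; they are not part of the original client or server.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper type: what a read loop would see for a given buffer.
struct BufferState {
    std::size_t size;      // number of elements that may legally be accessed
    std::size_t capacity;  // amount of storage allocated
};

// reserve() only allocates storage; the vector still holds zero elements,
// so writing through data() would be undefined behavior.
BufferState make_reserved(std::size_t n) {
    assert(n > 0);
    std::vector<char> v;
    v.reserve(n);
    return {v.size(), v.capacity()};
}

// resize() creates n value-initialized elements, so data() points at
// n bytes that a read call may safely fill.
BufferState make_resized(std::size_t n) {
    assert(n > 0);
    std::vector<char> v;
    v.resize(n);
    return {v.size(), v.capacity()};
}
```

With n = 1024, make_reserved reports a size of 0 (and a capacity of at least 1024), while make_resized reports a size of 1024. Only the second buffer is safe to hand to something like sgetn().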

Now, I'm not sure just how many changes were actually required in the server part, but let me just post the version I ended up testing successfully.

Completely Async

Live On Coliru

#include <boost/asio/io_service.hpp>
#include <boost/process.hpp>
#include <algorithm>
#include <chrono>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <vector>

static std::mt19937 prng{ std::random_device{}() };

static std::uniform_int_distribution<size_t> lendist(10, 32);
// char is not a permitted result type for uniform_int_distribution, so use int and cast
static std::uniform_int_distribution<int> a_z('a', 'z');

static size_t gen_length() { return lendist(prng); }
static char   gen_alpha()  { return static_cast<char>(a_z(prng)); }

namespace bp = boost::process;
namespace ba = boost::asio;
using namespace std::chrono_literals;

int main() {
    ba::io_service io; // one thread
    //ba::io_service::strand strand(io);
    auto& strand = io;
    bp::async_pipe writePipe(io);
    //bp::child child(bp::exe("/home/sehe/Projects/stackoverflow/child.exe"),
    bp::child child(bp::exe("./child.exe"),
            io,
            bp::std_in < writePipe,
            bp::std_out > "child.log");

    auto shutdown_sequence = [&] {
        std::this_thread::sleep_for(1s);
        std::clog << "Closing" << std::endl;
        post(strand, [&] { writePipe.close(); });
    };

    std::function<void()> work_loop;

    work_loop = [&, buffer = std::vector<char>(1 << 20)]() mutable {
        size_t length = gen_length();
        std::generate_n(buffer.data(), length, gen_alpha);

        async_write(writePipe, bp::buffer(buffer.data(), length), 
            [&strand, &shutdown_sequence, &work_loop](boost::system::error_code ec, size_t tx) {
                std::clog << "Wrote " << tx << " bytes (" << ec.message() << ")" << std::endl;
                if (ec || (tx == 29)) { // magic length indicates "work done"
                    post(strand, shutdown_sequence);
                } else {
                    post(strand, work_loop);
                }
            });
    };

    // start IO pump
    post(strand, work_loop);
    io.run();

    std::clog << "Bye" << std::endl;
}

When run, it prints something similar to:

./main.exe
Wrote 13 bytes (Success)
Wrote 11 bytes (Success)
Wrote 26 bytes (Success)
Wrote 32 bytes (Success)
Wrote 17 bytes (Success)
Wrote 24 bytes (Success)
Wrote 28 bytes (Success)
Wrote 29 bytes (Success)
Closing
Bye

While also writing a child.log:

HEX (32): 71 74 79 6E 77 74 66 63 74 72 66 6D 6D 69 6E 68 73 61 6F 75 68 77 69 77 6B 65 77 6F 76 6D 76 6E
HEX (32): 68 67 72 79 77 7A 74 68 6A 77 65 63 78 64 66 76 6A 61 64 7A 72 6C 74 6E 63 74 6B 71 64 73 7A 70
HEX (32): 70 73 77 79 75 61 70 7A 6D 73 6F 77 68 71 6A 6B 62 6F 77 63 70 63 6D 74 79 70 70 67 6B 64 75 63
HEX (32): 78 6A 79 65 78 68 74 69 75 7A 67 73 67 63 6D 69 73 65 64 63 67 6C 72 75 72 66 76 79 74 75 61 6F
HEX (32): 76 69 75 6D 73 76 6E 79 72 6C 77 6D 69 6F 74 71 6D 76 77 6F 6E 70 73 77 6C 6B 75 68 76 74 71 74
HEX (20): 79 71 77 77 61 75 71 6A 73 68 67 71 72 7A 77 6C 66 67 74 67
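Note that the eight writes above total 180 bytes, while the log shows five 32-byte chunks followed by one 20-byte chunk. That is consistent with sgetn() filling the whole buffer before returning, and with a BUFFER_SIZE of 32 in the child (an assumption; the value is not shown in the post). The sketch below reproduces the chunking with an in-memory std::stringbuf standing in for std::cin. The helper chunk_sizes is a hypothetical name introduced here for illustration.

```cpp
#include <cassert>
#include <cstddef>
#include <ios>
#include <sstream>
#include <streambuf>
#include <string>
#include <vector>

// Drains `sb` with the same sgetn() loop as the child and records how many
// bytes each call returned. sgetn() keeps reading until the buffer is full
// or the end of the sequence is reached.
std::vector<std::streamsize> chunk_sizes(std::streambuf& sb, std::size_t chunk_size) {
    assert(chunk_size > 0);
    std::vector<char> buffer(chunk_size);
    std::vector<std::streamsize> sizes;
    while (auto count = sb.sgetn(buffer.data(),
                                 static_cast<std::streamsize>(buffer.size()))) {
        sizes.push_back(count);
    }
    return sizes;
}
```

Calling chunk_sizes on a std::stringbuf holding 180 bytes with a chunk size of 32 yields five chunks of 32 followed by one of 20, matching the shape of child.log. On a real pipe the same call would typically block until the buffer is full or the writer closes its end, which is why the child only reports data in full-buffer chunks or at end of input.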