I'm trying to write a single-producer, multiple-consumer pipeline in which the consumers run in parallel threads, or to find (or share) a simple example of one. In Go, relatively simple code produces output that clearly shows the consumers working in parallel. I expected something similar with Boost 1.73 fibers, but I can't get beyond the following code, which (unsurprisingly) runs sequentially:
```cpp
#include <boost/fiber/buffered_channel.hpp>
#include <boost/fiber/fiber.hpp>
#include <cmath>
#include <cstdint>
#include <iostream>

static void process(int item) {
    std::cout << "consumer processing " << item << std::endl;
    auto wasteOfTime = 0.;
    for (auto s = 0.; s < item; s += 1e-7) {
        wasteOfTime += std::sin(s);
    }
    if (wasteOfTime != 42) {  // use the result so the loop is not optimized away
        std::cout << "consumer processed " << item << std::endl;
    }
}

static const std::uint32_t workers = 3;

int main() {
    boost::fibers::buffered_channel<int> channel { 2 };
    boost::fibers::fiber consumer[workers];
    for (std::uint32_t i = 0; i < workers; ++i) {
        consumer[i] = boost::fibers::fiber([&channel]() {
            for (auto item : channel) {
                process(item);
            }
        });
    }
    auto producer = boost::fibers::fiber([&channel]() {
        std::cout << "producer starting" << std::endl;
        channel.push(1);
        channel.push(2);
        channel.push(3);
        channel.close();
        std::cout << "producer ending" << std::endl;
    });
    producer.join();
    for (std::uint32_t i = 0; i < workers; ++i) {
        consumer[i].join();
    }
    return 0;
}
```
I tried inserting many variations of code fragments to make worker threads schedule the fibers, but the fibers always run either sequentially or not at all. The code from a question about the inverse problem looks like a step in the right direction, though it is much more complicated than the Go version; but (when compiled with -DM_1_PI=3.14) that program also just sits idle for me.
It turns out I had misinterpreted Boost's code fragment for scheduling fibers. It does seem to work for me like this:
Beware that this might not be correct. It also does not reliably schedule fibers onto all threads (at least on Windows).
It is in fact pretty much the same as the code in the question about the inverse problem, but that code needed an update. Also, I find that using a condition variable instead of barriers isn't reliable, because the main thread may run ahead of the other threads.
I didn't model the initialization and cleanup as a class, to avoid suggesting that you could instantiate it multiple times (in sequence or in parallel).
The work_stealing constructor uses global variables. You cannot end the fiber-based task cleanly and then start another fiber-based task, not even if you run everything in temporary threads and do not taint the main thread in any way.