I'm implementing a parallel version of the Bellman-Ford algorithm in C++ using std::atomic. This is my main function, which is executed in multiple threads:
/// Relaxes every outgoing edge of the nodes in the half-open range [start, end).
///
/// For each edge (node -> destination) the entry distances[destination] is
/// lowered to distances[node] + edge distance whenever that sum is strictly
/// smaller than the value currently stored there.
///
/// @param start       first node index to process (inclusive)
/// @param end         one past the last node index to process
/// @param graph       indexed by node; graph[node] is iterated for connections
///                    exposing `destination` and `distance`
/// @param distances   shared table of tentative distances, updated atomically
/// @param haveChange  set to true when at least one entry was lowered; it is
///                    never reset here. It is a plain bool written with an
///                    ordinary store, so the caller must make sure no other
///                    thread touches the same bool while this function runs.
void calculateDistances(size_t start, size_t end, const Graph& graph, std::vector<std::atomic<double>>& distances, bool& haveChange)
{
    for (size_t node = start; node < end; ++node) {
        for (const auto& edge : graph[node]) {
            std::atomic<double>& target = distances[edge.destination];
            // Snapshot of the destination's value; a failed CAS refreshes it.
            double expected = target.load();
            for (;;) {
                // Stop as soon as going through `node` is no longer an improvement
                // over the most recently observed destination value.
                if (!(distances[node].load() + edge.distance < expected))
                    break;
                // The source entry is re-read on purpose: another thread may have
                // lowered it in the meantime, and the fresher value is stored.
                if (target.compare_exchange_strong(expected, distances[node].load() + edge.distance)) {
                    haveChange = true;
                    break;
                }
                // CAS failed: `expected` now holds the value another thread wrote,
                // so the improvement test above is evaluated again.
            }
        }
    }
}
Here I'm trying to update distances[destination] with distances[source] + distance if the latter is smaller.
However, both distances[destination] and distances[source] can be changed by another thread during this operation, so I'm using compare_exchange_strong here.
But even with this code, a data race is still present and some of the iterations are skipped, which makes the algorithm fail on some input data.
Why is this happening, and how can I fix it?
The main function, for context:
std::vector<std::atomic<double>> BellmanFordAsync(const Graph& graph, size_t tasks, size_t start)
{
size_t nodesCount = graph.getNodesCount();
std::vector<std::atomic<double>> distances(nodesCount);
for (auto& atomic : distances)
atomic = std::numeric_limits<double>::infinity();
distances[start] = 0;
size_t chunkSize = (nodesCount + tasks - 1) / tasks;
std::vector<std::future<void>> return_results;
bool haveChange = true;
for (size_t i = 1; i < nodesCount; ++i)
{
haveChange = false;
for (size_t i = 0; i < tasks; ++i) {
size_t start = i * chunkSize;
size_t end = std::min((i + 1) * chunkSize, nodesCount);
std::future<void> future = std::async(calculateDistances,
start, end, std::cref(graph), std::ref(distances), std::ref(haveChange));
return_results.push_back(std::move(future));
}
for (auto& result : return_results)
result.wait();
return_results.clear();
if (!haveChange) break;
}
bool haveNegativeCycles = false;
for (size_t i = 0; i < tasks; ++i) {
size_t start = i * chunkSize;
size_t end = std::min((i + 1) * chunkSize, nodesCount);
std::future<void> future = std::async(checkNegativeCycles<std::atomic<double>>, start, end, std::cref(graph), std::cref(distances), std::ref(haveNegativeCycles));
return_results.push_back(std::move(future));
}
for (auto& result : return_results) {
result.wait();
if (haveNegativeCycles) return std::vector<double>();
}
return distances;
}