I've been attempting to solve the longest common subsequence problem using multiprocessing and multi-threading, and I have implemented a multiprocess version of the code, using the usual dynamic programming approach: generate a score matrix, each element depends on the one to its left, north-west and directly above.
In my multiprocess approach, I propagate a wavefront along the anti-diagonals of the score matrix. To simplify indexing and improve memory access, I apply a shear transform to the score matrix so that each anti-diagonal becomes a horizontal row:
My code follows (admittedly rather long, because it includes some set-up):
#include <fcntl.h>
#include <pthread.h>
#include <signal.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <x86intrin.h>

#include <algorithm>
#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <new>
#include <string>
#include <vector>
// Number of logical cores configured on the system, as reported by
// sysconf(_SC_NPROCESSORS_CONF).
// NOTE(review): intended to influence how many processes are run, but nothing
// in the code visible here reads it; NUM_WORKERS below depends only on the
// input sizes and MAX_WORK_SIZE.
#define LOGICAL_CORES (int) sysconf(_SC_NPROCESSORS_CONF)
// Maximum number of anti-diagonal elements assigned to one worker process.
// Must be a power of two: ANTIDIAG_REAL_SIZE rounds up with `& -MAX_WORK_SIZE`.
#define MAX_WORK_SIZE 256
// Number of anti-diagonals of the (X + 1) x (Y + 1) score matrix.
#define NUM_ANTIDIAGS (X + Y + 1)
// Logical length of one anti-diagonal buffer: the longer of the two inputs.
// NOTE(review): a buffer indexed by column 0..Y (or row 0..X) needs
// max(X, Y) + 1 slots; confirm that users of this size do not rely on the
// padding added by ANTIDIAG_REAL_SIZE to reach the last index.
#define ANTIDIAG_SIZE std::max(X, Y)
// Length of one anti-diagonal buffer in memory: ANTIDIAG_SIZE rounded up to a
// multiple of MAX_WORK_SIZE, so it divides evenly between workers.
#define ANTIDIAG_REAL_SIZE ((ANTIDIAG_SIZE + MAX_WORK_SIZE - 1) & -MAX_WORK_SIZE)
// Total number of worker processes: one per MAX_WORK_SIZE-element chunk of an
// in-memory anti-diagonal.
#define NUM_WORKERS (ANTIDIAG_REAL_SIZE / MAX_WORK_SIZE)
// Declared length of the first input sequence (read from file 1).
u_int32_t X;
// Declared length of the second input sequence (read from file 2).
u_int32_t Y;

// Rolling window of three anti-diagonal buffers inside the `data` mapping.
// These pointers are ordinary globals: after fork() every process owns its
// own copy of them, even though the memory they point at is shared.
u_int32_t *back;   // anti-diagonal d - 2, read from
u_int32_t *middle; // anti-diagonal d - 1, read from
u_int32_t *front;  // anti-diagonal d, written to

// Synchronisation state; an instance is placed in shared memory by shm_setup().
struct sync_container {
    // Barrier that all participating processes wait on between steps.
    pthread_barrier_t barrier;
    // Attributes used to initialise `barrier`.
    pthread_barrierattr_t barrierattr;
    // Process IDs of the worker processes.
    // NOTE(review): a std::vector keeps its elements on the private heap of
    // the process that allocated them, so this list is only meaningful in the
    // process that filled it, even when the struct itself is in shared memory.
    std::vector<pid_t> pids;
};

// Base address of the shared mapping that holds the anti-diagonal buffers.
u_int32_t *data;
// The shared sync_container instance.
sync_container *sync_data;

// First input sequence.
std::string seq_1;
// Second input sequence.
std::string seq_2;
/**
 * Reads one "<length> <sequence>" record from an already-opened stream.
 *
 * Terminates the process with EXIT_FAILURE if the length cannot be parsed or
 * if the sequence is shorter than the declared length (later code indexes the
 * sequence up to `length - 1`, so a short sequence would be read out of range).
 * A declared length of 0 with no sequence text is accepted.
 */
static void read_sequence(std::ifstream &in, u_int32_t &length, std::string &sequence)
{
    if (!(in >> length))
    {
        std::cerr << "Error reading sequence length; exiting." << std::endl;
        exit(EXIT_FAILURE);
    }
    // A failed string extraction leaves `sequence` empty, which is only
    // acceptable when the declared length is 0; the size check covers both.
    if (!(in >> sequence)) sequence.clear();
    in.close();
    if (sequence.size() < length)
    {
        std::cerr << "Declared length exceeds the sequence length; exiting." << std::endl;
        exit(EXIT_FAILURE);
    }
}

/**
 * Loads both input files into the globals X / seq_1 and Y / seq_2.
 *
 * @param f1 stream for the first file: a length followed by the sequence.
 * @param f2 stream for the second file, same format.
 *
 * Exits with EXIT_FAILURE if either stream failed to open, is malformed, or
 * declares a length longer than the sequence it contains.
 */
void read_files(std::ifstream f1, std::ifstream f2)
{
    if (f1.fail() || f2.fail())
    {
        // Errors go to stderr so they never mix with the numeric result on stdout.
        std::cerr << "Error reading files; exiting." << std::endl;
        exit(EXIT_FAILURE);
    }
    read_sequence(f1, X, seq_1);
    read_sequence(f2, Y, seq_2);
}
void shm_setup()
{
data = reinterpret_cast<u_int32_t *>(mmap(nullptr, 4 * ANTIDIAG_REAL_SIZE * sizeof(uint32_t),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, 0, 0));
back = reinterpret_cast<u_int32_t *>(data);
middle = back + ANTIDIAG_REAL_SIZE;
front = middle + ANTIDIAG_REAL_SIZE;
memset(back, 0, ANTIDIAG_REAL_SIZE * sizeof(u_int32_t));
memset(middle, 0, ANTIDIAG_REAL_SIZE * sizeof(u_int32_t));
memset(front, 0, ANTIDIAG_REAL_SIZE * sizeof(u_int32_t));
sync_data = static_cast<sync_container *>(mmap(nullptr, sizeof(sync_container),
PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, 0, 0));
}
/**
 * Releases both shared mappings and terminates the process with status 0.
 * Does not return.
 */
void cleanup()
{
    // The length must match what shm_setup() mapped: four rows, not three.
    munmap(data, 4 * ANTIDIAG_REAL_SIZE * sizeof(u_int32_t));
    munmap(sync_data, sizeof(sync_container));
    exit(0);
}
int main(int argc, char **argv)
{
if (argc != 3)
{
std::cout << "Usage: [executable] [file 1] [file 2]" << std::endl;
return 1;
}
read_files(std::ifstream(argv[1], std::ifstream::in),
std::ifstream(argv[2], std::ifstream::in));
// Initialise shared memory and arrays
shm_setup();
// Initialise barrier
pthread_barrierattr_init(&sync_data->barrierattr);
pthread_barrierattr_setpshared(&sync_data->barrierattr, PTHREAD_PROCESS_SHARED);
pthread_barrier_init(&sync_data->barrier, &sync_data->barrierattr, NUM_WORKERS + 1);
int pid = 0;
int worker_id = 0;
for (; worker_id < NUM_WORKERS; ++worker_id)
{
pid = fork();
if (pid) sync_data->pids[worker_id] = pid;
else
break;
}
pthread_barrier_wait(&sync_data->barrier);
for (int antidiag_idx = 2; antidiag_idx < NUM_ANTIDIAGS; ++antidiag_idx)
{
pthread_barrier_wait(&sync_data->barrier);
if (!pid) // worker processes go here
{
for (int element = MAX_WORK_SIZE * worker_id; element < (antidiag_idx * worker_id) + MAX_WORK_SIZE; ++element)
{
if (!element || element >= ANTIDIAG_SIZE) continue;
char vert = seq_1[antidiag_idx - 1 - element];
char horz = seq_2[element - 1];
front[element] = horz == vert ? back[element - 1] + 1
: std::max(middle[element - 1], middle[element]);
}
}
if (pid) // parent process moves pointers
{
back = middle;
middle = front;
front = back;
}
pthread_barrier_wait(&sync_data->barrier);
}
if (!pid) exit(0);
std::cout << middle[ANTIDIAG_SIZE] << std::endl;
cleanup();
}
Now, this code does not work. This is strange, because with a small input size (specifically, < 256), this code only spawns one worker process and one parent process to manage it, and it still fails.
However, when the `fork()`, the various `pthread_barrier_wait()` calls, and the `if (pid)` control-flow paths are removed from the `for` loop, the code executes perfectly and returns the expected length of the LCS of the two strings specified in the input files. In other words, it degenerates into effectively a single-threaded, single-process version of the dynamic programming solution, but with the shear transform.
There is clearly an issue with my synchronisation, and I can't figure out where it is. I've tried several permutations of adding more `pthread_barrier_wait()` calls, but this hasn't led anywhere.
Where is the synchronisation issue, and how can I fix it?