I want to implement a message queue shared between processes. I use mmap on a file for persistence, and process-shared pthread_mutex_t and pthread_cond_t objects for inter-process synchronization. However, I found that if one of the processes crashes while it is blocked in pthread_cond_wait, the other processes block forever when they later call pthread_cond_broadcast.
I know that pthread_mutex_t has a robust attribute, and that a mutex whose owner died can be made consistent again with pthread_mutex_consistent. How can I ensure the consistency of a pthread_cond_t?
#include <stdio.h>
#include <errno.h>
#include <assert.h>
#include <pthread.h>
#include "message_block.h"
/* Number of bytes of the backing file that MB_Init maps (1 MiB). */
#define MAX_MEM_SIZE (1024 * 1024)
/*
 * Header of one chunk inside the shared message area; the payload follows
 * immediately in ptr[].
 *   npos - offset (from Msg_Blck.base) of the next node on whichever list
 *          this node is currently on; 0 terminates the list.
 *   clen - size of the whole chunk in bytes, header included.
 * Nodes are only ever handled through pointers into the mapping, so a
 * flexible array member is safe here.
 */
typedef struct
{
    int npos;
    int clen;
    char ptr[];  /* C99 flexible array member (was the GNU-only ptr[0]) */
} Msg_Node;
/*
 * Shared control header placed at offset 0 of the mmap'd file (MB_Init casts
 * the mapping address to Msg_Entry*), so every process mapping the file sees
 * the same instance. Because the file persists, this field order is part of
 * the on-disk format: reordering fields breaks files created by older builds.
 *
 * idle[] / busy[] hold byte offsets from Msg_Blck.base; an offset of 0 means
 * "empty list" (MB_Init places the first node at base + 1).
 * NOTE(review): these offsets are unsigned int while Msg_Node.npos is int and
 * the code mixes them in comparisons - keep values below INT_MAX.
 */
typedef struct
{
unsigned char use;          /* 0 = not initialised; MB_Init claims it with a compare-and-swap and sets it non-zero */
unsigned int idle[2];       /* [0] head, [1] tail of the free-chunk list; guarded by idle_mtx */
unsigned int busy[2];       /* [0] head, [1] tail of the FIFO of filled messages; guarded by busy_mtx */
pthread_mutex_t idle_mtx;   /* process-shared, robust (initialised in MB_Init) */
pthread_mutex_t busy_mtx;   /* process-shared, robust (initialised in MB_Init) */
pthread_cond_t idle_cond;   /* broadcast by MB_Put_Empty, waited on by MB_Get_Empty */
pthread_cond_t busy_cond;   /* broadcast by MB_Put_Full, waited on by MB_Get_Full */
} Msg_Entry;
/*
 * Per-process handle returned (as void*) by MB_Init, which allocates it with
 * calloc. Unlike Msg_Entry it is not shared: each process holds its own copy
 * whose pointers refer to that process's mapping of the file.
 */
typedef struct {
    int fd;             /* descriptor of the backing file */
    char *addr, *base;  /* start of the mapping / start of the message area after the header */
    Msg_Entry *entry;   /* shared header; same address as addr */
} Msg_Blck;
/*
 * Open (creating if necessary) the backing file `filename`, map its first
 * MAX_MEM_SIZE bytes MAP_SHARED, and return a calloc'd per-process handle
 * (a Msg_Blck, passed around as void*). Returns NULL on any failure, in
 * which case the descriptor, mapping and handle are all released.
 *
 * Mapping layout: a Msg_Entry header at offset 0, then the message area
 * (`base`). The first process to see entry->use == 0 builds the initial
 * state: one idle node spanning the whole message area, an empty busy queue,
 * process-shared robust mutexes and process-shared condition variables.
 * Later callers reuse whatever state the file already holds.
 *
 * entry->use: 0 = uninitialised, 2 = initialisation in progress, any other
 * value = ready. 1 stays the "ready" value so files created by earlier
 * builds (which only ever stored 0 or 1) keep working.
 */
void* MB_Init(char* filename)
{
    Msg_Blck *msgB = NULL;
    Msg_Node *msgN = NULL;
    pthread_mutexattr_t mtxattr;
    pthread_condattr_t condattr;

    msgB = (Msg_Blck*)calloc(1, sizeof(Msg_Blck));
    if (NULL == msgB)
    {
        return NULL;
    }
    msgB->fd = open(filename, O_CREAT|O_RDWR, 0777);
    if (-1 == msgB->fd)
    {
        perror("open failed:");
        goto ERR;
    }
    /* Extend the file past MAX_MEM_SIZE so every mapped page is backed by the
     * file; touching an unbacked page of a MAP_SHARED mapping raises SIGBUS. */
    if (-1 == lseek(msgB->fd, (off_t)MAX_MEM_SIZE, SEEK_SET))
    {
        perror("lseek failed:");
        goto ERR;
    }
    if (1 != write(msgB->fd, "", 1))
    {
        perror("write failed:");
        goto ERR;
    }
    msgB->addr = mmap(NULL, MAX_MEM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, msgB->fd, 0);
    if (MAP_FAILED == msgB->addr)
    {
        perror("mmap failed:");
        goto ERR;
    }
    msgB->base = msgB->addr + sizeof(Msg_Entry);
    msgB->entry = (Msg_Entry*)(msgB->addr);
    /* Claim initialisation with the transient value 2, not 1: publishing
     * "ready" before the mutexes/condvars exist let concurrent processes use
     * uninitialised synchronisation objects. */
    if (__sync_bool_compare_and_swap(&(msgB->entry->use), 0, 2))
    {
        /* The first node starts at base + 1, presumably so that offset 0 can
         * act as the end-of-list / empty-list marker.
         * NOTE(review): that odd offset (and the unrounded chunk lengths in
         * MB_Get_Empty) leaves Msg_Node fields misaligned - harmless on x86,
         * a fault risk on strict-alignment CPUs. */
        msgN = (Msg_Node*)(msgB->base + 1);
        msgN->npos = 0;
        msgN->clen = MAX_MEM_SIZE - sizeof(Msg_Entry) - 1;
        msgB->entry->idle[0] = 1;
        msgB->entry->idle[1] = 1;
        msgB->entry->busy[0] = 0;
        msgB->entry->busy[1] = 0;

        pthread_mutexattr_init(&mtxattr);
        pthread_mutexattr_setpshared(&mtxattr, PTHREAD_PROCESS_SHARED);
        pthread_mutexattr_setrobust(&mtxattr, PTHREAD_MUTEX_ROBUST);
        pthread_mutex_init(&(msgB->entry->idle_mtx), &mtxattr);
        pthread_mutex_init(&(msgB->entry->busy_mtx), &mtxattr);
        pthread_mutexattr_destroy(&mtxattr);

        /* NOTE(review): POSIX offers no "robust" attribute for condition
         * variables; a waiter that dies inside pthread_cond_wait is not
         * cleaned up by the implementation. */
        pthread_condattr_init(&condattr);
        pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
        pthread_cond_init(&(msgB->entry->idle_cond), &condattr);
        pthread_cond_init(&(msgB->entry->busy_cond), &condattr);
        pthread_condattr_destroy(&condattr);

        /* Release-store: everything initialised above becomes visible before
         * another process can observe use == 1. */
        __atomic_store_n(&(msgB->entry->use), 1, __ATOMIC_RELEASE);
    }
    else
    {
        /* Another process is initialising the header; wait until it is done.
         * NOTE(review): if that process dies mid-initialisation, use stays 2
         * and this loop never ends - TODO add a recovery/timeout policy. */
        while (2 == __atomic_load_n(&(msgB->entry->use), __ATOMIC_ACQUIRE))
        {
            /* spin: initialisation is only a few stores and init calls */
        }
    }
    return msgB;

ERR:
    if (0 <= msgB->fd)
    {
        close(msgB->fd);
    }
    /* addr may be NULL (never mapped) or MAP_FAILED (mmap failed). */
    if (NULL != msgB->addr && MAP_FAILED != (void*)msgB->addr)
    {
        munmap(msgB->addr, MAX_MEM_SIZE);
    }
    free(msgB);
    return NULL;
}
char* MB_Get_Empty(void *blk, int size)
{
assert(blk && (0 < size));
Msg_Blck *msgB = (Msg_Blck*)blk;
Msg_Node *msgC = NULL;
Msg_Node *msgH = NULL;
Msg_Node *msgT = NULL;
int length = sizeof(Msg_Node) + size;
if (EOWNERDEAD == pthread_mutex_lock(&(msgB->entry->idle_mtx)))
{
pthread_mutex_consistent(&(msgB->entry->idle_mtx));
}
while (msgH = (Msg_Node*)(msgB->base + msgB->entry->idle[0]), msgH->clen < length + sizeof(Msg_Node))
{
if (0 < msgH->npos)
{
msgC = (Msg_Node*)(msgB->base + msgB->entry->idle[0]);
msgT = (Msg_Node*)(msgB->base + msgB->entry->idle[1]);
msgH = (Msg_Node*)(msgB->base + msgC->npos);
msgT->npos = (int)((char*)msgC - msgB->base);
msgT = msgC;
msgT->npos = 0;
msgB->entry->idle[0] = (int)((char*)msgH - msgB->base);
msgC = msgH;
while (0 < msgC->npos)
{
Msg_Node *msgN = (Msg_Node*)(msgB->base + msgC->npos);
if (msgB->entry->idle[0] + msgH->clen == msgC->npos)
{
msgC->npos = msgN->npos;
msgH->clen += msgN->clen;
msgC = msgH;
continue;
}
else if (msgC->npos + msgN->clen == msgB->entry->idle[0])
{
msgB->entry->idle[0] = (int)((char*)msgN - msgB->base);
msgC->npos = msgN->npos;
msgN->npos = msgH->npos;
msgN->clen += msgH->clen;
msgH = msgN;
msgC = msgH;
continue;
}
msgC = (Msg_Node*)(msgB->base + msgC->npos);
}
msgB->entry->idle[1] = (int)((char*)msgC - msgB->base);
continue;
}
pthread_cond_wait(&(msgB->entry->idle_cond), &(msgB->entry->idle_mtx));
}
msgC = msgH;
msgH = (Msg_Node*)((char*)msgC + length);
msgH->npos = msgC->npos;
msgH->clen = msgC->clen - length;
msgB->entry->idle[0] += length;
if (0 == msgH->npos)
{
msgB->entry->idle[1] += length;
}
msgC->npos = 0;
msgC->clen = length;
pthread_mutex_unlock(&(msgB->entry->idle_mtx));
return msgC->ptr;
}
/*
 * Return a chunk to the tail of the shared free list and wake every process
 * waiting in MB_Get_Empty.
 *
 * ptr must be a payload pointer handed out by this module (its Msg_Node
 * header sits immediately before it). If idle_mtx cannot be acquired, an
 * error is reported on stderr and the chunk is not returned: leaking it is
 * safer than editing the list without the lock.
 */
void MB_Put_Empty(void *blk, char *ptr)
{
    assert(blk && ptr);
    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgT = NULL;
    Msg_Node *msgN = (Msg_Node*)(ptr - sizeof(Msg_Node));
    int rc = pthread_mutex_lock(&(msgB->entry->idle_mtx));

    if (EOWNERDEAD == rc)
    {
        pthread_mutex_consistent(&(msgB->entry->idle_mtx));
    }
    else if (0 != rc)
    {
        fprintf(stderr, "MB_Put_Empty: pthread_mutex_lock failed (%d)\n", rc);
        return;
    }
    /* The node becomes the new tail. Clear its link: after MB_Get_Full it
     * can still point at the next node of the busy queue, which would splice
     * busy chunks into the free list. */
    msgN->npos = 0;
    msgT = (Msg_Node*)(msgB->base + msgB->entry->idle[1]);
    msgB->entry->idle[1] = (int)((char*)msgN - msgB->base);
    if (msgB->base == (char*)msgT)
    {
        /* Tail offset 0: the list was empty, so this node is also the head. */
        msgB->entry->idle[0] = msgB->entry->idle[1];
    }
    else
    {
        msgT->npos = msgB->entry->idle[1];
    }
    pthread_mutex_unlock(&(msgB->entry->idle_mtx));
    pthread_cond_broadcast(&(msgB->entry->idle_cond));
}
/*
 * Dequeue the oldest filled chunk (head of the busy queue) and return its
 * payload pointer, blocking on busy_cond while the queue is empty.
 *
 * Returns NULL if busy_mtx cannot be acquired (e.g. ENOTRECOVERABLE) or a
 * wait fails.
 *
 * NOTE(review): POSIX has no robust condition variable; a process that dies
 * while blocked in pthread_cond_wait here may cause later
 * pthread_cond_broadcast calls on busy_cond to block (observed with glibc).
 * TODO: use a wake-up primitive that tolerates waiter death.
 */
char* MB_Get_Full(void *blk)
{
    assert(blk);
    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgH = NULL;
    int rc = pthread_mutex_lock(&(msgB->entry->busy_mtx));

    if (EOWNERDEAD == rc)
    {
        /* NOTE(review): the queue may be mid-update; taken as-is - TODO validate. */
        pthread_mutex_consistent(&(msgB->entry->busy_mtx));
    }
    else if (0 != rc)
    {
        return NULL;  /* lock not acquired */
    }
    /* Head offset 0 means the queue is empty. */
    while (msgH = (Msg_Node*)(msgB->base + msgB->entry->busy[0]), msgB->base == (char*)msgH)
    {
        rc = pthread_cond_wait(&(msgB->entry->busy_cond), &(msgB->entry->busy_mtx));
        if (EOWNERDEAD == rc)
        {
            /* With a robust mutex, re-acquisition can report a dead owner too. */
            pthread_mutex_consistent(&(msgB->entry->busy_mtx));
        }
        else if (ENOTRECOVERABLE == rc)
        {
            return NULL;  /* the mutex was not re-acquired */
        }
        else if (0 != rc)
        {
            pthread_mutex_unlock(&(msgB->entry->busy_mtx));
            return NULL;
        }
    }
    msgB->entry->busy[0] = msgH->npos;
    if (0 == msgH->npos)
    {
        msgB->entry->busy[1] = 0;
    }
    /* Detach fully so a later MB_Put_Empty/MB_Put_Full of this chunk cannot
     * drag the rest of the busy queue along with it. */
    msgH->npos = 0;
    pthread_mutex_unlock(&(msgB->entry->busy_mtx));
    return msgH->ptr;
}
/*
 * Append a filled chunk to the tail of the busy queue and wake every process
 * waiting in MB_Get_Full.
 *
 * ptr must be a payload pointer handed out by this module (its Msg_Node
 * header sits immediately before it). If busy_mtx cannot be acquired, an
 * error is reported on stderr and the chunk is not queued.
 */
void MB_Put_Full(void *blk, char *ptr)
{
    assert(blk && ptr);
    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgT = NULL;
    Msg_Node *msgN = (Msg_Node*)(ptr - sizeof(Msg_Node));
    int rc = pthread_mutex_lock(&(msgB->entry->busy_mtx));

    if (EOWNERDEAD == rc)
    {
        pthread_mutex_consistent(&(msgB->entry->busy_mtx));
    }
    else if (0 != rc)
    {
        fprintf(stderr, "MB_Put_Full: pthread_mutex_lock failed (%d)\n", rc);
        return;
    }
    /* The node becomes the new tail; make sure it terminates the queue even
     * if the caller re-queues a chunk that still carries an old link. */
    msgN->npos = 0;
    msgT = (Msg_Node*)(msgB->base + msgB->entry->busy[1]);
    msgB->entry->busy[1] = (int)((char*)msgN - msgB->base);
    if (msgB->base == (char*)msgT)
    {
        /* Tail offset 0: the queue was empty, so this node is also the head. */
        msgB->entry->busy[0] = msgB->entry->busy[1];
    }
    else
    {
        msgT->npos = msgB->entry->busy[1];
    }
    pthread_mutex_unlock(&(msgB->entry->busy_mtx));
    pthread_cond_broadcast(&(msgB->entry->busy_cond));
}