I'm writing a simple server with libevent. The main thread creates kWorkerThreadCounts child threads, each with a different event base, and the main thread keeps its own event base with a doAccept callback.
On accept, the main thread creates a new bufferevent on a child thread's event base and registers a read callback; the child thread's read callback simply echoes the message back. I guess this is the so-called "one loop per thread" model?
But I have run into a strange problem: the more child threads there are, the slower the client receives the child's messages. With kWorkerThreadCounts equal to 1, the client side shows an average latency of 500 us, but if I change kWorkerThreadCounts to 8, the latency becomes 8k us.
I modified some code to use epoll directly, with 8 client threads echoing data, and it performs even better.
I have pasted some code below. Any idea what the reason is?
Main thread, which starts the I/O threads:
/*
 * Entry point: starts kWorkerThreadCounts I/O worker threads, then listens on
 * a Unix-domain stream socket bound to SOCKET_PATH and runs the main thread's
 * event loop, which hands every readable event on the listener to doAccept.
 *
 * Returns 1 when a setup step fails and 0 once the event loop returns; exits
 * with status -1 when a worker thread cannot be created.
 */
int main(int argc, char *argv[]) {
    (void)argc;
    (void)argv;

    /* libevent documents that threading support must be enabled before any
     * event_base is created, so this stays the very first call. */
    if (evthread_use_pthreads() != 0) {
        fprintf(stderr, "evthread_use_pthreads failed\n");
        return 1;
    }

    for (int i = 0; i < kWorkerThreadCounts; ++i) {
        /* Per-worker setup is elided ("...") in the original listing. */
...
        int result = pthread_create(&globalIOWorkerThreads[i], NULL, ioWorkerThreadFunction, globalIOWorkerContexts[i]);
        if (result != 0) {
            /* pthread_create returns the error code instead of setting
             * errno, so perror() would report an unrelated error. */
            fprintf(stderr, "Thread creation failed: %s\n", strerror(result));
            exit(-1);
        }
    }

    /* 0 is a valid descriptor; only a negative value signals failure. */
    evutil_socket_t listener = socket(AF_UNIX, SOCK_STREAM, 0);
    if (listener < 0) {
        perror("socket");
        return 1;
    }
    evutil_make_listen_socket_reuseable(listener);

    /* Zero the whole address so no indeterminate bytes reach bind(). */
    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    int path_len = snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", SOCKET_PATH);
    if (path_len < 0 || (size_t)path_len >= sizeof(addr.sun_path)) {
        fprintf(stderr, "socket path too long\n");
        return 1;
    }

    /* Remove a stale socket file left over from a previous run. */
    unlink(addr.sun_path);
    if (bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("bind");
        return 1;
    }
    if (listen(listener, kListenBackLog) < 0) {
        perror("listen");
        return 1;
    }
    printf("Listening...\n");

    if (evutil_make_socket_nonblocking(listener) < 0) {
        perror("evutil_make_socket_nonblocking");
        return 1;
    }

    struct event_config *config = event_config_new();
    if (config == NULL) {
        fprintf(stderr, "event_config_new failed\n");
        return 1;
    }
    if (event_config_require_features(config, EV_FEATURE_ET | EV_FEATURE_O1) != 0) {
        fprintf(stderr, "event_config_require_features failed\n");
        event_config_free(config);
        return 1;
    }
    /* Only this thread touches the accept base, so it is created lock-free. */
    event_config_set_flag(config, EVENT_BASE_FLAG_NOLOCK);

    struct event_base *base = event_base_new_with_config(config);
    /* The config is only needed while the base is being constructed. */
    event_config_free(config);
    if (base == NULL) {
        /* Happens when no backend offers the required features. */
        fprintf(stderr, "event_base_new_with_config failed\n");
        return 1;
    }

    struct event *listen_event = event_new(base, listener, EV_READ | EV_PERSIST, doAccept, (void *)base);
    if (listen_event == NULL || event_add(listen_event, NULL) != 0) {
        fprintf(stderr, "could not register the listen event\n");
        if (listen_event != NULL) {
            event_free(listen_event);
        }
        event_base_free(base);
        return 1;
    }

    event_base_dispatch(base);

    event_free(listen_event);
    event_base_free(base);
    return 0;
}
/*
 * Accept callback for the listening socket.
 *
 * Accepts one pending connection, makes it non-blocking and attaches it as a
 * bufferevent to one of the worker event bases, chosen round-robin through
 * the shared atomic `counter`. The worker's context is passed as the
 * callback argument of ioWorkerReadCallBack / ioWorkerErrorCallBack.
 *
 * listener: the listening socket that became readable.
 * event:    triggering event flags (unused).
 * arg:      the main thread's event_base as registered in main (unused).
 */
void doAccept(evutil_socket_t listener, short event, void *arg) {
    (void)event;
    (void)arg;

    struct sockaddr_un peer;
    /* accept() reads this as the size of the address buffer, so it must be
     * initialised before the call. */
    socklen_t peer_len = sizeof(peer);
    evutil_socket_t fd = accept(listener, (struct sockaddr *)&peer, &peer_len);
    if (fd < 0) {
        perror("accept");
        return;
    }
    if (evutil_make_socket_nonblocking(fd) < 0) {
        perror("evutil_make_socket_nonblocking");
        evutil_closesocket(fd);
        return;
    }

    /* Round-robin dispatch. The ticket is reduced as an unsigned value so a
     * wrapped signed counter can never yield a negative array index. */
    unsigned int ticket = (unsigned int)atomic_fetch_add(&counter, 1);
    unsigned int slot = ticket % (unsigned int)kWorkerThreadCounts;
    struct IOWorkerContextWrapper *io_context = globalIOWorkerContexts[slot];
    struct event_base *io_worker_base = globalEventBases[slot];

    /* NOTE(review): the bufferevent is created on this (main) thread but is
     * serviced by the worker's loop. Confirm the worker bases are created
     * with locking enabled (no EVENT_BASE_FLAG_NOLOCK) so this cross-thread
     * registration can wake the worker loop, and consider
     * BEV_OPT_THREADSAFE. */
    struct bufferevent *bev = bufferevent_socket_new(io_worker_base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (bev == NULL) {
        fprintf(stderr, "bufferevent_socket_new failed\n");
        evutil_closesocket(fd);
        return;
    }

    /* The worker context travels with the bufferevent as the callback arg. */
    bufferevent_setcb(bev, ioWorkerReadCallBack, NULL, ioWorkerErrorCallBack, (void *)io_context);

    /* bufferevent_enable() only accepts EV_READ / EV_WRITE; EV_PERSIST is
     * not a valid flag here. */
    if (bufferevent_enable(bev, EV_READ | EV_WRITE) < 0) {
        fprintf(stderr, "bufferevent_enable failed\n");
        /* BEV_OPT_CLOSE_ON_FREE closes fd together with the bufferevent. */
        bufferevent_free(bev);
    }
}
Child (worker) thread read callback:
/*
 * Bufferevent read callback: echoes every complete message currently held in
 * the input buffer back to the peer.
 *
 * Message layout, per the protocol note in the original code: two marker
 * bytes (0xa1, 0xa1), a varint-encoded u32 header length, then that many
 * header bytes.
 *
 * Each complete message is moved from the input buffer to the output buffer
 * with evbuffer_remove_buffer, so no per-message heap buffer is needed. The
 * callback returns as soon as the remaining bytes do not form a complete,
 * valid message.
 *
 * bev: the connection's bufferevent.
 * arg: callback argument registered with bufferevent_setcb (unused here).
 */
void ioWorkerReadCallBack(struct bufferevent *bev, void *arg) {
    (void)arg;

    enum { kMarkerBytes = 2, kVarintMaxBytes = 5 };

    struct evbuffer *input = bufferevent_get_input(bev);
    struct evbuffer *output = bufferevent_get_output(bev);

    /* Sized so the varint parser can always inspect kVarintMaxBytes bytes
     * after the marker; bytes beyond HEADER_LENGTH stay zero. */
    uint8_t peek[HEADER_LENGTH + kVarintMaxBytes];

    while (evbuffer_get_length(input) >= HEADER_LENGTH) {
        memset(peek, 0, sizeof(peek));
        if (evbuffer_copyout(input, peek, HEADER_LENGTH) < 0) {
            return;
        }

        uint32_t header_length;
        int header_length_length = parse_varint_u32(peek + kMarkerBytes, kVarintMaxBytes, &header_length);
        if (header_length_length < 0) {
            /* Invalid length prefix. NOTE(review): the bytes are left in the
             * input buffer, as before; confirm whether the connection should
             * be closed instead. */
            return;
        }

        /* Summed in 64 bits so a huge declared length cannot wrap around. */
        uint64_t frame_len = (uint64_t)kMarkerBytes + (uint64_t)header_length_length + (uint64_t)header_length;
        if (frame_len > (uint64_t)kMaxBufferLength) {
            /* Larger than the message size limit: treated like an invalid
             * header rather than buffered without bound. */
            return;
        }
        if ((uint64_t)evbuffer_get_length(input) < frame_len) {
            /* Message is not complete yet; wait for more data. */
            return;
        }

        /* Echo: move the whole frame from input to output without copying
         * it through a temporary buffer. */
        int moved = evbuffer_remove_buffer(input, output, (size_t)frame_len);
        if (moved < 0 || (uint64_t)moved != frame_len) {
            return;
        }
    }
}