Libevent: Multi-Threaded Event Base Loop Responds Slowly

I'm writing a simple server with libevent. The main thread creates kWorkerThreadCounts child threads, each with its own event_base, and keeps a separate event_base with a doAccept callback.

On accept, the main thread creates a new bufferevent on a child event_base and registers a read callback. The child thread's read callback simply echoes the message back. I guess this is the so-called one-loop-per-thread model?
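The round-robin hand-off described above can be sketched on its own, without libevent. This is only an illustration: `pick_worker` and `next_conn` are hypothetical names (the pasted code uses a global called `counter`), and the sketch shows nothing more than how an atomic counter maps successive connections onto worker indices.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical connection counter; zero-initialized static storage. */
static atomic_uint next_conn;

/* Return the index of the worker that should own the next connection.
 * Successive calls cycle through 0 .. worker_count-1. */
static size_t pick_worker(size_t worker_count) {
    assert(worker_count > 0);
    /* atomic_fetch_add returns the value held before the increment. */
    unsigned int ticket = atomic_fetch_add(&next_conn, 1u);
    return (size_t)ticket % worker_count;
}
```

With 8 workers, the first eight accepted connections would land on workers 0 through 7 and the ninth would go back to worker 0.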

But I've run into a strange problem: the more child threads there are, the slower the echoed messages come back. With kWorkerThreadCounts equal to 1, the client side shows an average latency of 500 us, but if I change kWorkerThreadCounts to 8, the latency becomes 8k us.
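For context, client-side latency figures like these are typically derived from two monotonic clock samples taken around each echo round trip. The helpers below are a minimal sketch of that bookkeeping, not the client used for the numbers above: `elapsed_us`, `latency_stats`, `latency_record` and `latency_mean_us` are hypothetical names, and the samples are assumed to come from something like `clock_gettime(CLOCK_MONOTONIC, ...)`.

```c
#include <assert.h>
#include <stdint.h>
#include <time.h>

/* Microseconds elapsed between two monotonic clock samples. */
static int64_t elapsed_us(struct timespec start, struct timespec end) {
    int64_t ns = ((int64_t)end.tv_sec - (int64_t)start.tv_sec) * 1000000000LL
               + ((int64_t)end.tv_nsec - (int64_t)start.tv_nsec);
    return ns / 1000;
}

/* Running totals for a series of round-trip measurements. */
struct latency_stats {
    uint64_t samples;
    int64_t total_us;
    int64_t max_us;
};

/* Add one round-trip measurement to the running totals. */
static void latency_record(struct latency_stats *s, int64_t us) {
    assert(s != NULL);
    s->samples += 1;
    s->total_us += us;
    if (us > s->max_us) {
        s->max_us = us;
    }
}

/* Mean latency in microseconds, or 0.0 if nothing was recorded. */
static double latency_mean_us(const struct latency_stats *s) {
    assert(s != NULL);
    return s->samples ? (double)s->total_us / (double)s->samples : 0.0;
}
```

Tracking the maximum next to the mean helps tell apart a uniformly slow path from a few very slow round trips that pull the average up.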

I modified the code to use epoll directly, with 8 client threads echoing data, and it performs even better.
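As a point of comparison, one iteration of a direct epoll echo loop might look like the sketch below. It is not the modified code mentioned above, only a Linux-only illustration under stated assumptions: `watch_fd` and `echo_once` are hypothetical helper names, the descriptor is assumed to be blocking, and readiness is level-triggered.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Register fd with the epoll instance for level-triggered read readiness. */
static int watch_fd(int epfd, int fd) {
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* Wait for one ready descriptor and echo back whatever it sent.
 * Returns bytes echoed, 0 on timeout or peer close, -1 on error. */
static ssize_t echo_once(int epfd, int timeout_ms) {
    struct epoll_event ev;
    int ready = epoll_wait(epfd, &ev, 1, timeout_ms);
    if (ready <= 0) {
        return ready;
    }
    char buf[4096];
    ssize_t n = read(ev.data.fd, buf, sizeof buf);
    if (n <= 0) {
        return n;
    }
    /* write() may be partial, so loop until everything is sent. */
    ssize_t off = 0;
    while (off < n) {
        ssize_t w = write(ev.data.fd, buf + off, (size_t)(n - off));
        if (w < 0) {
            return -1;
        }
        off += w;
    }
    return n;
}
```

Each worker thread would own one epoll instance and call `echo_once` in a loop, so no descriptor is ever touched by two threads.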

I've pasted some code below. Any idea what the reason is?

Main thread, which starts the I/O threads:

int main(int argc, char *argv[]) {
    evthread_use_pthreads();

    for (int i = 0; i < kWorkerThreadCounts; ++i) {
...
    int result;

    result = pthread_create(&globalIOWorkerThreads[i], NULL, ioWorkerThreadFunction, globalIOWorkerContexts[i]);
    if (result != 0) {
        perror("Thread creation failed");
        exit(-1);
    }
  }

  evutil_socket_t listener;
  listener = socket(AF_UNIX, SOCK_STREAM, 0);
  assert(listener >= 0);
  evutil_make_listen_socket_reuseable(listener);

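  struct sockaddr_un sun;
  memset(&sun, 0, sizeof(sun));
  sun.sun_family = AF_UNIX;
  // bounded copy so a long SOCKET_PATH cannot overflow sun_path
  strncpy(sun.sun_path, SOCKET_PATH, sizeof(sun.sun_path) - 1);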
  // remove might exist old file
  unlink(sun.sun_path);

  if (bind(listener, (struct sockaddr *)&sun, sizeof(sun)) < 0) {
      perror("bind");
      return 1;
  }

  if (listen(listener, kListenBackLog) < 0) {
      perror("listen");
      return 1;
  }

  printf("Listening...\n");

  evutil_make_socket_nonblocking(listener);

  struct event_config *config= event_config_new();
  assert(config != NULL);
  event_config_require_features(config, EV_FEATURE_ET | EV_FEATURE_O1);
  event_config_set_flag(config, EVENT_BASE_FLAG_NOLOCK);

  struct event_base *base = event_base_new_with_config(config);
  assert(base != NULL);
  struct event *listen_event;
  listen_event = event_new(base, listener, EV_READ|EV_PERSIST, doAccept, (void*)base);
  event_add(listen_event, NULL);
  // printf("Main thread start listening.\n");
  event_base_dispatch(base);
  event_config_free(config);

  // printf("Main thread stop running.\n");
  return 0;
}


void doAccept(evutil_socket_t listener, short event, void *arg) {
    struct event_base *base = (struct event_base *)arg;
    evutil_socket_t fd;
    struct sockaddr_un sun;
    // accept() reads slen as the size of the address buffer, so it must be initialized
    socklen_t slen = sizeof(sun);
    fd = accept(listener, (struct sockaddr *)&sun, &slen);
    if (fd < 0) {
        perror("accept");
        return;
    }
    
    evutil_make_socket_nonblocking(fd);
    
    // dispatch bufferevent to worker thread
    int value = atomic_fetch_add(&counter, 1);

    struct IOWorkerContextWrapper *io_context = globalIOWorkerContexts[value%kWorkerThreadCounts];
    struct event_base *io_worker_base = globalEventBases[value%kWorkerThreadCounts];
    struct bufferevent *bev = bufferevent_socket_new(io_worker_base, fd, BEV_OPT_CLOSE_ON_FREE);
    // we need context to insert complicated data
    bufferevent_setcb(bev, ioWorkerReadCallBack, NULL, ioWorkerErrorCallBack, (void*)io_context);
    // EV_PERSIST is an event flag; bufferevent_enable only takes EV_READ and/or EV_WRITE
    bufferevent_enable(bev, EV_READ|EV_WRITE);
}

Child thread read callback:

void ioWorkerReadCallBack(struct bufferevent *bev, void *arg) {

  struct evbuffer *buf = bufferevent_get_input(bev);

  // one scratch buffer per callback, freed on every exit path
  uint8_t *read_buf = malloc(kMaxBufferLength);
  assert(read_buf != NULL);

  // varint u32 encode
  // client side is two byte 0xa1,0xa1 with varint u32 header length + header data
  while (evbuffer_get_length(buf) >= HEADER_LENGTH) {

    memset(read_buf, 0, kMaxBufferLength);

    evbuffer_copyout(buf, read_buf, HEADER_LENGTH);
    uint32_t header_length;
    int header_length_length = parse_varint_u32(read_buf+2, 5, &header_length);
    if (header_length_length < 0) {
      // printf("Message encoded header length invalid\n");
      break;
    }

    size_t total_message_length = 2 + (size_t)header_length_length + header_length;
    if (total_message_length > (size_t)kMaxBufferLength) {
      // message would overflow the scratch buffer
      break;
    }

    if (evbuffer_get_length(buf) < total_message_length) {
      // printf("evbuffer length is too short to hold header\n");
      break;
    }

    evbuffer_remove(buf, read_buf, total_message_length);
    bufferevent_write(bev, read_buf, total_message_length);
  }

  free(read_buf);
}
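
The callback relies on `parse_varint_u32`, which is not shown. The sketch below is a hypothetical stand-in that matches the way it is called (buffer, maximum length, output pointer, negative return on failure). It assumes protobuf-style base-128 varints, which may not be the encoding actually used.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for parse_varint_u32, ASSUMING protobuf-style
 * base-128 varints: 7 data bits per byte, least significant group first,
 * high bit set on every byte except the last.
 * Returns the number of bytes consumed, or -1 if the input is truncated
 * or the value does not fit in 32 bits. */
static int parse_varint_u32(const uint8_t *buf, size_t len, uint32_t *out) {
    assert(buf != NULL && out != NULL);
    uint32_t value = 0;
    for (size_t i = 0; i < len && i < 5; ++i) {
        uint8_t byte = buf[i];
        /* The fifth byte may only carry the top 4 bits of a 32-bit value
         * and must be the last byte. */
        if (i == 4 && (byte & 0xF0) != 0) {
            return -1;
        }
        value |= (uint32_t)(byte & 0x7F) << (7 * i);
        if ((byte & 0x80) == 0) {
            *out = value;
            return (int)i + 1;
        }
    }
    return -1; /* ran out of bytes before the terminating byte */
}
```

Under this assumption, a header length of 300 would be encoded as the two bytes 0xAC 0x02, so the full message prefix would be 0xa1 0xa1 0xAC 0x02 followed by 300 bytes of header data.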


