Is it possible to use EPOLL to send tasks to multiple client

618 Views Asked by At

I am building a server/client application. The server will need to deal with less than 1000+ clients. I currently have a simple EPOLL implementation to be able to receive information from the clients to the server. However, I will need to be able to send tasks to specific clients as well. My question is there a way to use EPOLL to identify which clients I need to send a task too (potentially using the EPOLLOUT flag) and send the message out. I've attached the snippet of what I currently have. If possible, how would I go about implementing this. It would be great to be able to have one epoll for all the sending an the receiving if it could work like that.

Thanks for any help/recommendations!

void epoll(int listening_port)
{
    char buffer[500];       //buffer for message
    int listen_sock = 0;    //file descriptor (fd) for listening socket
    int conn_sock = 0;      //fd for connecting socket
    int epollfd = 0;         // fd for epoll
    int nfds = 0;           //number of fd's ready for i/o
    int i = 0;              //index to which file descriptor we are lookng at
    int curr_fd = 0;        //fd for socket we are currently looking at
    bool loop = 1;          //boolean value to help identify whether to keep in loop or not
    socklen_t address_len;
    struct sockaddr_in serv_addr;
    struct epoll_event ev, events[EPOLL_MAX_EVENTS];
    ssize_t result = 0;


    bzero(buffer, sizeof(buffer));
    bzero(&serv_addr, sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = listening_port;
    serv_addr.sin_addr.s_addr = INADDR_ANY;

    listen_sock = create_socket();


    if(bind(listen_sock, SA &serv_addr, sizeof(serv_addr)) != 0)
    {
        perror("Bind failed");
    }
    else
    {
        printf("Bind successful\n");
    }


    set_socket_nonblocking(listen_sock);

    listen_on_socket(listen_sock, SOMAXCONN); //specifying max connections in backlog

    epollfd = initialize_epoll();

    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
    ev.data.fd = listen_sock;

    if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == ERROR)
    {
        perror("Epoll_ctl: listen sock");
    }
    else
    {
        printf("Successfully added listen socket to epoll\n");
    }

    while (RUN_EPOLL)
    {
        nfds = epoll_wait(epollfd, events, EPOLL_MAX_EVENTS, 0); //waiting for incoming connection;
        if(nfds == ERROR)
        {
            perror("EPOLL_Wait");
        }
        //printf("Finished waiting\i");

        for(i = 0; i < nfds; ++i)
        {
            curr_fd = events[i].data.fd;
            loop = true; //reset looping flag
            //Notification from Listening Socket - Process Incoming Connections

            if (curr_fd == listen_sock) {

                while(loop)
                {
                    conn_sock = accept(listen_sock, SA &serv_addr, &address_len); //accept incoming connection
                    printf("Accepted new incoming connection - socket fd: %d\n", conn_sock);
                    if (conn_sock > 0) //if successful set socket nonblocking and add it to epoll
                    {

                        set_socket_nonblocking(conn_sock);

                        ev.events = EPOLLIN | EPOLLOUT| EPOLLET | EPOLLRDHUP; //setting flags
                        ev.data.fd = conn_sock; //specify fd of new connection in event to follow
                        if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == ERROR) //add fd to monitored fd's
                        {
                            perror("epoll_ctl: conn_sck");
                        }
                        else
                        {
                            printf("Added %d to monitor list\n", conn_sock);
                        }


                    }
                    else if (conn_sock == ERROR)
                    {
                        if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                        {
                            printf("All incoming connections processed\n");
                            loop = false;
                        }
                        else
                        {
                            perror("Accept remote socket");
                            loop = false;
                        }
                    }
                }
            }
            else if(events[i].events & EPOLLRDHUP) //detecting if peer shutdown
            {
                printf("Detected socket peer shutdown. Closing now. \n");

                if (epoll_ctl(epollfd, EPOLL_CTL_DEL, curr_fd, NULL) == ERROR) {
                    perror("epoll_ctl: conn_sck");
                }

                close_socket(curr_fd);
            }
            else if(events[i].events & EPOLLIN)
            {
                while(loop)
                {
                    result = recv(curr_fd, buffer, sizeof(buffer), 0);
                    //printf("Length of incoming message is %d\i", result);

                    if(result > 0) //
                    {
                        printf("File Descriptor: %d. Message: %s\n", curr_fd, buffer);
                        bzero(buffer, sizeof(buffer));
                    }
                    else if(result == ERROR) //Message is completely sent
                    {

                        if(errno == EAGAIN)
                        {


                            send_message(curr_fd, "Success", strlen("Success"));
                            loop = false;

                        }
                    }
                    else if(result == 0)
                    {
                        //Removing the fd from the monitored descriptors in epoll
                        if (epoll_ctl(epollfd, EPOLL_CTL_DEL, curr_fd, NULL) == ERROR) {
                            perror("epoll_ctl: conn_sck");
                        }
                        close_socket(curr_fd); //Closing the fd
                        loop = false;
                    }

                }

            }
        }

    }

    close_socket(listen_sock);
    //need to develop way to gracefully close out of epoll

    return;

}
0

There are 0 best solutions below