IO multiplexing -- detailed explanation and example of poll

1, poll system call
Both poll and select work by polling: every call scans the entire set of registered file descriptors and reports the ready ones back to the user program, so the time complexity of detecting ready events is O(n). They differ in how the descriptor set is described: poll uses an array of pollfd structures instead of select's fd_set structure. Moreover, poll has no built-in limit on the maximum number of connections, because the kernel stores the descriptors in a linked list rather than in a fixed-size bitmap such as fd_set.

2, poll prototype and parameters

#include<poll.h>
int poll(struct pollfd* fds,nfds_t nfds,int timeout);
  • The fds parameter is an array of struct pollfd elements; it specifies the readable, writable and exceptional events to watch for on every file descriptor we are interested in.
// One entry of the array passed to poll(): the descriptor to watch, the
// events the caller asks for, and the events the kernel reports back.
struct pollfd
{
     int fd;        //File descriptor
     short events;  //Registered events
     short revents; //Actual events, populated by the kernel
};

The event types supported by poll are as follows:
Usually, an application has to inspect the return value of the recv call to tell valid data received on a socket apart from the peer's request to close the connection, and handle each case accordingly. However, since Linux kernel 2.6.17, GNU has added the POLLRDHUP event to the poll system call. It is triggered when the peer's request to close the connection is received on the socket. This provides a simpler way to distinguish between the two cases above, but to use the POLLRDHUP event we must define _GNU_SOURCE at the very beginning of the code.

  • The nfds parameter specifies the size of the monitored event collection fds, i.e. the number of elements in the array. Its type, nfds_t, is defined as follows:
typedef unsigned long int nfds_t;
  • The timeout parameter specifies the timeout value of poll in milliseconds. When timeout is -1, the poll call blocks until an event occurs; when timeout is 0, the poll call returns immediately.

The meaning of the return value of the poll call is the same as that of select.

3, poll example: the Client sends a message to the Server, the Server receives the message and sends it back to the Client unchanged, and the Client prints the message to the terminal.
Server side:

#include<sys/types.h>
#include<sys/socket.h>
#include<sys/ioctl.h>
#include<sys/time.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<poll.h>
#include<unistd.h>

#include<cerrno>
#include<cstdio>
#include<cstdlib>
#include<cstring>
#include<iostream>
#include<string>
#include<vector>
using namespace std;

// Typed constants instead of object-like macros: they obey scope rules,
// show up in the debugger, and are still usable as array bounds.
const int BUFFER_SIZE = 1024;  // Size in bytes of a general-purpose I/O buffer
const int MAX_FD = 1000;       // Number of slots in the pollfd table (listener + clients)

// Fixed-size header that precedes every message on the wire.
// It is sent and received as raw bytes, so it must stay trivially copyable.
struct PACKET_HEAD
{
    // Number of payload bytes that follow the header. Defaults to 0 so a
    // header whose recv() failed never exposes an indeterminate length.
    int length = 0;
};

// Single-threaded TCP echo server multiplexed with poll().
//
// Typical use: construct with a port, then call Bind(), Listen() and Run()
// in that order. Run() loops forever.
//
// The object owns the listening socket and every accepted client socket and
// closes them in its destructor, so copying it would lead to the same
// descriptors being closed twice; copy operations are therefore disabled.
class Server
{
private:
    struct sockaddr_in server_addr;  //Local address the listening socket is bound to
    socklen_t server_addr_len;       //Size of server_addr
    int listen_fd;                   //Listening socket descriptor
    struct pollfd fds[MAX_FD];       //pollfd table: slot 0 is the listener, unused slots hold fd == -1
    int nfds;                        //Highest slot index currently in use (poll() is given nfds+1 entries)
public:
    // explicit: an int port must not silently convert into a Server.
    explicit Server(int port);
    ~Server();

    // Non-copyable: the destructor closes the owned descriptors.
    Server(const Server&) = delete;
    Server& operator=(const Server&) = delete;

    void Bind();                    //Bind the listening socket to server_addr
    void Listen(int queue_len=20);  //Start listening; queue_len is the listen() backlog
    void Accept();                  //Accept one pending connection and register it in fds
    void Run();                     //Event loop: poll() and dispatch to Accept()/Recv()
    void Recv();                    //Serve the client sockets that poll() reported as ready
};

// Prepares the listening address (any local interface, the given TCP port),
// marks every pollfd slot as unused and creates the listening socket.
//
// port: TCP port in host byte order.
// Terminates the process with exit(1) if the socket cannot be created.
Server::Server(int port)
{
    std::memset(&server_addr,0,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    //s_addr is a 32-bit value, so it needs htonl(); htons() only happened to
    //work because INADDR_ANY is 0.
    server_addr.sin_addr.s_addr=htonl(INADDR_ANY);
    server_addr.sin_port=htons(static_cast<in_port_t>(port));
    server_addr_len=sizeof(server_addr);

    //Mark every slot as unused up front so the destructor never close()s an
    //indeterminate descriptor when Run() was not reached.
    for(int i=0;i<MAX_FD;++i)
    {
        fds[i].fd=-1;
        fds[i].events=0;
        fds[i].revents=0;
    }
    nfds=0;

    //create socket to listen
    listen_fd=socket(PF_INET,SOCK_STREAM,0);
    if(listen_fd<0)
    {
        std::cout<<"Create Socket Failed!";
        std::exit(1);
    }

    //SO_REUSEADDR lets the server be restarted while old connections on the
    //same port are still in TIME_WAIT. Failure is reported but not fatal.
    int opt=1;
    if(setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt))<0)
    {
        std::cout<<"setsockopt(SO_REUSEADDR) Failed!\n";
    }
}

// Closes every descriptor the server still owns.
//
// Unused slots hold fd == -1 (Run() sets the table up that way) and are
// skipped instead of being handed to close(). The listening socket normally
// lives in fds[0]; if it was never placed in the table it is closed
// separately so it does not leak.
Server::~Server()
{
    bool listener_closed=false;
    for(int i=0;i<MAX_FD;i++)
    {
        const int fd=fds[i].fd;
        if(fd<0)
            continue;

        if(fd==listen_fd)
            listener_closed=true;

        ::close(fd);
        fds[i].fd=-1;
    }

    if(!listener_closed && listen_fd>=0)
        ::close(listen_fd);
    listen_fd=-1;
}

// Binds the listening socket to server_addr.
// Prints a status message; terminates the process with exit(1) on failure.
void Server::Bind()
{
    struct sockaddr* local=reinterpret_cast<struct sockaddr*>(&server_addr);

    //Explicitly the socket API's bind(), never std::bind.
    const int rc=::bind(listen_fd,local,sizeof(server_addr));
    if(rc!=-1)
    {
        std::cout<<"Bind Successfully!\n";
        return;
    }

    std::cout<<"server bind Failed!";
    std::exit(1);
}

// Puts the listening socket into the listening state.
//
// queue_len: backlog passed to listen().
// Prints a status message; terminates the process with exit(1) on failure.
void Server::Listen(int queue_len)
{
    const bool failed=(::listen(listen_fd,queue_len)==-1);
    if(!failed)
    {
        std::cout<<"Listen Successfully.\n";
        return;
    }

    std::cout<<"Server Listen Failed!";
    std::exit(1);
}

// Accepts one pending connection on the listening socket and registers the
// new descriptor in the first free pollfd slot (slot 0 is the listener).
//
// - Transient accept() failures (interrupted call, connection aborted by the
//   peer before it was accepted, nothing pending) just return; the next
//   poll() round retries.
// - Any other accept() failure terminates the process with exit(1).
// - When the table is full the new connection is closed and the server keeps
//   serving its existing clients instead of terminating.
void Server::Accept()
{
    struct sockaddr_in client_addr;
    socklen_t client_addr_len=sizeof(client_addr);

    const int new_fd=::accept(listen_fd,reinterpret_cast<struct sockaddr*>(&client_addr),&client_addr_len);
    if(new_fd<0)
    {
        if(errno==EINTR||errno==ECONNABORTED||errno==EAGAIN||errno==EWOULDBLOCK)
            return;

        std::cout<<"Server Accept Failed!";
        std::exit(1);
    }

    std::cout<<"new connection was accepted.\n";

    //Look for the first unused slot; unused slots are marked with fd == -1
    int slot=1;
    while(slot<MAX_FD && fds[slot].fd>=0)
        ++slot;

    //Maximum number of connections exceeded: refuse only this client
    if(slot==MAX_FD)
    {
        std::cout<<"Too many clients.\n";
        ::close(new_fd);
        return;
    }

    fds[slot].fd=new_fd;
    fds[slot].events=POLLIN;  //Watch the new descriptor for readable data
    fds[slot].revents=0;      //No events reported yet for this descriptor

    if(slot>nfds)             //Keep nfds at the highest slot index in use
        nfds=slot;
}

// Event loop. Resets the pollfd table (slot 0 = listening socket, all other
// slots unused), then blocks in poll() forever and dispatches:
//   - listener readable  -> Accept()
//   - anything else ready -> Recv()
// Both are handled in the same round, so ready clients are not postponed
// while new connections keep arriving.
//
// Never returns. Terminates the process with exit(1) when poll() fails for a
// reason other than being interrupted by a signal, or when the listening
// socket itself reports an error.
void Server::Run()
{
    fds[0].fd=listen_fd; //Add listening descriptor
    fds[0].events=POLLIN;
    fds[0].revents=0;
    nfds=0;

    for(int i=1;i<MAX_FD;++i)
    {
        fds[i].fd=-1;
        fds[i].events=0;
        fds[i].revents=0; //Slots that were never polled must not look ready
    }

    while(true)
    {
        int ready=::poll(fds,static_cast<nfds_t>(nfds)+1,-1);
        if(ready<0)
        {
            if(errno==EINTR) //Interrupted by a signal: simply poll again
                continue;

            std::cout<<"poll() error!";
            std::exit(1);
        }

        if(ready==0)
        {
            continue;
        }

        //A broken listener would make poll() return immediately forever
        if(fds[0].revents&(POLLERR|POLLHUP|POLLNVAL))
        {
            std::cout<<"listen socket error!";
            std::exit(1);
        }

        if(fds[0].revents&POLLIN)
        {
            Accept();   //There are new client requests
            //Recv() scans the table for POLLIN; clear the listener's flag so
            //it is never mistaken for a client with data.
            fds[0].revents=0;
            --ready;
        }

        if(ready>0)
            Recv();     //At least one client descriptor is ready
    }
}

void Server::Recv()
{
    for(int i=0;i<MAX_FD;++i)
    {
        if(fds[i].fd<0)
            continue;

        if(fds[i].revents & POLLIN)  //Readready POLLIN: data readable
        {
            int fd=fds[i].fd;
            bool close_conn=false;   //Marks whether the current connection is disconnected

            PACKET_HEAD head;
            recv(fd,&head,sizeof(head),0); //First accept the header, that is, the total length of the data

            char* buffer=new char[head.length];
            bzero(buffer,head.length);
            int total=0;
            while(total<head.length)
            {
                int len=recv(fd,buffer+total,head.length-total,0);
                if(len<0)
                {
                    cout<<"recv() error";
                    close_conn=true;
                    break;
                }
                
                total+=len;
            }

            if(total==head.length) //Send the received message back to the client as it is
            {
                int ret1=send(fd,&head,sizeof(head),0);
                int ret2=send(fd,buffer,head.length,0);
                if(ret1<0||ret2<0)
                {
                    cout<<"send() error!";
                    close_conn=true;
                }
                
            }

            delete buffer;

            if(close_conn) //There is a problem with this connection. Close it
            {
                close(fd);
                fds[i].fd=-1;
            }
        }
    }
}

int main()
{
    Server server(15000);
    server.Bind();
    server.Listen();
    server.Run();
    return 0;
}

The client code is consistent with the select client code, which can be viewed in the previous select article.

Keywords: C++ Linux pointer

Added by Branden Wagner on Wed, 24 Nov 2021 09:18:16 +0200