Learning high concurrency on Linux -- a reactor implementation based on epoll

Reactor mode

Most mainstream network libraries today, such as libevent for C/C++ and Netty for Java, use the Reactor pattern. Reactor is a design pattern for event handling: when I/O requests arrive, a service handler uses I/O multiplexing to demultiplex them and dispatches each one synchronously to the request handler registered for it.
How can we picture the Reactor pattern? Take the way a restaurant operates as an example:
The restaurant is the server and the customers are I/O requests. Almost no restaurant assigns a dedicated waiter to every customer, because that would be too expensive. A restaurant run in Reactor style works like this: one waiter looks after several tables. When customers need something (such as ordering or paying the bill), they tell the waiter, and the waiter passes the request on to the right member of staff (orders go to the kitchen, bills go to the cashier). This way, even when the restaurant is full (I/O events are frequent), it can still operate in an orderly manner.

Concretely, this means "non-blocking I/O + I/O multiplexing". The basic structure is an event loop, and the business logic is implemented in an event-driven way through event callbacks. That is the Reactor pattern.
The pseudocode is as follows:

while(!done)
{
     int nready = epoll_wait(epollfd, epollEvents, EPOLL_EVENT_SIZE, timeout);
     if(nready < 0){
          // Processing error
     }
     else{
          // Handle the expired timer and call back the user's timer handler
          if(nready > 0){
               // Handle IO events and call back the user's IO handler
          }
     }
}
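
To make the loop above concrete, here is a minimal sketch that runs a single iteration of it against a pipe instead of a socket. The names `handler_fn`, `demo_item`, `on_readable` and `run_once` are made up for this illustration and are not part of the routine developed later in the article.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical names, for illustration only. */
typedef int (*handler_fn)(int fd);

typedef struct {
    int fd;
    handler_fn on_read;   /* called when fd becomes readable */
} demo_item;

/* Read whatever is available and return the byte count. */
static int on_readable(int fd)
{
    char buf[64];
    ssize_t n = read(fd, buf, sizeof(buf));
    return (int)n;
}

/* Register the read end of a pipe, make it readable, then run one
 * iteration of the event loop. Returns the handler's result, or -1. */
int run_once(void)
{
    int result = -1;
    int pipefd[2];
    if (pipe(pipefd) < 0)
        return -1;

    int epfd = epoll_create1(0);
    if (epfd < 0) {
        close(pipefd[0]);
        close(pipefd[1]);
        return -1;
    }

    demo_item item = { pipefd[0], on_readable };
    struct epoll_event ev = {0};
    ev.events = EPOLLIN;
    ev.data.ptr = &item;   /* epoll hands this pointer back with the event */

    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == 0 &&
        write(pipefd[1], "hello", 5) == 5) {
        struct epoll_event ready[8];
        int nready = epoll_wait(epfd, ready, 8, 1000);
        for (int i = 0; i < nready; ++i) {
            demo_item *it = ready[i].data.ptr;
            if (ready[i].events & EPOLLIN)
                result = it->on_read(it->fd);   /* dispatch to the callback */
        }
    }

    close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return result;
}
```

The point of the sketch is the `data.ptr` round trip: the loop never inspects the descriptor itself, it only calls whatever callback was stored next to it at registration time.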

Reactor implementation of Epoll

When designing a reactor, we first need a structure that holds everything the event loop needs. At a minimum it must contain the socket fd and the callback functions for read and write events, so that when epoll_wait() later returns events, the bound callback can be called directly according to the event type. The structures used in this routine are as follows:

typedef int (*callBack)(int, int, void*);

typedef struct _event_item 
{
     int fd;
     int events;
     void *args;
     callBack rhandle;     // Read event callback
     callBack whandle;     // Write event callback
     
     unsigned char sendBuf[BUFFER_SIZE];
     int sendLen;
     unsigned char recvBuf[BUFFER_SIZE];
     int recvLen;
} event_item;

typedef struct _reactor
{
     int epollfd;
     event_item *events;
}reactor;

Here event_item is the basic node. Each one corresponds to a socket fd and holds its read and write callbacks and its send and receive buffers. The reactor structure manages the epollfd created by epoll_create() and all of the event_item nodes. In this routine, event_item *events is an array of 512 entries, so it supports at most 512 TCP connections. To support more connections later, readers can organize the nodes in a linked list instead.
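
The routine later indexes this array by the descriptor value itself (events[fd]), so a descriptor at or beyond the table size would index past the end of the array. Below is a minimal sketch of a bounds-checked lookup, assuming a table of 512 entries. `TABLE_SIZE`, `slot` and `find_slot` are hypothetical names, and `slot` is a cut-down stand-in for event_item.

```c
#include <stddef.h>

#define TABLE_SIZE 512   /* assumed capacity, matching the 512 mentioned above */

/* Cut-down stand-in for event_item (hypothetical). */
typedef struct {
    int fd;
    int events;
} slot;

/* Return the slot for fd, or NULL if fd cannot be used as an index. */
slot *find_slot(slot *table, int fd)
{
    if (table == NULL || fd < 0 || fd >= TABLE_SIZE)
        return NULL;
    return &table[fd];
}
```

A caller would treat a NULL result as "too many connections" and close the descriptor rather than register it.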

As in the basic server model, we first need a socket in the listening state. The init_socket() function creates one and returns it for later use:

sockfd = init_socket(SERVER_PORT);

int init_socket(unsigned short port){
     int reuseAddr = 1;
     int fd = socket(AF_INET, SOCK_STREAM, 0);
     if(fd < 0)
     {
          printf("errno: %s, socket() failed!\n", strerror(errno));
          return -1;
     }
     // Add O_NONBLOCK to the current flags rather than overwriting them
     int flags = fcntl(fd, F_GETFL, 0);
     if(flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0){
          printf("errno: %s, fcntl() failed!\n", strerror(errno));
          close(fd);
          return -1;
     }

     struct sockaddr_in serverAddr;
     memset(&serverAddr, 0, sizeof(serverAddr));
     serverAddr.sin_family = AF_INET;
     serverAddr.sin_port = htons(port);
     serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);

     // Allow the port to be rebound immediately after a restart
     setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuseAddr, sizeof(reuseAddr));
     if(bind(fd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0){
          printf("errno: %s, bind() failed!\n", strerror(errno));
          close(fd);
          return -1;
     }

     // The second argument of listen() is the backlog of pending connections
     if(listen(fd, MAX_LISTEN_BAGS) < 0){
          printf("errno: %s, listen() failed!\n", strerror(errno));
          close(fd);
          return -1;
     }

     return fd;
}

Once we have the listening socket, we can accept new connections. Before that, we initialize the structures defined above:

int init_reactor(reactor *r){
     if(r == NULL)  return -1;
     
     memset(r, 0, sizeof(reactor));
     r->epollfd = epoll_create(1);
     // epoll_create() returns -1 on failure
     if(r->epollfd < 0){
          printf("errno: %s, epoll_create() failed!\n", strerror(errno));
          return -1;
     }

     // calloc() zero-fills the table, so every entry starts with events == INIT
     // (this assumes INIT is defined as 0)
     r->events = (event_item*)calloc(MAX_EPOLL_SIZE, sizeof(event_item));
     if(r->events == NULL){
          printf("errno: %s, calloc() failed!\n", strerror(errno));
          close(r->epollfd);
          return -1;
     }

     return 0;
}

reactor *instance = NULL;
reactor *getInstance(){
     if(instance == NULL){
          instance = (reactor*)malloc(sizeof(reactor));
          if(instance == NULL) 
               return NULL;
          memset(instance, 0, sizeof(reactor));
          if(init_reactor(instance) < 0){
               free(instance);
               return NULL;
          }
     }

     return instance;
}

In the code above we initialize the reactor structure: we create the epollfd and allocate an array of MAX_EPOLL_SIZE event_item entries to hold the TCP connections. To make the reactor easy to reach from later code, we define a global reactor * variable named instance and initialize it in singleton style (not a very rigorous singleton, since it is not thread-safe).

Since accepting a new connection is also a kind of read event, we design a set_reactor_events() function that registers accept, read and write events in a uniform way:

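int set_reactor_events(int fd, int event, void *args){
     struct epoll_event ev = {0};
     reactor *r = getInstance();
     if(r == NULL){
          printf("set_reactor_events(): getInstance() failed!\n");
          return -1;
     }
     // events[] is indexed by fd, so fd must fit in the table
     if(fd < 0 || fd >= MAX_EPOLL_SIZE){
          printf("set_reactor_events(): fd %d out of range!\n", fd);
          return -1;
     }

     if(event == ACCEPT){
          r->events[fd].fd = fd;
          r->events[fd].args = args;
          r->events[fd].rhandle = acceptCB;
          ev.events = EPOLLIN;
     }else if(event == READ){
          r->events[fd].fd = fd;
          r->events[fd].args = args;
          r->events[fd].rhandle = readCB;
          ev.events = EPOLLIN;
          // ev.events |= EPOLLET;
     }else if(event == WRITE){
          r->events[fd].fd = fd;
          r->events[fd].args = args;
          r->events[fd].whandle = writeCB;
          ev.events = EPOLLOUT;
     }else{
          return -1;     // unknown event type
     }

     ev.data.ptr = &r->events[fd];
     if(r->events[fd].events == INIT){
          // First registration of this fd
          if(epoll_ctl(r->epollfd, EPOLL_CTL_ADD, fd, &ev) < 0){
               printf("errno: %s, epoll_ctl(ADD) failed!\n", strerror(errno));
               return -1;
          }
          r->events[fd].events = event;
     }else if(r->events[fd].events != event){
          // Already registered: switch to the new event type
          if(epoll_ctl(r->epollfd, EPOLL_CTL_MOD, fd, &ev) < 0){
               printf("errno: %s, epoll_ctl(MOD) failed!\n", strerror(errno));
               return -1;
          }
          r->events[fd].events = event;
     }

     return 0;
}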

This function takes the fd and the event type to register, and wraps epoll_ctl() to complete the registration.
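
The article does not list the definitions of INIT, ACCEPT, READ and WRITE. The sketch below assumes they are small integer constants with INIT equal to 0, so that a zero-filled entry counts as "not registered yet", and isolates the ADD-or-MOD decision in a hypothetical helper named `choose_epoll_op`.

```c
#include <sys/epoll.h>

/* Assumed values: the article does not show these definitions.
 * INIT = 0 means a zero-filled table entry is "not registered yet". */
enum { INIT = 0, ACCEPT, READ, WRITE };

/* Return the epoll_ctl() operation needed to move a descriptor from
 * its current state to the wanted one, or 0 when nothing changes. */
int choose_epoll_op(int current, int wanted)
{
    if (current == INIT)
        return EPOLL_CTL_ADD;   /* first registration */
    if (current != wanted)
        return EPOLL_CTL_MOD;   /* already registered, change interest */
    return 0;                   /* same interest, no system call needed */
}
```

Remembering the last registered state in the table is what lets the function skip epoll_ctl() entirely when a callback asks for the event type that is already set.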
So how is it used? The reactor_loop() function waits for events in a loop and dispatches each one to the matching callback according to its type. The code is as follows:

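int reactor_loop(){
     struct epoll_event events[MAX_EPOLL_SIZE] = {0};
     reactor *r = getInstance();
     if(r == NULL){
          printf("reactor_loop(): getInstance() failed!\n");
          return -1;
     }

     while(1){
          int nready = epoll_wait(r->epollfd, events, MAX_EPOLL_SIZE, -1);
          if(nready == -1){
               if(errno == EINTR)
                    continue;      // interrupted by a signal, wait again
               printf("errno: %s, epoll_wait() failed!\n", strerror(errno));
               return -1;
          }

          for(int i = 0; i < nready; ++i){
               event_item *item = (event_item *)events[i].data.ptr;
               // Pass the triggered event mask and the stored user argument to the callback
               if((events[i].events & EPOLLIN) && item->rhandle != NULL)
                    (*item->rhandle)(item->fd, events[i].events, item->args);
               if((events[i].events & EPOLLOUT) && item->whandle != NULL)
                    (*item->whandle)(item->fd, events[i].events, item->args);
          }         
     }

     return 0;
}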

By now you should have a good idea of what a reactor is. Next come the callback functions for accept, read and write events:

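int acceptCB(int fd, int events, void* args){
     struct sockaddr_in clientAdr;
     socklen_t len = sizeof(clientAdr);
     int connfd = accept(fd, (struct sockaddr*)&clientAdr, &len);
     if(connfd < 0){
          // "Nothing to accept right now" is not an error on a non-blocking listener
          if(errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
               printf("errno: %s, accept() failed!\n", strerror(errno));
          return -1;
     }
     // Add O_NONBLOCK to the current flags rather than overwriting them
     int flags = fcntl(connfd, F_GETFL, 0);
     if(flags < 0 || fcntl(connfd, F_SETFL, flags | O_NONBLOCK) < 0){
          printf("errno: %s, fcntl() failed!\n", strerror(errno));
          close(connfd);
          return -1;
     }
     printf("new connection: %d\n", connfd);
     if(set_reactor_events(connfd, READ, args) < 0){
          close(connfd);
          return -1;
     }
     return 0;
}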

int readCB(int fd, int events, void* args){
     reactor *r = getInstance();
     if(r == NULL){
          printf("readCB(): getInstance() failed!\n");
          return -1;
     }

     unsigned char *rbuf = r->events[fd].recvBuf;

#if 0  // ET
     int cnt = 0, num = 0;
     while(1){
          num = recv(fd, rbuf+cnt, BUFFER_SIZE-cnt, 0);
          if(num == -1){
               if(errno == EINTR)
                    continue;      // interrupted by a signal, retry
               break;              // EAGAIN/EWOULDBLOCK (socket drained) or a real error
          }
          else if(num > 0)
               cnt += num;
          else
               break;
     }
     if (cnt == BUFFER_SIZE && num != -1) {
          set_reactor_events(fd, READ, NULL);
     } else if (num == 0) {
          printf("Peer closed the connection! clientfd = %d\n", fd);
          del_reactor_events(fd);
     } else {
          unsigned char *sbuf = r->events[fd].sendBuf;
          memcpy(sbuf, rbuf, cnt);
          r->events[fd].sendLen = cnt;
          // The buffer is not NUL-terminated, so print exactly cnt bytes
          printf("recv from fd = %d: %.*s\n", fd, cnt, (const char*)sbuf);
          set_reactor_events(fd, WRITE, NULL);
     }

#else
     int n = recv(fd, rbuf, BUFFER_SIZE, 0);
     if(n == 0){
          // recv() returning 0 means the peer closed the connection; errno is not set
          printf("clientfd %d closed!\n", fd);
          del_reactor_events(fd);
          return -1;
     }else if(n < 0){
          if(errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN){
               printf("errno: %s, recv() failed!\n", strerror(errno));
               del_reactor_events(fd);
               return -1;
          }
     }
     else{
          unsigned char *sbuf = r->events[fd].sendBuf;
          memcpy(sbuf, rbuf, n);
          r->events[fd].sendLen = n;
          // The buffer is not NUL-terminated, so print exactly n bytes
          printf("recv from fd = %d: %.*s\n", fd, n, (const char*)sbuf);
          set_reactor_events(fd, WRITE, args);
     }

#endif
     return 0;
}

int writeCB(int fd, int events, void* args){
     reactor *r = getInstance();
     if(r == NULL){
          printf("writeCB(): getInstance() failed!\n");
          return -1;
     }

     unsigned char *sbuf = r->events[fd].sendBuf;
     int len = r->events[fd].sendLen;

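     int ret = send(fd, sbuf, len, 0);
     if(ret < 0){
          // Still registered for WRITE, so a temporary failure is retried on the next EPOLLOUT
          if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
               return 0;
          printf("errno: %s, send() failed!\n", strerror(errno));
          del_reactor_events(fd);
          return -1;
     }
     if(ret < len){
          // Partial send: keep only the unsent tail so it is not sent twice
          memmove(sbuf, sbuf + ret, len - ret);
          r->events[fd].sendLen = len - ret;
     }else{
          r->events[fd].sendLen = 0;
          set_reactor_events(fd, READ, args);
     }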
     return 0;
}
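
The callbacks above call del_reactor_events(), which is not listed in this article. As a rough sketch of what such a function has to do, the hypothetical helper below (`unregister_and_close`, which takes the epoll descriptor explicitly so that it stays self-contained) removes a descriptor from the epoll set and closes it.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical helper: stop watching fd on epfd and close it.
 * Returns 0 on success, -1 if epoll_ctl() or close() failed. */
int unregister_and_close(int epfd, int fd)
{
    int rc = 0;
    /* The event argument is ignored for EPOLL_CTL_DEL. */
    if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL) < 0)
        rc = -1;
    if (close(fd) < 0)
        rc = -1;
    return rc;
}
```

In the routine itself, the matching event_item would presumably also have its events field reset to INIT, because set_reactor_events() uses that field to decide between EPOLL_CTL_ADD and EPOLL_CTL_MOD and the kernel can hand the same descriptor number to a later connection.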

Follow-up

That brings this article to a close. In the next article, we will build on this reactor to implement a routine that handles one million concurrent connections on a single machine. Readers who need the code can leave a comment or send me a private message. I have also uploaded the corresponding code.
Please let me know if you have any questions.
Let's learn from each other!

Keywords: C C++ Linux server epoll

Added by slibob on Wed, 29 Dec 2021 13:02:58 +0200