The third type of events that network programs need to deal with is timing events, such as regularly detecting the active state of a customer connection. Server programs usually manage many timing events. Therefore, effectively organizing these timing events so that they can be triggered at the expected time point without affecting the main logic of the server has a vital impact on the performance of the server. To this end, we need to encapsulate each timing event into a timer, and use some kind of container data structure, such as linked list, sorting linked list and time wheel. The time heap connects all timers in series.
However, before discussing how to organize timers, let's introduce the method of timing. Timing refers to the mechanism that triggers a piece of code after a period of time. We can process all expired timers in this code in turn. In other words, the timing mechanism is the driving force for the timer to be processed. Linux provides three timing methods:
- socket option SO_RCVTIMEO and SO_SNDTIMEO.
- SIGALRM signal.
- Timeout parameter of I/O multiplexing system call.
socket option SO_RCVTIMEO and SO_SNDTIMEO
Socket option SO_RCVTIMEO and SO_SNDTIMEO, which are used to set the timeout of socket receiving data and sending data respectively. Therefore, these two options are only valid for socket specific system calls related to data reception and transmission, including send, sendmsg, recv, recvmsg, accept and connect We will choose SO_RCVTIMEO and so_ The impact of sndtimeo on these system calls is summarized in the table.
It can be seen from the table that in the program, we can judge whether the timeout has expired according to the return values of system calls (send, sendmsg, recv, recvmsg, accept and connect) and errno, and then decide whether to start processing scheduled tasks.
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <stdlib.h> #include <assert.h> #include <stdio.h> #include <errno.h> #include <fcntl.h> #include <unistd.h> #include <string.h> int timeout_connect( const char* ip, int port, int time ) { int ret = 0; struct sockaddr_in address; bzero( &address, sizeof( address ) ); address.sin_family = AF_INET; inet_pton( AF_INET, ip, &address.sin_addr ); address.sin_port = htons( port ); int sockfd = socket( PF_INET, SOCK_STREAM, 0 ); assert( sockfd >= 0 ); struct timeval timeout; timeout.tv_sec = time; timeout.tv_usec = 0; socklen_t len = sizeof( timeout ); ret = setsockopt( sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len ); assert( ret != -1 ); ret = connect( sockfd, ( struct sockaddr* )&address, sizeof( address ) ); if ( ret == -1 ) { if( errno == EINPROGRESS ) //If the error corresponding to the timeout is established, you can process the scheduled task { printf( "connecting timeout\n" ); return -1; } printf( "error occur when connecting to server\n" ); return -1; } return sockfd; } int main( int argc, char* argv[] ) { if( argc <= 2 ) { printf( "usage: %s ip_address port_number\n", basename( argv[0] ) ); return 1; } const char* ip = argv[1]; int port = atoi( argv[2] ); int sockfd = timeout_connect( ip, port, 10 ); if ( sockfd < 0 ) { return 1; } return 0; }
SIGALRM signal
Once the real-time alarm clock set by the alarm and setimer functions times out, the SIGALRM signal will be triggered. Therefore, we can use the signal processing function of the signal to process the timing task. However, if we want to process multiple timing tasks, we need to constantly trigger the SIGALRM signal and execute the expired tasks in its signal processing function. Generally speaking, the SIGALRM signal is generated at a fixed frequency, that is, the timing period t set by the alarm or setimer function remains unchanged. If the timeout time of a scheduled task is not an integral multiple of T, the actual execution time will deviate slightly from the expected time. Therefore, the timing period T reflects the accuracy of timing.
#ifndef LST_TIMER #define LST_TIMER #include <time.h> #include <netinet/in.h> #define BUFFER_SIZE 64 class util_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; util_timer *timer; }; class util_timer { public: util_timer() : prev(NULL), next(NULL) {} public: time_t expire; void (*cb_func)(client_data *); client_data *user_data; util_timer *prev; util_timer *next; }; class sort_timer_lst { public: sort_timer_lst() : head(NULL), tail(NULL) {} ~sort_timer_lst() { util_timer *tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; } } void add_timer(util_timer *timer) { if (!timer) { return; } if (!head) { head = tail = timer; return; } if (timer->expire < head->expire) { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); } void adjust_timer(util_timer *timer) { if (!timer) { return; } util_timer *tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } } void del_timer(util_timer *timer) { if (!timer) { return; } if ((timer == head) && (timer == tail)) { delete timer; head = NULL; tail = NULL; return; } if (timer == head) { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; timer->next->prev = timer->prev; delete timer; } void tick() { if (!head) { return; } printf("timer tick\n"); time_t cur = time(NULL); util_timer *tmp = head; while (tmp) { if (cur < tmp->expire) { break; } tmp->cb_func(tmp->user_data); head = tmp->next; if (head) { head->prev = NULL; } delete tmp; tmp = head; } } private: void add_timer(util_timer *timer, util_timer *lst_head) { util_timer *prev = lst_head; util_timer *tmp = prev->next; while (tmp) { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } } private: util_timer *head; util_timer *tail; }; #endif
Now let's consider the practical application of the above ascending timer linked list - dealing with inactive connections. Server programs usually deal with inactive connections regularly: Send a reconnect request to the client, or close the connection, or other. Linux provides a regular check mechanism for whether the connection is active in the kernel. We can activate it through the socket option KEEPALIVE. However, using this method will complicate the management of connections by applications. Therefore, we can consider implementing a mechanism similar to KEEPALIVE in the application layer to manage all connections that are inactive for a long time. For example, the SIGALRM signal is triggered periodically by the alarm function. The signal processing function of the signal uses the pipeline to notify the main loop to execute the timing task on the timer chain list and close the inactive connection.
#include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <assert.h> #include <stdio.h> #include <signal.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <fcntl.h> #include <stdlib.h> #include <sys/epoll.h> #include <pthread.h> #include "lst_timer.h" #define FD_LIMIT 65535 #define MAX_EVENT_NUMBER 1024 #define TIMESLOT 5 static int pipefd[2]; static sort_timer_lst timer_lst; static int epollfd = 0; int setnonblocking(int fd) { int old_option = fcntl(fd, F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(fd, F_SETFL, new_option); return old_option; } void addfd(int epollfd, int fd) { epoll_event event; event.data.fd = fd; event.events = EPOLLIN | EPOLLET; epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); setnonblocking(fd); } void sig_handler(int sig) { int save_errno = errno; int msg = sig; send(pipefd[1], (char *)&msg, 1, 0); errno = save_errno; } void addsig(int sig) { struct sigaction sa; memset(&sa, '\0', sizeof(sa)); sa.sa_handler = sig_handler; sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); assert(sigaction(sig, &sa, NULL) != -1); } void timer_handler() { timer_lst.tick(); alarm(TIMESLOT); } void cb_func(client_data *user_data) { epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); assert(user_data); close(user_data->sockfd); printf("close fd %d\n", user_data->sockfd); } int main(int argc, char *argv[]) { if (argc <= 2) { printf("usage: %s ip_address port_number\n", basename(argv[0])); return 1; } const char *ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in address; bzero(&address, sizeof(address)); address.sin_family = AF_INET; inet_pton(AF_INET, ip, &address.sin_addr); address.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr *)&address, sizeof(address)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); epoll_event events[MAX_EVENT_NUMBER]; int epollfd = epoll_create(5); assert(epollfd != -1); addfd(epollfd, listenfd); ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); assert(ret != -1); setnonblocking(pipefd[1]); addfd(epollfd, pipefd[0]); // add all the interesting signals here addsig(SIGALRM); addsig(SIGTERM); bool stop_server = false; client_data *users = new client_data[FD_LIMIT]; bool timeout = false; alarm(TIMESLOT); while (!stop_server) { int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } for (int i = 0; i < number; i++) { int sockfd = events[i].data.fd; if (sockfd == listenfd) { struct sockaddr_in client_address; socklen_t client_addrlength = sizeof(client_address); int connfd = accept(listenfd, (struct sockaddr *)&client_address, &client_addrlength); addfd(epollfd, connfd); users[connfd].address = client_address; users[connfd].sockfd = connfd; util_timer *timer = new util_timer; timer->user_data = &users[connfd]; timer->cb_func = cb_func; time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; users[connfd].timer = timer; timer_lst.add_timer(timer); } else if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) { int sig; char signals[1024]; ret = recv(pipefd[0], signals, sizeof(signals), 0); if (ret == -1) { // handle the error continue; } else if (ret == 0) { continue; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: { timeout = true; break; } case SIGTERM: { stop_server = true; } } } } } else if (events[i].events & EPOLLIN) { memset(users[sockfd].buf, '\0', BUFFER_SIZE); ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE - 1, 0); printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd); util_timer *timer = users[sockfd].timer; if (ret < 0) { if (errno != EAGAIN) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } } else if (ret == 0) { cb_func(&users[sockfd]); if (timer) { timer_lst.del_timer(timer); } } else { //send( sockfd, users[sockfd].buf, BUFFER_SIZE-1, 0 ); if (timer) { time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; printf("adjust timer once\n"); timer_lst.adjust_timer(timer); } } } else { // others } } if (timeout) { timer_handler(); timeout = false; } } close(listenfd); close(pipefd[1]); close(pipefd[0]); delete[] users; return 0; }
Timeout parameter of I/O multiplexing system call
The three groups of I/O multiplexing system calls under Linux have timeout parameters, so they can handle not only signals and I/O events, but also timing events. However, since I/O multiplexing system calls may return before the timeout expires (I/O events occur), if we want to use them to timing, we need to constantly update the timing parameters to reflect the remaining time.
#define TIMEOUT 5000 int timeout = TIMEOUT; time_t start = time(NULL); time_t end = time(NULL); while (1) { printf("the timeout is now %d mill-seconds\n", timeout); start = time(NULL); int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout); if ((number < 0) && (errno != EINTR)) { printf("epoll failure\n"); break; } //If epo11_ If the wait returns 0 successfully, it indicates that the timeout time has expired. At this time, the scheduled task can be processed and the scheduled time can be reset if (number == 0) { // timeout timeout = TIMEOUT; continue; } end = time(NULL); /*If epol1_ If the return value of wait is greater than 0, this epoll_ The duration of the wait call is (end - start)* 1000 ms,We need to subtract this time from the timing time timeout to get the next epol1_ Timeout parameter of wait call*/ timeout -= (end - start) * 1000; /*The timeout value after recalculation may be equal to 0, indicating that this epoll_ When the wait call returns, not only the file descriptor is returned Thread, and its timeout time has just arrived. At this time, we also need to process the scheduled task and reset the scheduled time*/ if (timeout <= 0) { // timeout timeout = TIMEOUT; } // handle connections }
Time wheel
In the time wheel shown in the figure, (solid line) the pointer points to - a slot on the wheel. It rotates clockwise at a constant speed. Each step of rotation points to the next slot (the slot pointed by the dotted pointer). Each rotation is called a tick. A tick time is called the slot interval si (slot interval) of the time wheel, which is actually the heart beat time. The time wheel has N slots in total, so its operation time for one week is N*si. Each slot points to a timer chain list. The timers on each chain list have the same characteristics: their timing time is an integer multiple of N*si. The time wheel uses this relationship to hash timers into different linked lists. If the pointer now points to slot cs and we want to add a timer with timing time ti, the timer will be inserted into the linked list corresponding to slot ts (timer slot):
ts=(cs+(ti/si))%N
The timer based on the sorting linked list uses a unique linked list to manage all timers, so the efficiency of human insertion operation decreases with the increase of the number of timers. The time wheel uses the idea of hash table to hash timers to different linked lists. In this way, the number of timers on each linked list will be significantly less than the number of timers on the original sorting linked list, and the efficiency of insertion operation is basically not affected by the number of timers.
Obviously, for the time wheel, to improve the timing accuracy, it is necessary to make the si value small enough; to improve the execution efficiency, it is required that the N value is large enough.
The figure describes a simple time wheel because it has only one wheel. The complex time wheel may have multiple wheels, and different wheels have different granularity. The two adjacent wheels turn once with high precision and move forward only one slot with low precision, just like a water meter.
#ifndef TIME_WHEEL_TIMER #define TIME_WHEEL_TIMER #include <time.h> #include <netinet/in.h> #include <stdio.h> #define BUFFER_SIZE 64 class tw_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; tw_timer *timer; }; class tw_timer { public: tw_timer(int rot, int ts) : next(NULL), prev(NULL), rotation(rot), time_slot(ts) {} public: int rotation; //Record how many times the timer turns in time int time_slot; //Record which slot the timer is in on the time wheel void (*cb_func)(client_data *); client_data *user_data; tw_timer *next; tw_timer *prev; }; class time_wheel { public: time_wheel() : cur_slot(0) { for (int i = 0; i < N; ++i) { slots[i] = NULL; } } ~time_wheel() { for (int i = 0; i < N; ++i) { tw_timer *tmp = slots[i]; while (tmp) { slots[i] = tmp->next; delete tmp; tmp = slots[i]; } } } tw_timer *add_timer(int timeout) { if (timeout < 0) { return NULL; } int ticks = 0; if (timeout < TI) { ticks = 1; } else { ticks = timeout / TI; } int rotation = ticks / N; int ts = (cur_slot + (ticks % N)) % N; tw_timer *timer = new tw_timer(rotation, ts); if (!slots[ts]) { printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot); slots[ts] = timer; } else { timer->next = slots[ts]; slots[ts]->prev = timer; slots[ts] = timer; } return timer; } void del_timer(tw_timer *timer) { if (!timer) { return; } int ts = timer->time_slot; if (timer == slots[ts]) { slots[ts] = slots[ts]->next; if (slots[ts]) { slots[ts]->prev = NULL; } delete timer; } else { timer->prev->next = timer->next; if (timer->next) { timer->next->prev = timer->prev; } delete timer; } } void tick() { tw_timer *tmp = slots[cur_slot]; printf("current slot is %d\n", cur_slot); while (tmp) { printf("tick the timer once\n"); if (tmp->rotation > 0) { tmp->rotation--; tmp = tmp->next; } else { tmp->cb_func(tmp->user_data); if (tmp == slots[cur_slot]) { printf("delete header in cur_slot\n"); slots[cur_slot] = tmp->next; delete tmp; if (slots[cur_slot]) { slots[cur_slot]->prev = NULL; } tmp = slots[cur_slot]; } else { tmp->prev->next = tmp->next; if (tmp->next) { tmp->next->prev = tmp->prev; } tw_timer *tmp2 = tmp->next; delete tmp; tmp = tmp2; } } } cur_slot = ++cur_slot % N; } private: static const int N = 60; //Number of slots static const int SI = 1; //Rotate once in 1s tw_timer *slots[N]; //A slot in a time wheel in which each element points to an unordered linked list of timers int cur_slot; //Current slot of time wheel }; #endif
Time pile
The timing schemes discussed above call the heart beat function tick at a fixed frequency, detect the expiration timer in turn, and then execute the callback function on the expiration timer. Another idea of designing timer is to take the timeout value of the timer with the smallest timeout time among all timers as the heart beat interval. In this way, once the heart beat function tick is called, the timer with the minimum timeout time will inevitably expire, and we can process the timer in the tick function. Then, find the one with the smallest timeout time from the remaining timers again, and set this minimum time as the next heart beat interval. So repeatedly, a more accurate timing is realized.
The basic operations of the tree are inserting nodes and deleting nodes. They are simple for the smallest heap. In order to insert an element X into the smallest heap, we can create a hole in the next free position of the tree. If X can be placed in the hole without destroying the stacking sequence, the insertion is completed. Otherwise, the top-down operation is performed, that is, exchanging the hole and its parent node_ Element on. Continue the above process until X can be put into the hole, and the insertion operation is completed. For example, if we want to insert an element with a value of 14 into the minimum heap shown in the figure, we can follow the steps shown in the figure.
The nature of the minimum heap can be seen in this article
#ifndef intIME_HEAP #define intIME_HEAP #include <iostream> #include <netinet/in.h> #include <time.h> using std::exception; #define BUFFER_SIZE 64 class heap_timer; struct client_data { sockaddr_in address; int sockfd; char buf[BUFFER_SIZE]; heap_timer *timer; }; class heap_timer { public: heap_timer(int delay) { expire = time(NULL) + delay; } public: time_t expire;//Absolute time generated by timer void (*cb_func)(client_data *); client_data *user_data; }; class time_heap { public: time_heap(int cap) throw(std::exception) : capacity(cap), cur_size(0) { array = new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i = 0; i < capacity; ++i) { array[i] = NULL; } } time_heap(heap_timer **init_array, int size, int capacity) throw(std::exception) : cur_size(size), capacity(capacity) { if (capacity < size) { throw std::exception(); } array = new heap_timer *[capacity]; if (!array) { throw std::exception(); } for (int i = 0; i < capacity; ++i) { array[i] = NULL; } if (size != 0) { for (int i = 0; i < size; ++i) { array[i] = init_array[i]; } for (int i = (cur_size - 1) / 2; i >= 0; --i) { percolate_down(i); } } } ~time_heap() { for (int i = 0; i < cur_size; ++i) { delete array[i]; } delete[] array; } public: void add_timer(heap_timer *timer) throw(std::exception) { if (!timer) { return; } if (cur_size >= capacity) { resize(); } int hole = cur_size++; int parent = 0; for (; hole > 0; hole = parent) { parent = (hole - 1) / 2; if (array[parent]->expire <= timer->expire) { break; } array[hole] = array[parent]; } array[hole] = timer; } void del_timer(heap_timer *timer) { if (!timer) { return; } // lazy delelte timer->cb_func = NULL; } heap_timer *top() const { if (empty()) { return NULL; } return array[0]; } void pop_timer() { if (empty()) { return; } if (array[0]) { delete array[0]; array[0] = array[--cur_size]; percolate_down(0); } } void tick() { heap_timer *tmp = array[0]; time_t cur = time(NULL); while (!empty()) { if (!tmp) { break; } if (tmp->expire > cur) { break; } if (array[0]->cb_func) { array[0]->cb_func(array[0]->user_data); } pop_timer(); tmp = array[0]; } } bool empty() const { return cur_size == 0; } private: void percolate_down(int hole) { heap_timer *temp = array[hole]; int child = 0; for (; ((hole * 2 + 1) <= (cur_size - 1)); hole = child) { child = hole * 2 + 1; if ((child < (cur_size - 1)) && (array[child + 1]->expire < array[child]->expire)) { ++child; } if (array[child]->expire < temp->expire) { array[hole] = array[child]; } else { break; } } array[hole] = temp; } void resize() throw(std::exception) { heap_timer **temp = new heap_timer *[2 * capacity]; for (int i = 0; i < 2 * capacity; ++i) { temp[i] = NULL; } if (!temp) { throw std::exception(); } capacity = 2 * capacity; for (int i = 0; i < cur_size; ++i) { temp[i] = array[i]; } delete[] array; array = temp; } private: heap_timer **array; //Heap array int capacity; //Heap array capacity int cur_size; //The number of elements currently contained in the heap array }; #endif