1: Event loop pool class
Event loop pool is a singleton class that manages EventPoller
1. EventPollerPool constructor
EventPollerPool::EventPollerPool() { auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true); InfoL << "establish EventPoller number:" << size; }
2: Adding EventPoller to circular pool
According to s_pool_size, instantiated s_pool_size is an EventPoller, which creates different threads when calling runLoop
size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) { auto cpus = thread::hardware_concurrency(); size = size > 0 ? size : cpus; for (size_t i = 0; i < size; ++i) { EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority)); poller->runLoop(false, register_thread);//Different threads are created here auto full_name = name + " " + to_string(i); poller->async([i, cpus, full_name]() { setThreadName(full_name.data()); setThreadAffinity(i % cpus); }); _threads.emplace_back(std::move(poller)); } return size; }
When calling runLoop, blocked passes false, so it is a thread created in the else branch of the function. Then pass the runLoop function into the thread and modify the parameter blocked to true,
In this way, different threads are created to execute runLoop, and the event loop establishes the round robin process based on the multiplexed epoll model. The principle of epoll is not described in more detail
void EventPoller::runLoop(bool blocked,bool regist_self) { if (blocked) { ...... } else { _loop_thread = new thread(&EventPoller::runLoop, this, true, regist_self); _sem_run_started.wait(); } }
2: EventPoller
Listen for two types of file descriptors through epoll
1. The descriptor of socket is mainly used to monitor the data sent by different clients
Add fd to epoll management through addEvent,
If it is the current thread, the event is added_ event_map to manage that when the epoll event is triggered, it will call back the epoll trigger event, that is, the incoming PollEventCB
If it is not the current thread, asynchronous processing is actually using the pipeline to process events in the current thread. The pipeline processing process can be manipulated 2
int EventPoller::addEvent(int fd, int event, PollEventCB cb) { TimeTicker(); if (!cb) { WarnL << "PollEventCB Empty!"; return -1; } if (isCurrentThread()) { struct epoll_event ev = {0}; ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE; ev.data.fd = fd; int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev); if (ret == 0) { _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb))); } return ret; } async([this, fd, event, cb]() { addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb))); }); return 0; }
2. Internal custom events, using linux pipeline simulation files to listen,
The program wraps the pipeline and constructs the pipeline in the constructor
PipeWrap _pipe; PipeWrap::PipeWrap(){ //Create pipe if (pipe(_pipe_fd) == -1) { throw runtime_error(StrPrinter << "create posix pipe failed:" << get_uv_errmsg());\ } SockUtil::setNoBlocked(_pipe_fd[0],true); SockUtil::setNoBlocked(_pipe_fd[1],false); SockUtil::setCloExec(_pipe_fd[0]); SockUtil::setCloExec(_pipe_fd[1]); }
When the EventPoller is constructed, the pipeline has been added to epoll management, and the callback is onPipeEvent
EventPoller::EventPoller(ThreadPool::Priority priority ) { ...... //Add internal pipeline event if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) { throw std::runtime_error("epoll Failed to add pipeline"); } }
onPipeEvent is defined as follows. When triggered, the_ list_ The task is exchanged into a temporary variable and then executed. The essence is to handle asynchronous tasks: when other threads add the task to the task list and write the pipeline at the same time, epoll will listen to the read event of the pipeline in this thread, and then take the task out of the task list for execution. So as to achieve the purpose of asynchronous task processing
inline void EventPoller::onPipeEvent() { TimeTicker(); char buf[1024]; int err = 0; do { if (_pipe.read(buf, sizeof(buf)) > 0) { continue; } err = get_uv_error(true); } while (err != UV_EAGAIN); decltype(_list_task) _list_swap; { lock_guard<mutex> lck(_mtx_task); _list_swap.swap(_list_task); } _list_swap.for_each([&](const Task::Ptr &task) { try { (*task)(); } catch (ExitException &) { _exit_flag = true; } catch (std::exception &ex) { ErrorL << "EventPoller Exception caught while executing asynchronous task:" << ex.what(); } }); }
3: Summary
When the program starts, an event loop pool will be created. The size can be customized. By default, it will be automatically allocated according to the hardware performance. Each event loop class has its own independent thread. The event loop is realized through the multiplexed epoll mode. It listens to the events of the sock, receives network data, listens to the events of the pipeline, and performs asynchronous tasks