ZLMediaKit server source code walkthrough: the event loop

1: Event loop pool class

EventPollerPool, the event loop pool, is a singleton class that manages a set of EventPoller instances

1. EventPollerPool constructor
EventPollerPool::EventPollerPool() {
    auto size = addPoller("event poller", s_pool_size, ThreadPool::PRIORITY_HIGHEST, true);
    InfoL << "establish EventPoller number:" << size;
}
2. Adding EventPollers to the pool

addPoller instantiates s_pool_size EventPoller objects, or one per CPU core when s_pool_size is 0. Each EventPoller creates its own thread when runLoop is called

size_t TaskExecutorGetterImp::addPoller(const string &name, size_t size, int priority, bool register_thread) {
    auto cpus = thread::hardware_concurrency();
    size = size > 0 ? size : cpus;
    for (size_t i = 0; i < size; ++i) {
        EventPoller::Ptr poller(new EventPoller((ThreadPool::Priority) priority));
        poller->runLoop(false, register_thread);//Different threads are created here
        auto full_name = name + " " + to_string(i);
        poller->async([i, cpus, full_name]() {
            setThreadName(full_name.data());
            setThreadAffinity(i % cpus);
        });
        _threads.emplace_back(std::move(poller));
    }
    return size;
}

addPoller calls runLoop with blocked set to false, so the function takes the else branch. That branch creates a thread whose entry point is runLoop itself, this time with blocked set to true, and then waits on _sem_run_started until the new thread is up.
In this way every EventPoller runs its loop on its own thread. The loop itself polls for events using epoll I/O multiplexing; how epoll works is not covered here

void EventPoller::runLoop(bool blocked,bool regist_self) {
    if (blocked) {
       	......
    } else {
        _loop_thread = new thread(&EventPoller::runLoop, this, true, regist_self);
        _sem_run_started.wait();
    }
}
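
The start-up handshake described above can be sketched in isolation. This is a minimal sketch, not ZLMediaKit code: LoopSketch and demoStartLoop are hypothetical names, a condition variable stands in for _sem_run_started, and a short sleep stands in for the epoll wait.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for EventPoller, reduced to the thread start-up logic.
class LoopSketch {
public:
    LoopSketch() = default;
    LoopSketch(const LoopSketch &) = delete;
    LoopSketch &operator=(const LoopSketch &) = delete;

    ~LoopSketch() {
        _exit_flag = true;
        if (_loop_thread.joinable()) {
            _loop_thread.join();
        }
    }

    void runLoop(bool blocked) {
        if (blocked) {
            // Runs on the loop thread: record its id, signal the creator, then loop.
            {
                std::lock_guard<std::mutex> lck(_mtx);
                _loop_thread_id = std::this_thread::get_id();
                _started = true;
            }
            _cv.notify_one();
            while (!_exit_flag) {
                // Placeholder for the blocking wait on epoll.
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        } else {
            // Runs on the caller's thread: spawn the loop thread and wait for it to start.
            assert(!_loop_thread.joinable()); // runLoop(false) may be called only once
            _loop_thread = std::thread(&LoopSketch::runLoop, this, true);
            std::unique_lock<std::mutex> lck(_mtx);
            _cv.wait(lck, [this] { return _started; });
        }
    }

    // True only when called from the loop thread.
    bool isCurrentThread() const {
        return std::this_thread::get_id() == _loop_thread_id;
    }

private:
    std::mutex _mtx;
    std::condition_variable _cv;
    bool _started = false;
    std::thread::id _loop_thread_id;
    std::atomic<bool> _exit_flag{false};
    std::thread _loop_thread;
};

// Starts a loop from the calling thread and reports whether the loop
// really runs on a different thread.
bool demoStartLoop() {
    LoopSketch loop;
    loop.runLoop(false);
    return !loop.isCurrentThread();
}
```

Because runLoop(false) returns only after the loop thread has signalled, the caller can post work to the poller right away, which is presumably why addPoller can call async immediately after runLoop.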

2: EventPoller

EventPoller listens for two kinds of file descriptors through epoll:
1. Socket descriptors, mainly used to receive the data sent by clients

A file descriptor is registered with epoll through addEvent.
If the caller is already on the poller's thread, the fd is added to epoll and its callback is stored in _event_map; when epoll later reports an event on that fd, the stored PollEventCB is invoked.
If the caller is on another thread, the registration is deferred through async, which uses the pipe to run it on the poller's thread. The pipe mechanism is covered in item 2

int EventPoller::addEvent(int fd, int event, PollEventCB cb) {
    TimeTicker();
    if (!cb) {
        WarnL << "PollEventCB Empty!";
        return -1;
    }

    if (isCurrentThread()) {
        struct epoll_event ev = {0};
        ev.events = (toEpoll(event)) | EPOLLEXCLUSIVE;
        ev.data.fd = fd;
        int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (ret == 0) {
            _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
        }
        return ret;
    }

    async([this, fd, event, cb]() {
        addEvent(fd, event, std::move(const_cast<PollEventCB &>(cb)));
    });
    return 0;
}
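
The same-thread branch of addEvent pairs an epoll registration with an entry in a callback map. The following is a minimal Linux-only sketch of that idea together with one dispatch step. MiniPoller, pollOnce and demoDispatch are hypothetical names, raw epoll flags are passed directly instead of going through toEpoll, and the cross-thread branch is left out.

```cpp
#include <sys/epoll.h>
#include <unistd.h>

#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <stdexcept>
#include <unordered_map>

// Hypothetical single-threaded poller: epoll registration plus a callback map.
class MiniPoller {
public:
    using PollEventCB = std::function<void(int event)>;

    MiniPoller() : _epoll_fd(epoll_create1(EPOLL_CLOEXEC)) {
        if (_epoll_fd == -1) {
            throw std::runtime_error("epoll_create1 failed");
        }
    }
    MiniPoller(const MiniPoller &) = delete;
    MiniPoller &operator=(const MiniPoller &) = delete;
    ~MiniPoller() { close(_epoll_fd); }

    // Register fd with epoll and remember its callback.
    int addEvent(int fd, uint32_t events, PollEventCB cb) {
        if (!cb) {
            return -1;
        }
        struct epoll_event ev = {};
        ev.events = events;
        ev.data.fd = fd;
        int ret = epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, fd, &ev);
        if (ret == 0) {
            _event_map.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
        }
        return ret;
    }

    // One loop iteration: wait, then call the stored callback of every ready fd.
    // Returns the number of callbacks invoked.
    int pollOnce(int timeout_ms) {
        struct epoll_event events[64];
        int n = epoll_wait(_epoll_fd, events, 64, timeout_ms);
        int dispatched = 0;
        for (int i = 0; i < n; ++i) {
            auto it = _event_map.find(events[i].data.fd);
            if (it == _event_map.end()) {
                continue;
            }
            auto cb = it->second; // hold a reference while the callback runs
            (*cb)(static_cast<int>(events[i].events));
            ++dispatched;
        }
        return dispatched;
    }

private:
    int _epoll_fd;
    std::unordered_map<int, std::shared_ptr<PollEventCB>> _event_map;
};

// Registers the read end of a pipe, writes one byte, and returns the byte
// that the callback read back.
char demoDispatch() {
    int fds[2];
    if (pipe(fds) == -1) {
        return '\0';
    }
    char received = '\0';
    {
        MiniPoller poller;
        int ret = poller.addEvent(fds[0], EPOLLIN, [&](int) {
            if (read(fds[0], &received, 1) != 1) {
                received = '\0';
            }
        });
        assert(ret == 0);
        (void) ret;
        const char c = 'x';
        if (write(fds[1], &c, 1) == 1) {
            poller.pollOnce(1000);
        }
    }
    close(fds[0]);
    close(fds[1]);
    return received;
}
```

Here demoDispatch() returns 'x': the byte written to the pipe makes its read end readable, epoll_wait reports that descriptor, and the callback looked up in the map reads the byte.
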
2. Internal custom events, implemented with a Linux pipe whose read end is monitored by epoll like any other descriptor

The pipe is wrapped in the PipeWrap class, which creates it in its constructor

PipeWrap _pipe;
PipeWrap::PipeWrap(){
    //Create pipe
    if (pipe(_pipe_fd) == -1) {
        throw runtime_error(StrPrinter << "create posix pipe failed:" << get_uv_errmsg());
    }
    SockUtil::setNoBlocked(_pipe_fd[0],true);
    SockUtil::setNoBlocked(_pipe_fd[1],false);
    SockUtil::setCloExec(_pipe_fd[0]);
    SockUtil::setCloExec(_pipe_fd[1]);
}

When an EventPoller is constructed, the read end of the pipe is registered with epoll, with onPipeEvent as its callback

EventPoller::EventPoller(ThreadPool::Priority priority ) {
	......
    //Add internal pipeline event
    if (addEvent(_pipe.readFD(), Event_Read, [this](int event) { onPipeEvent(); }) == -1) {
        throw std::runtime_error("epoll Failed to add pipeline");
    }
}

onPipeEvent is defined as follows. When triggered, it first drains the pipe, then swaps _list_task into a temporary list while holding the lock and executes the tasks after releasing it. This is how asynchronous tasks are handled: another thread appends a task to the task list and writes to the pipe; epoll on the poller's thread reports the read event of the pipe, and the poller takes the tasks off the list and runs them on its own thread

inline void EventPoller::onPipeEvent() {
    TimeTicker();
    char buf[1024];
    int err = 0;
    do {
        if (_pipe.read(buf, sizeof(buf)) > 0) {
            continue;
        }
        err = get_uv_error(true);
    } while (err != UV_EAGAIN);

    decltype(_list_task) _list_swap;
    {
        lock_guard<mutex> lck(_mtx_task);
        _list_swap.swap(_list_task);
    }

    _list_swap.for_each([&](const Task::Ptr &task) {
        try {
            (*task)();
        } catch (ExitException &) {
            _exit_flag = true;
        } catch (std::exception &ex) {
            ErrorL << "EventPoller Exception caught while executing asynchronous task:" << ex.what();
        }
    });
}
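
The whole round trip, from posting a task on one thread to running it on the loop thread, can be sketched as follows. This is a minimal Linux-only sketch under stated assumptions: WakeupLoop and demoAsync are hypothetical names, fcntl replaces SockUtil::setNoBlocked, and a plain flag set by a task replaces ExitException.

```cpp
#include <fcntl.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <cassert>
#include <functional>
#include <list>
#include <mutex>
#include <stdexcept>
#include <thread>

// Hypothetical loop that is woken through a pipe to run queued tasks.
class WakeupLoop {
public:
    using Task = std::function<void()>;

    WakeupLoop() {
        if (pipe(_pipe_fd) == -1) {
            throw std::runtime_error("pipe failed");
        }
        // Non-blocking read end, so draining stops at EAGAIN instead of blocking.
        fcntl(_pipe_fd[0], F_SETFL, fcntl(_pipe_fd[0], F_GETFL) | O_NONBLOCK);
        _epoll_fd = epoll_create1(EPOLL_CLOEXEC);
        struct epoll_event ev = {};
        ev.events = EPOLLIN;
        ev.data.fd = _pipe_fd[0];
        if (_epoll_fd == -1 || epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, _pipe_fd[0], &ev) == -1) {
            throw std::runtime_error("epoll setup failed");
        }
        _thread = std::thread([this] { run(); });
    }
    WakeupLoop(const WakeupLoop &) = delete;
    WakeupLoop &operator=(const WakeupLoop &) = delete;

    ~WakeupLoop() {
        assert(std::this_thread::get_id() != _thread.get_id());
        // The exit request is queued like any other task, so earlier tasks run first.
        async([this] { _exit_flag = true; });
        _thread.join();
        close(_epoll_fd);
        close(_pipe_fd[0]);
        close(_pipe_fd[1]);
    }

    // Callable from any thread: queue the task, then wake the loop via the pipe.
    void async(Task task) {
        {
            std::lock_guard<std::mutex> lck(_mtx_task);
            _list_task.emplace_back(std::move(task));
        }
        const char c = 0;
        if (write(_pipe_fd[1], &c, 1) < 0) {
            // Ignored in this sketch; the task stays queued.
        }
    }

private:
    void run() {
        while (!_exit_flag) {
            struct epoll_event ev;
            int n = epoll_wait(_epoll_fd, &ev, 1, -1);
            if (n > 0 && ev.data.fd == _pipe_fd[0]) {
                onPipeEvent();
            }
        }
    }

    void onPipeEvent() {
        char buf[1024];
        while (read(_pipe_fd[0], buf, sizeof(buf)) > 0) {
            // Drain every pending wake-up byte.
        }
        std::list<Task> list_swap;
        {
            std::lock_guard<std::mutex> lck(_mtx_task);
            list_swap.swap(_list_task);
        }
        for (auto &task : list_swap) {
            task(); // runs outside the lock
        }
    }

    int _pipe_fd[2] = {-1, -1};
    int _epoll_fd = -1;
    bool _exit_flag = false; // touched only on the loop thread
    std::mutex _mtx_task;
    std::list<Task> _list_task;
    std::thread _thread;
};

// Posts three tasks from the calling thread and returns the sum they computed.
int demoAsync() {
    int sum = 0; // written only by the loop thread, read after it is joined
    {
        WakeupLoop loop;
        for (int i = 1; i <= 3; ++i) {
            loop.async([&sum, i] { sum += i; });
        }
    }
    return sum;
}
```

demoAsync() returns 6. Swapping the list under the lock keeps the critical section short: producers are blocked only for the swap, not for the time the tasks take to run.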

3: Summary

When the program starts, an event loop pool is created. Its size can be configured; if no size is set, it falls back to the number of hardware threads reported by thread::hardware_concurrency(). Each EventPoller has its own independent thread, and its event loop is built on epoll I/O multiplexing. The loop watches socket events to receive network data, and watches the pipe's read event to run asynchronous tasks

Keywords: Operation & Maintenance server

Added by lukeurtnowski on Fri, 11 Feb 2022 11:25:02 +0200