A 100-line thread pool implemented in C++11

Thread pool

C++11 has threads (std::thread) and asynchronous operations (std::async), but the standard library does not ship a thread pool. As for the concept of a thread pool, let me first borrow other people's explanations:

Generally speaking, the thread pool has the following parts:

  1. One or more worker threads that carry out the actual tasks.

  2. A management thread that handles scheduling.

  3. A queue of tasks waiting to be executed.

In plain words: your functions need to run on multiple threads, but you can't start a new thread for every function call, so you keep a fixed set of N threads to do the work. At any moment some of those threads are still busy and some are idle, so how do you hand out tasks? You wrap all of that in a thread pool. Once it is encapsulated, you just tell the pool how many threads to start, push tasks into it, and collect the results through some mechanism (here, std::future).

Here is a thread pool implemented in about 100 lines:

https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h

Analyzing the source code: header files

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

There's nothing to say about vector, queue and memory. thread is the threading header. mutex provides mutual exclusion and solves the problem of threads competing for a shared resource. condition_variable is used to block threads and wake them up. future, judging by how it is used here, is the mechanism for getting a result back from a thread. functional provides function objects; std::function can be understood as a normalized function pointer. stdexcept is what its name says: the standard exceptions.

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

This is the declaration of the thread pool: a constructor, a destructor, and an enqueue function template that returns a std::future. The type inside that future is deduced at compile time (not at run time) by std::result_of, from the callable type F and the argument types Args. Very impressive: one line of code nests templates like Russian dolls. Is this kind of advanced usage what the experts write? I'm impressed.

workers is a std::vector<std::thread>: the worker threads.

tasks is a std::queue<std::function<void()>>: the task queue.

So the question is: can the tasks in this queue only be of type void()? It can't be that simple, so I have to keep reading.

There's nothing to say about the mutex and the condition_variable. stop tells the thread pool to shut down.

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;
 
 
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
 
 
                    task();
                }
            }
        );
}

The author's comment is terse: it only says that the constructor launches some number of workers. It took me several readings to see what that means. It really does just create threads, but what each thread does is quite convoluted.

The argument to workers.emplace_back is a lambda expression, from which a std::thread is constructed in place. Creating the thread does not block the constructor: the lambda body runs asynchronously on the new thread, and what happens inside each thread is the interesting part.

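The outermost layer of the lambda is an infinite loop. Why for(;;) and not while(1)? This is not the key point, but I wondered whether it is more efficient. It isn't: compilers generate the same code for both, so it is a matter of style. for(;;) simply has no condition to evaluate, which also avoids the "constant condition" warnings that some compilers emit for while(1).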

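After task is declared comes a pair of braces. The part inside {} is the critical section: the unique_lock locks queue_mutex when it is constructed and releases it at the closing brace, so the task() call afterwards runs without holding the lock. As for why the lambda captures [this] and writes this->queue_mutex instead of capturing everything with [&]: I first thought it was about saving memory, frugal like a stingy landlord. More likely it is about safety and clarity. The thread only needs the pool's members, and [&] would make it easy to capture a local such as the loop variable i by reference by accident; that reference would dangle once the constructor returns while the thread keeps running.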

Next comes a wait(lock, condition) call, which is as layered as a mille-feuille.

Layer 1: doesn't this thread lock itself? Won't it get stuck?

Layer 2: emplace_back starts a thread and does not block. But every thread creates its own lock object, so does it lock anything at all?

Layer 3: the lock is only a wrapper. The real lock is the shared queue_mutex outside, and wait releases that mutex while the thread sleeps, so there is no deadlock here. But what about the wait condition: does it wait when stop is set or the queue is non-empty?

Layer 4: checking the documentation, wait only blocks while the predicate returns false. In other words it waits while !stop && tasks.empty(), which means the pool is running and has no tasks. Otherwise it does not wait and goes straight on.

Layer 5: since stop and non-empty were already checked above, why check stop and empty again below before exiting? Isn't that redundant?

Layer 6: it is not redundant. The predicate only says there is a reason to wake up; the second check tells the two reasons apart. A thread exits only when stop is set and the queue is empty, so tasks still in the queue are finished first. Is there a problem left? If all threads are blocked in wait and someone sets stop to true, they won't notice on their own.

My guess is that whoever sets stop also wakes all the threads. Threads blocked in wait are woken by the notification; threads that are busy executing a task can't be notified on the spot, but they see stop at their next check and exit normally.
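To make layers 4 to 6 concrete, here is a small sketch outside the pool. It spells out the predicate form of wait as the while loop it stands for. The names mtx, cv, items, stop_flag, consume_all and run_wait_demo are made up for this illustration and are not part of ThreadPool.h.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical stand-ins for the pool's members.
std::mutex mtx;
std::condition_variable cv;
std::queue<int> items;
bool stop_flag = false;

// Pops and sums items until stop_flag is set and the queue is drained.
int consume_all() {
    int sum = 0;
    for (;;) {
        std::unique_lock<std::mutex> lock(mtx);
        // Same meaning as: cv.wait(lock, []{ return stop_flag || !items.empty(); });
        while (!(stop_flag || !items.empty())) {
            cv.wait(lock);  // releases mtx while sleeping, re-locks before returning
        }
        if (stop_flag && items.empty()) return sum;  // stopped and nothing left to do
        sum += items.front();
        items.pop();
    }
}

// Starts one consumer, feeds it 1, 2, 3, then requests a stop.
int run_wait_demo() {
    stop_flag = false;  // no other thread exists yet, so no lock is needed here
    int result = 0;
    std::thread consumer([&result] { result = consume_all(); });
    for (int i = 1; i <= 3; ++i) {
        {
            std::lock_guard<std::mutex> lock(mtx);
            items.push(i);
        }
        cv.notify_one();
    }
    {
        std::lock_guard<std::mutex> lock(mtx);
        stop_flag = true;
    }
    cv.notify_all();
    consumer.join();
    assert(items.empty());  // the consumer drained the queue before exiting
    return result;
}
```

Calling run_wait_demo() from main should give 6, whether the stop request arrives before or after the consumer has picked up the last item. That is what the second check buys: queued work is finished before the thread exits.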

With that doubt in mind I went looking for the code that sets stop, and found it in the destructor:

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

Inside the {} the lock is taken and stop is set to true. I wondered why an atomic isn't used. After some thought: the workers read stop inside the wait predicate while holding queue_mutex, so the write must happen under the same mutex. Even an atomic stop would have to be modified under the mutex, otherwise a worker could check the predicate, miss the change, and then sleep through the notification. After that, notify_all wakes every worker, and join waits for each of them to finish.

After digesting the mille-feuille, let's look at the most important operation, enqueue:
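The shutdown sequence can be tried in isolation. This is a minimal sketch, assuming workers that have no tasks at all and only wait for stop; shutdown_idle_workers and the counter exited are hypothetical names used only here.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Starts n idle workers, then shuts them down the way the destructor does.
// Returns how many workers ran to completion.
int shutdown_idle_workers(int n) {
    std::mutex mtx;
    std::condition_variable cv;
    bool stop = false;           // guarded by mtx, like ThreadPool::stop
    std::atomic<int> exited{0};  // only a counter for the demo

    std::vector<std::thread> workers;
    for (int i = 0; i < n; ++i) {
        // [&] is safe here only because every thread is joined
        // before these locals go out of scope.
        workers.emplace_back([&] {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [&] { return stop; });  // no tasks, so only stop ends the wait
            ++exited;
        });
    }

    {
        std::lock_guard<std::mutex> lock(mtx);
        stop = true;   // written under the same mutex the waiters use
    }
    cv.notify_all();   // wake every waiting worker

    for (std::thread& w : workers) w.join();
    assert(exited.load() == n);
    return exited.load();
}
```

A worker that has not reached wait yet when stop is set is fine too: wait checks the predicate before sleeping, sees stop == true, and returns immediately.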

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;
 
 
    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
 
 
        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
 
 
        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

In typename std::result_of<F(Args...)>::type, the typename is required because ::type is a nested dependent name: it depends on the template parameters, so the compiler has to be told that it names a type. As an ordinary programmer who resolutely avoids writing templates, I find this code hard going. The -> I do know: it is a trailing return type. std::result_of<F(Args...)> says that F is called with arguments of types Args..., whatever those are, and ::type is the type that call returns. (std::result_of was deprecated in C++17 and removed in C++20; std::invoke_result replaces it.)
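A small sketch of what that type computation does, outside the pool. The alias call_result_t and the functions add, repeat, call and call_demo are hypothetical helpers for this illustration. The alias switches to std::invoke_result on C++17 and later so that the sketch also builds where std::result_of is gone.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// std::result_of was deprecated in C++17 and removed in C++20;
// this alias picks whichever trait the compiler mode provides.
#if __cplusplus >= 201703L
template <class F, class... Args>
using call_result_t = typename std::invoke_result<F, Args...>::type;
#else
template <class F, class... Args>
using call_result_t = typename std::result_of<F(Args...)>::type;
#endif

int add(int a, int b) { return a + b; }

std::string repeat(const std::string& s, int n) {
    std::string out;
    for (int i = 0; i < n; ++i) out += s;
    return out;
}

// Trailing return type: written after "->", where F and Args are already known.
template <class F, class... Args>
auto call(F&& f, Args&&... args) -> call_result_t<F, Args...> {
    return std::forward<F>(f)(std::forward<Args>(args)...);
}

// The result type is computed at compile time from F and Args alone.
static_assert(std::is_same<call_result_t<decltype(&add), int, int>, int>::value,
              "calling add(int, int) yields int");
static_assert(std::is_same<call_result_t<decltype(&repeat), std::string, int>,
                           std::string>::value,
              "calling repeat(string, int) yields std::string");

void call_demo() {
    assert(call(add, 2, 3) == 5);
    assert(call(repeat, std::string("ab"), 3) == "ababab");
}
```

The static_asserts are the interesting part: nothing runs, yet the compiler already knows what each call returns. enqueue uses the same information to pick the type parameter of the std::future it hands back.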

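As for why the parameters are declared with &&: it looks like an rvalue reference, but in a template where F and Args are deduced, F&& and Args&&... are forwarding references. They bind to both lvalues and rvalues, and std::forward passes each argument on as what it was. Who takes care of destruction? std::bind stores its own copies of the callable and the arguments; that bind object lives inside the packaged_task, which is owned by the shared_ptr captured in the queued lambda. It is destroyed once the task has run and the last copy of that lambda is gone. One consequence: arguments are copied into the task, so anything that must be passed by reference needs std::ref.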
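The difference between a forwarding reference and a plain rvalue reference is easy to check with a sketch. deduced_as_lvalue, Sink, pass_along and forwarding_demo are hypothetical names invented for this example.

```cpp
#include <cassert>
#include <string>
#include <type_traits>
#include <utility>

// T&& with a deduced T is a forwarding reference:
// T becomes an lvalue reference for lvalue arguments and a plain type for rvalues.
template <class T>
bool deduced_as_lvalue(T&&) {
    return std::is_lvalue_reference<T>::value;
}

// Hypothetical sink that records whether it received a copy or a move.
struct Sink {
    std::string stored;
    bool moved = false;
    void take(const std::string& s) { stored = s; moved = false; }
    void take(std::string&& s) { stored = std::move(s); moved = true; }
};

// std::forward passes the argument on with its original value category.
template <class T>
void pass_along(Sink& sink, T&& value) {
    sink.take(std::forward<T>(value));
}

void forwarding_demo() {
    std::string name = "task";
    Sink sink;
    pass_along(sink, name);                // lvalue: copied, name is untouched
    assert(!sink.moved && name == "task");
    pass_along(sink, std::string("tmp"));  // rvalue: moved into the sink
    assert(sink.moved && sink.stored == "tmp");
}
```

So enqueue(F&& f, Args&&... args) accepts named functions, lambdas stored in variables and temporaries alike, and std::forward avoids an extra copy whenever the caller passed a temporary.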

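As noted earlier, tasks can only hold std::function<void()>. Here std::packaged_task<return_type()> wraps the bound call, so the arguments are already baked in and only the return type remains. Why not std::function<return_type()>? Because this is not the object that finally goes into tasks, and it has an extra job: it must produce the future. packaged_task packages the call and delivers its result to the future. What actually enters the queue is the lambda [task](){ (*task)(); }, which has the void() signature. The shared_ptr is needed because packaged_task is move-only, while std::function requires a copyable callable.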
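This also answers the earlier question about whether the queue can only hold void() tasks. A minimal sketch, assuming a single-threaded setting so that only the wrapping trick is visible; multiply and run_through_void_queue are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

int multiply(int a, int b) { return a * b; }

// Wraps an int-returning call as a void() job and runs it from a queue,
// on the calling thread, to keep the sketch free of threading.
int run_through_void_queue() {
    std::queue<std::function<void()>> jobs;  // same element type as ThreadPool::tasks

    // packaged_task<int()> stores the bound call and feeds the future's shared state.
    auto task = std::make_shared<std::packaged_task<int()>>(std::bind(multiply, 6, 7));
    std::future<int> result = task->get_future();

    // packaged_task is move-only, but std::function needs a copyable callable.
    // Capturing the shared_ptr gives a copyable lambda with signature void().
    jobs.emplace([task] { (*task)(); });

    // A worker thread would do this part: pop the job and call it.
    std::function<void()> job = std::move(jobs.front());
    jobs.pop();
    assert(jobs.empty());
    job();  // runs multiply(6, 7) and stores the result for the future

    return result.get();
}
```

The queue never learns that the job returns an int. The return value travels through the packaged_task into the future, which is why one queue of void() callables can serve tasks with any return type.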

Then comes locking the queue and pushing the task, notifying one worker thread, and returning the future. This should have been the hardest part of the thread pool to understand, but it feels routine, because the earlier fancy operations have stretched our understanding nicely. With those concepts in place, the reaction is more like "that's it?"
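To see the pieces working together, here is a condensed variant of the pool with a usage example. It is a sketch and not the original header: the class name MiniPool, the member names, and the function sum_of_squares are made up, and the return type uses decltype instead of std::result_of so that it also builds as C++20.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Condensed variant of the pool discussed above (hypothetical, for illustration).
class MiniPool {
public:
    explicit MiniPool(std::size_t threads) {
        for (std::size_t i = 0; i < threads; ++i)
            workers_.emplace_back([this] { work(); });
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (std::thread& w : workers_) w.join();
    }

    // decltype(f(args...)) plays the role of std::result_of in the original.
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using R = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<R()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        std::future<R> res = task->get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (stop_) throw std::runtime_error("enqueue on stopped MiniPool");
            tasks_.emplace([task] { (*task)(); });
        }
        cv_.notify_one();
        return res;
    }

private:
    // Worker loop: wait for a task or for stop, run tasks outside the lock.
    void work() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (stop_ && tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::vector<std::thread> workers_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

// Squares 0..n-1 on a pool of four threads and sums the results.
int sum_of_squares(int n) {
    MiniPool pool(4);
    std::vector<std::future<int>> results;
    for (int i = 0; i < n; ++i)
        results.push_back(pool.enqueue([](int x) { return x * x; }, i));
    int sum = 0;
    for (std::future<int>& r : results) sum += r.get();  // get() blocks until that task is done
    return sum;
}
```

The caller's side is exactly what the introduction promised: pick a thread count, push tasks in, and read the results from the futures. With the original header the usage is the same, with ThreadPool in place of MiniPool.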

Keywords: C++

Added by chris_2001 on Sun, 16 Jan 2022 00:48:53 +0200