Detailed analysis of muduo network library base source code

1. Directory structure of Muduo Network Library

/muduo$ tree ./ -L 2
./
├── BUILD.bazel
├── build.sh
├── ChangeLog
├── ChangeLog2
├── CMakeLists.txt
├── contrib
│   ├── CMakeLists.txt
│   ├── hiredis
│   └── thrift
├── muduo
├── README
└── WORKSPACE
......

muduo library body code

├── muduo
│   ├── base
# base: basic code that is independent of the network, including the thread library; it lives in the ::muduo namespace
# net: lives in the ::muduo::net namespace
│   └── net
        ├── inspect
            ├── poller
            ├── http
            ├── protobuf
            ├── protorpc
# Code about network module and rpc

base directory

chen@ecs-213609:~/muduo/muduo/base$ tree ./ -L 1
./
├── AsyncLogging.cc	# Asynchronous log
├── AsyncLogging.h
├── Atomic.h		# Atomic operation
├── BlockingQueue.h	# Unbounded blocking queue
├── BoundedBlockingQueue.h
├── BUILD.bazel
├── CMakeLists.txt
├── Condition.cc		# Conditional variable
├── Condition.h
├── copyable.h			# The default class that can be copied is an empty base class
├── CountDownLatch.cc 	# Countdown latch synchronization
├── CountDownLatch.h	
├── CurrentThread.cc	# thread 
├── CurrentThread.h
├── Date.cc		
├── Date.h
├── Exception.cc
├── Exception.h
├── FileUtil.cc
├── FileUtil.h
├── GzipFile.h			# Compressed file
├── LogFile.cc			# Log files, etc
├── LogFile.h		
├── Logging.cc
├── Logging.h
├── LogStream.cc
├── LogStream.h
├── Mutex.h				# mutex
├── noncopyable.h
├── ProcessInfo.cc
├── ProcessInfo.h
├── Singleton.h
├── StringPiece.h
├── tests
├── Thread.cc
├── Thread.h
├── ThreadLocal.h
├── ThreadLocalSingleton.h
├── ThreadPool.cc	# Thread pool
├── ThreadPool.h
├── Timestamp.cc
├── Timestamp.h
├── TimeZone.cc
├── TimeZone.h
├── Types.h
└── WeakCallback.h

Class of base directory

The base directory contains utility classes that can be extracted and used on their own

Under base/tests are some test cases

Executing ./build.sh generates a build/Debug directory in the parent directory

You can change the shell script if necessary

$ chmod +x build.sh # Add executable permissions

set -x

SOURCE_DIR=$(pwd) # Directory the script is started from (the source tree)
BUILD_DIR=${BUILD_DIR:-./build} # Output directory, ./build unless BUILD_DIR is already set
BUILD_NO_EXAMPLES=${BUILD_NO_EXAMPLES:-0}

# Stop if the build directory cannot be entered; otherwise the cleanup below
# would delete files from the source directory instead.
mkdir -p "$BUILD_DIR" && cd "$BUILD_DIR" || exit 1

# "$@" forwards every make argument unchanged, even ones containing spaces.
cmake "$SOURCE_DIR" && make "$@"
status=$?

# Remove the CMake bookkeeping files; -f keeps this quiet when they are missing.
rm -rf CMakeCache.txt CMakeFiles cmake_install.cmake

# Report the result of the build, not of the cleanup.
exit $status

Delete some redundant files and some information in the Debug directory, and only do the simplest application output

Executable files will be generated under build/release-cpp11/bin

tree ./ -L 1
./
├── CMakeCache.txt
├── CMakeFiles
├── cmake_install.cmake
├── dep.dot
├── dep.dot.muduo_base
├── dep.dot.muduo_base.dependers
├── lib
├── Makefile
└── muduo

Compiling only Timestamp.cc generates the muduo_base static library

# Sources compiled into the muduo_base library (only Timestamp.cc in this reduced build).
set(base_SRCS
  Timestamp.cc
  )

# Build the library and link it against pthread and rt.
add_library(muduo_base ${base_SRCS})
target_link_libraries(muduo_base pthread rt)

# Install the library into lib/ and every header of this directory into include/muduo/base.
install(TARGETS muduo_base DESTINATION lib)
file(GLOB HEADERS "*.h")
install(FILES ${HEADERS} DESTINATION include/muduo/base)

2 Timestamp time processing

copyable.h: classes that may be copied derive from this empty base class, which marks them as having value semantics

#ifndef MUDUO_BASE_COPYABLE_H
#define MUDUO_BASE_COPYABLE_H
namespace muduo {
// Empty tag base class. Its constructor and destructor are protected, so it
// can only be used as a base class and never instantiated on its own.
class copyable {
 protected:
  copyable() = default;
  ~copyable() = default;
};
} 
#endif  // MUDUO_BASE_COPYABLE_H                    

For example, the Timestamp class handles time

class Timestamp : public muduo::copyable,
                  public boost::less_than_comparable<Timestamp>
{
   inline bool operator<(Timestamp lhs, Timestamp rhs)
    {
      return lhs.microSecondsSinceEpoch() < rhs.microSecondsSinceEpoch();
    }                   
}

When a class inherits from boost::less_than_comparable, it only has to implement operator<; the operators >, <= and >= are then generated automatically. This is an application of template metaprogramming.

public: static const int muduo::Timestamp::kMicroSecondsPerSecond
private: int64_t muduo::Timestamp::microSecondsSinceEpoch_
    
    
muduo::Timestamp::valid muduo::Timestamp::toString()const
muduo::Timestamp::toFormattedString() const
muduo::Timestamp::Timestamp muduo::Timestamp::Timestamp()
muduo::Timestamp::swap muduo::Timestamp::secondsSinceEpoch()const
muduo::Timestamp::now()
muduo::Timestamp(int64_t microSecondsSinceEpoch)
muduo::Timestamp::invalid()
// Exchanges the stored microsecond count with that of `that`.
void swap(Timestamp& that)
  {
    std::swap(microSecondsSinceEpoch_, that.microSecondsSinceEpoch_);
  }
// Exchanges the values of two timestamps; `that` is passed by reference
inline double timeDifference(Timestamp high, Timestamp low)
// Returns the difference between the two timestamps, in seconds
    
inline Timestamp addTime(Timestamp timestamp, double seconds)
// Returns the timestamp advanced by the given number of seconds
Timestamp Timestamp::now()
// Returns the current time as the number of microseconds since 1970
// Formats the timestamp as "YYYYMMDD HH:MM:SS.uuuuuu" in UTC.
// (This body was labelled toString(), which would redefine the toString()
// shown further below; it is the formatted variant.)
string Timestamp::toFormattedString() const
{
      char buf[32] = {0};
      // Dividing by kMicroSecondsPerSecond gives the whole seconds since 1970;
      // the remainder is the microsecond part within that second.
      time_t seconds = static_cast<time_t>(microSecondsSinceEpoch_ / kMicroSecondsPerSecond);
      int microseconds = static_cast<int>(microSecondsSinceEpoch_ % kMicroSecondsPerSecond);
      struct tm tm_time;
      gmtime_r(&seconds, &tm_time); // Thread-safe variant of gmtime: fills the caller's struct tm

      // Assemble year, month, day, hour, minute, second and microseconds into buf.
      snprintf(buf, sizeof(buf), "%4d%02d%02d %02d:%02d:%02d.%06d",
          tm_time.tm_year + 1900, tm_time.tm_mon + 1, tm_time.tm_mday,
          tm_time.tm_hour, tm_time.tm_min, tm_time.tm_sec,
          microseconds);
  return buf;

}

toString()

// Formats the timestamp as "<seconds>.<microseconds>", with the microsecond
// part zero-padded to six digits.
string Timestamp::toString() const
{
  // PRId64 is the portable printf conversion for int64_t. Both values need
  // their own "%" PRId64; the first one was missing, leaving a malformed
  // format string.
  char buf[32] = {0};
  int64_t seconds = microSecondsSinceEpoch_ / kMicroSecondsPerSecond;
  int64_t microseconds = microSecondsSinceEpoch_ % kMicroSecondsPerSecond;
  snprintf(buf, sizeof(buf)-1, "%" PRId64 ".%06" PRId64 "", seconds, microseconds);
  return buf;
}

Timestamp_unittest.cc

#include <muduo/base/Timestamp.h>
#include <vector>
#include <stdio.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <ctype.h>
using muduo::Timestamp;

// Prints the timestamp's toString() text; the argument is taken by const reference.
void passByConstReference(const Timestamp& x)
{
  printf("%s\n", x.toString().c_str());
}
/**
 * @brief Prints the timestamp's toString() text; the argument is taken by value.
 *
 * @param x timestamp to print
 */
void passByValue(Timestamp x)
{
    printf("%s\n", x.toString().c_str());
}

void benchmark()
{
  const int kNumber = 1000*1000;

  std::vector<Timestamp> stamps;
  stamps.reserve(kNumber);  // Pre allocated space, 1 million object spaces
  for (int i = 0; i < kNumber; ++i)
  {
    // One million time objects, now static function system call calculation is subtle
    stamps.push_back(Timestamp::now());
  }
  printf("%s\n", stamps.front().toString().c_str());
  printf("%s\n", stamps.back().toString().c_str());
  // Calculate the time difference a million times
  printf("%f\n", timeDifference(stamps.back(), stamps.front()));

  int increments[100] = { 0 };
  int64_t start = stamps.front().microSecondsSinceEpoch(); // Microseconds of the first time
  for (int i = 1; i < kNumber; ++i)
  {
    int64_t next = stamps[i].microSecondsSinceEpoch();    // Time difference between two similar times
    int64_t inc = next - start;
    start = next;
    if (inc < 0)          
    {
      // Time has reversed, and this problem is generally impossible
      printf("reverse!\n");
    }
    else if (inc < 100)
    {
      ++increments[inc]; // Several time differences are less than 100
    }
    else
    {
      printf("big gap %d\n", static_cast<int>(inc));
      // More than a hundred subtle time differences 
    }
  }

  for (int i = 0; i < 100; ++i)
  {
    printf("%2d: %d\n", i, increments[i]);
  }
}

// Prints the current time three ways (directly, via pass-by-value, via
// pass-by-const-reference) and then runs the benchmark.
int main()
{
  Timestamp now(Timestamp::now());
  printf("%s\n", now.toString().c_str()); // Print the current time
  passByValue(now);
  passByConstReference(now);
  benchmark(); // Measure the gaps between consecutive now() calls
}

The included Types.h provides two type-conversion functions:

implicit conversion and downcasting

// Converts f to To using only conversions that are also allowed implicitly.
// Spelling the target type explicitly documents the conversion at the call site.
template<typename To, typename From>
inline To implicit_cast(From const &f) {
  return f;
}

// Casts a base-class pointer down to a derived-class pointer type To.
// In debug builds with RTTI enabled the cast is verified with dynamic_cast;
// otherwise it is a plain static_cast with no run-time cost.
template<typename To, typename From>
inline To down_cast(From* f) {
  // Never executed: it only makes the compiler reject the call unless To is
  // implicitly convertible to From*.
  if (false) {
    implicit_cast<From*, To>(0);
  }
  // The check is skipped when NDEBUG is defined or when
  // GOOGLE_PROTOBUF_NO_RTTI states that RTTI is unavailable.
#if !defined(NDEBUG) && !defined(GOOGLE_PROTOBUF_NO_RTTI)
  assert(f == NULL || dynamic_cast<To>(f) != NULL);  // f must really point to an object of the derived type
#endif
  return static_cast<To>(f);
}

3. CAS of GCC

GCC 4.1 and later support CAS atomic operations (for the complete set of atomic operations, see GCC Atomic Builtins)

bool __sync_bool_compare_and_swap (type *ptr, type oldval, type newval, ...)

type __sync_val_compare_and_swap (type *ptr, type oldval, type newval, ...)

AtomicIntegerT the atomic operation of an integer

volatile T value_: volatile makes the compiler re-read the variable from memory on every access instead of reusing a copy kept in a register, even if the previous instruction has just read it. This prevents the compiler from optimizing the access away, so the value is read precisely each time.

T get()
  {
    // Atomic read done as a compare-and-swap of 0 with 0: value_ stays the
    // same whether or not it equals 0, and the builtin returns the value it
    // held before the operation.
    return __sync_val_compare_and_swap(&value_, 0, 0);
  }
T getAndAdd(T x)
  {
    // Atomically adds x to value_ and returns the value held before the addition.
    return __sync_fetch_and_add(&value_, x);
  }
T incrementAndGet()
  {
    // Adds 1 through addAndGet, which is not shown in this excerpt;
    // presumably it returns the value after the addition (confirm).
    return addAndGet(1);
  }
T getAndSet(T newValue)
  {
    // Atomically writes newValue into value_ and returns the previous value.
    return __sync_lock_test_and_set(&value_, newValue);
  }
typedef detail::AtomicIntegerT<int32_t> AtomicInt32;
typedef detail::AtomicIntegerT<int64_t> AtomicInt64;
// 32 64 bit integer

4. Implementation of exception class

The Exception class records the call stack (the stack frame addresses) at the point where it is constructed

// Stores the message and captures the call stack at construction time.
Exception::Exception(const char* msg)
  : message_(msg)
{
  fillStackTrace();
}
// Captures up to 200 frames of the current call stack and appends one
// symbolized line per frame to stack_.
void Exception::fillStackTrace()
{
  const int kMaxFrames = 200;
  void* frames[kMaxFrames];
  // backtrace() stores the return addresses of the active calls in frames
  // and returns how many it wrote.
  const int depth = ::backtrace(frames, kMaxFrames);
  // backtrace_symbols() turns the addresses into an array of strings
  // allocated with malloc; it has to be released with free().
  char** symbols = ::backtrace_symbols(frames, depth);
  if (!symbols)
  {
    return;
  }
  for (int frame = 0; frame < depth; ++frame)
  {
    stack_.append(symbols[frame]);
    stack_.push_back('\n');
  }
  free(symbols);
}

Exception_test.cc usage example

It can print out the abnormal call stack

// Calls foo() (defined elsewhere in the test) and, if it throws a
// muduo::Exception, prints the reason and the captured stack trace.
int main()
{
  try
  {
    foo();
  }
  catch (const muduo::Exception& ex)
  {
    printf("reason: %s\n", ex.what());
    printf("stack trace: %s\n", ex.stackTrace());
  }
}

5 some details of thread class

thread_test.cc

// Prints the process id and the id of the calling thread.
int main()
{
  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());
}
1. Printing the id of the current thread

CurrentThread::tid()

// ......
extern __thread int t_cachedTid; // Thread local storage
// ......
// Returns the cached thread id, filling the cache on first use so later
// calls return it without calling cacheTid() again.
inline int tid()
  {
    if (t_cachedTid == 0)
    {
      cacheTid(); // Nothing cached yet for this thread: fetch and store it
    }
    return t_cachedTid;
  }
// Fetches the thread id via detail::gettid() and stores it, together with a
// pre-formatted string form, in the per-thread cache variables.
void CurrentThread::cacheTid()
{
  if (t_cachedTid == 0)
  {
    t_cachedTid = detail::gettid();
    // "%5d " yields 6 characters for ids of up to five digits.
    int n = snprintf(t_tidString, sizeof t_tidString, "%5d ", t_cachedTid);
    assert(n == 6); (void) n; 
      // assert is a run-time check that is compiled out when NDEBUG is defined;
      // the (void) cast keeps release builds from warning that n is unused.
      // NOTE(review): an id with more than five digits makes n exceed 6 - confirm this cannot occur.
  }
}

pid_t gettid()

// Returns the result of the SYS_gettid system call, cast to pid_t.
pid_t gettid()
{
  return static_cast<pid_t>(::syscall(SYS_gettid));
}

This code was committed in 2013, and my compiler does not seem to accept such a declaration inside the namespace. None of my attempts to fix it worked, so in my own project I changed it into a static variable.

	/**
 * @brief Per-thread cache variables used by the CurrentThread helpers.
 * @note __thread is GCC's thread-local storage specifier: every thread gets its own private copy of a variable declared with it.
 */
	namespace CurrentThread
	{
		// Variables declared with __thread are stored per thread.
		__thread int t_cachedTid = 0;							 // Cached thread id; 0 means it has not been fetched yet.
																 // Caching avoids a ::syscall(SYS_gettid) system call
																 // every time the id is needed.
		__thread char t_tidString[32];							 // String form of the thread id
		__thread const char *t_threadName = "unknown";			 // Thread name
		const bool sameType = boost::is_same<int, pid_t>::value; // True when pid_t is the same type as int
		BOOST_STATIC_ASSERT(sameType);							 // Compile-time check that pid_t is int
	}

Modified

namespace muduo
{
    namespace CurrentThread
    {
        // internal: per-thread cache of the thread id; 0 means "not fetched yet"
        static __thread int t_cachedTid;

        // Returns the calling thread's id, making the SYS_gettid system call
        // only while nothing is cached for this thread.
        inline int tid()
        {
            if (t_cachedTid != 0)
                return t_cachedTid;
            const pid_t kernelTid = static_cast<pid_t>(::syscall(SYS_gettid));
            t_cachedTid = kernelTid;
            return t_cachedTid;
        }
    }
}
2. Note that a ThreadNameInitializer object is defined at namespace scope

in namespace muduo, which means that its constructor runs before the main function.

// Helper whose constructor names the constructing thread "main", caches its
// thread id and registers afterFork (defined elsewhere) as the handler that
// runs in the child process after fork().
class ThreadNameInitializer
{
 public:
  ThreadNameInitializer()
  {
    muduo::CurrentThread::t_threadName = "main";
    CurrentThread::tid();
    // Only the third (child) handler is set; no prepare or parent handler is installed.
    pthread_atfork(NULL, NULL, &afterFork);
  }
};
// Namespace-scope object: its constructor runs during static initialization.
ThreadNameInitializer init;

pthread_atfork three function pointers

fork() calls the first handler in the parent process before the child process is created. After the child has been created, the parent process calls the second handler and the child process calls the third handler.

3 example code
#include <stdio.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/wait.h>
// Registered as the first pthread_atfork handler; prints the calling process id.
void prepare(void)
{
	printf("pid = %d prepare ...\n", static_cast<int>(getpid()));
}

// Registered as the second pthread_atfork handler; prints the calling process id.
void parent(void)
{
	printf("pid = %d parent ...\n", static_cast<int>(getpid()));
}

// Registered as the third pthread_atfork handler; prints the calling process id.
void child(void)
{
	printf("pid = %d child ...\n", static_cast<int>(getpid()));
}


// Demonstrates pthread_atfork: registers the three handlers, forks once,
// and lets the parent reap the child. Both processes print an exit line.
int main(void)
{
	printf("pid = %d Entering main ...\n", static_cast<int>(getpid()));

	// fork() runs prepare in the parent before the child is created, then
	// parent in the parent and child in the child.
	pthread_atfork(prepare, parent, child);
	const pid_t c_pid = fork();
	if (c_pid < 0)
	{
		// fork failed: there is no child to wait for.
		perror("fork");
		return 1;
	}
	if (c_pid > 0)
	{
		// Parent process: wait for the child and confirm it is the one we created.
		int status;
		if (wait(&status) == c_pid)
		{
			printf("Recycle child process\n");
			// Flush the output stream; flushing stdin is undefined behaviour.
			fflush(stdout);
		}
	}
	printf("pid = %d Exiting main ...\n",static_cast<int>(getpid()));
}

Operation results

pid = 23777 Entering main ...
pid = 23777 prepare ...
pid = 23777 parent ...
pid = 23778 child ...
pid = 23778 Exiting main ...
Recycle child process
pid = 23777 Exiting main ...

6 MutexLock and MutexLockGuard

The two classes are related by association: MutexLockGuard does not manage the lifetime of the MutexLock, it only locks it on construction and unlocks it on destruction

class MutexLock : boost::noncopyable{/*......*/};
// Scope guard for a MutexLock: locks the mutex in the constructor and unlocks
// it in the destructor. It holds only a reference, so the mutex must outlive
// the guard.
class MutexLockGuard : boost::noncopyable
{
  public:
		// Locks the given mutex.
		explicit MutexLockGuard(MutexLock &mutex)
			: mutex_(mutex)
		{
			mutex_.lock();
		}
		// Unlocks the mutex when the guard goes out of scope.
		~MutexLockGuard()
		{
			mutex_.unlock();
		}

	private:
		MutexLock &mutex_;  // Referenced, not owned
};
#define MutexLockGuard(x) error "Missing guard object name"

The macro on the last line makes `MutexLockGuard(mutex);` fail to compile. Such an unnamed temporary would release the lock immediately, so the macro prevents that misuse.

1 example code

On the efficiency of locking

MutexLock g_mutex;
vector<int> g_vec;
const int kCount = 10 * 1000 * 1000; // Global constant 1000w
// Appends 0 .. kCount-1 to g_vec, taking g_mutex separately for every push_back.
void threadFunc()
{
	for (int i = 0; i < kCount; ++i)
	{
		MutexLockGuard lock(g_mutex); // Held for this iteration only
		g_vec.push_back(i);
	}
}

int main()
{
	const int kMaxThreads = 8;			 // Up to eight threads
	g_vec.reserve(kMaxThreads * kCount); // Reserve 80 million spaces
	
	Timestamp start(Timestamp::now());   // Get time
	for (int i = 0; i < kCount; ++i)
	{
		g_vec.push_back(i);
	}

	printf("single thread without lock %f\n", timeDifference(Timestamp::now(), start)); // Subtract two timestamps
	
	start = Timestamp::now();
	threadFunc();
	printf("single thread with lock %f\n", timeDifference(Timestamp::now(), start));	// Calculate the time after locking

	for (int nthreads = 1; nthreads < kMaxThreads; ++nthreads)
	{
		boost::ptr_vector<Thread> threads; // ptr_ When the vector is destroyed, it can release the Thread object
		g_vec.clear();
		start = Timestamp::now();
		for (int i = 0; i < nthreads; ++i)
		{
			threads.push_back(new Thread(&threadFunc));
			threads.back().start();
		}
		for (int i = 0; i < nthreads; ++i)
		{
			threads[i].join();
		}
		printf("%d thread(s) with lock %f\n", nthreads, timeDifference(Timestamp::now(), start));
	}
}
2 conditional variables

CountDownLatch can be used either to make all child threads wait until the main thread gives the start signal, or to make the main thread wait until the child threads have finished initializing before it continues

doxygen is set for display convenience

$sudo apt install graphviz   # Used to generate code diagrams 
$sudo apt install doxygen
$ cd CODE_DIR
$ doxygen -g Doxygen.config   # Generate profile 
$ vim Doxygen.config          # Modify profile

RECURSIVE              = YES 
$ doxygen Doxygen.config      # Generate documents from code

Set config to display private and static variables

muduo::CountDownLatch class reference

Class muduo::CountDownLatch inheritance diagram:

Collaboration diagram of muduo::CountDownLatch:

[legend]

Public member functions
CountDownLatch (int count)
void wait ()
void countDown ()
int getCount () const
Private attributes
MutexLock mutex_ // mutex
Condition condition_ // Condition variable
int count_ // Counter

Documents for this class are generated from the following files:

// Latch built from a mutex, a condition variable and a counter.
class CountDownLatch : boost::noncopyable
{
 public:
  explicit CountDownLatch(int count);
  void wait();
  void countDown();
  int getCount() const;
 private:
  mutable MutexLock mutex_; // mutable so that the const getCount() can lock it
  Condition condition_; // Held by value, so it is destroyed together with the latch
  int count_; // Counter that wait() compares against zero
};
~Condition()
  {
    // Destroys the underlying pthread condition variable; the MCHECK macro
    // (defined elsewhere) wraps the call to check its return value.
    MCHECK(pthread_cond_destroy(&pcond_));
  }

The standard idiom for guarding against spurious wakeups

// Blocks the caller while count_ is greater than zero.
void CountDownLatch::wait()
{
  MutexLockGuard lock(mutex_);
  // Re-check the condition after every wake-up instead of trusting the
  // wake-up itself.
  while (count_ > 0) {
    condition_.wait();
  }
}

The MutexLockGuard class is also used to automatically lock and unlock

// Returns the current counter value, read while holding the mutex.
int CountDownLatch::getCount() const
{
  MutexLockGuard lock(mutex_); // Allowed in a const member function because mutex_ is declared mutable
  return count_;
}

7 bounded queue and unbounded queue

[external chain picture transfer failed. The source station may have anti-theft chain mechanism. It is recommended to save the picture and upload it directly (img-vhtyevao-1641279265269)( https://gitee.com/ak47chen111/cloud-library.git/https://gitee.com/ak47chen111/cloud -library. git/image-20220104145403436. png)]

Public member functions
void put (const T &x)
T take ()
size_t size () const
Private attributes
MutexLock mutex_
Condition notEmpty_
std::deque< T > queue_

Documents for this class are generated from the following files:

Extract element

// Removes and returns the element at the front of the queue, blocking while
// the queue is empty.
T take()
  {
    MutexLockGuard guard(mutex_);
    // Lock first, then wait in a loop: the emptiness check is repeated after
    // every wake-up, which guards against spurious wakeups.
    for (; queue_.empty(); )
    {
      notEmpty_.wait();
    }
    assert(!queue_.empty());
    // Copy the head before popping it off the queue.
    T head(queue_.front());
    queue_.pop_front();
    return head;
  }
1 example code

Measure the delay between putting an item into the queue and taking it out

// Benchmark harness: the calling thread produces timestamps into a blocking
// queue and numThreads worker threads consume them, each building a histogram
// of the delay between production and consumption.
class Bench
{
public:
	// Creates and starts numThreads worker threads running threadFunc.
	Bench(int numThreads)
		: latch_(numThreads),
		  threads_(numThreads) // NOTE(review): presumably ptr_vector's size constructor only reserves capacity - confirm
	{
		for (int i = 0; i < numThreads; ++i)
		{
			char name[32];
			snprintf(name, sizeof name, "work thread %d", i);
			threads_.push_back(new muduo::Thread(
				boost::bind(&Bench::threadFunc, this), muduo::string(name)));
		}
		for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::start, _1));
	}

	// Waits until every worker has counted the latch down, then enqueues
	// `times` timestamps, sleeping 1000 microseconds after each one.
	void run(int times)
	{
		printf("waiting for count down latch\n");
		latch_.wait();
		printf("all threads started\n");
		for (int i = 0; i < times; ++i)
		{
			muduo::Timestamp now(muduo::Timestamp::now());
			queue_.put(now); // The produced item is the time of production
			usleep(1000);
		}
	}

	// Enqueues one invalid timestamp per worker as a stop signal, then joins
	// all workers.
	void joinAll()
	{
		for (size_t i = 0; i < threads_.size(); ++i)
		{
			queue_.put(muduo::Timestamp::invalid()); // One stop marker per worker thread
		}

		for_each(threads_.begin(), threads_.end(), boost::bind(&muduo::Thread::join, _1));
	}

private:
	// Worker body: counts the latch down, consumes timestamps until an
	// invalid one arrives, then prints its delay histogram.
	void threadFunc()
	{
		printf("tid=%d, %s started\n",
			   muduo::CurrentThread::tid(),
			   muduo::CurrentThread::name());

		std::map<int, int> delays; // delay in microseconds -> number of occurrences
		latch_.countDown();
		bool running = true;
		while (running)
		{
			muduo::Timestamp t(queue_.take());  // Blocks until an item is available
			muduo::Timestamp now(muduo::Timestamp::now());
			if (t.valid()) // Only real timestamps are measured
			{
				// timeDifference is scaled by 1000000 to get microseconds.
				int delay = static_cast<int>(timeDifference(now, t) * 1000000);
				++delays[delay];
			}
			running = t.valid(); // An invalid timestamp ends the loop
		}

		printf("tid=%d, %s stopped\n",
			   muduo::CurrentThread::tid(),
			   muduo::CurrentThread::name());
		for (std::map<int, int>::iterator it = delays.begin();
			 it != delays.end(); ++it)
		{
			printf("tid = %d, delay = %d, count = %d\n",
				   muduo::CurrentThread::tid(),
				   it->first, it->second);
		}
	}

	muduo::BlockingQueue<muduo::Timestamp> queue_; // Queue shared by producer and workers
	muduo::CountDownLatch latch_;			   // Counted down once by each worker as it starts
	boost::ptr_vector<muduo::Thread> threads_; // Worker thread objects
};
// Runs the queue benchmark; the worker count comes from argv[1] and defaults to 1.
int main(int argc, char *argv[])
{
	int threads = argc > 1 ? atoi(argv[1]) : 1;
	
	Bench t(threads);
	t.run(10000); // Produce 10000 timestamps
	t.joinAll();  // Send every worker a stop marker and join them
}

8 thread pool object

[external chain picture transfer failed. The source station may have anti-theft chain mechanism. It is recommended to save the picture and upload it directly (img-askaxx7i-1641279265270)( https://gitee.com/ak47chen111/cloud-library.git/https://gitee.com/ak47chen111/cloud -library. git/image-20220104114947374. png)]

Public types
typedef boost::function< void()> Task
A callable that takes no arguments and returns void; it plays the role of a function pointer
Public member functions
ThreadPool (const string &name=string())
void start (int numThreads)
void stop ()
void run (const Task &f)
Private member functions
void runInThread ()
Task take ()
Private attributes
MutexLock mutex_
Condition cond_
string name_
boost::ptr_vector< muduo::Thread > threads_
std::deque< Task > queue_
bool running_

Documents for this class are generated from the following files:

boost::ptr_vector<muduo::Thread> threads_;

// The container's element type is muduo::Thread rather than the pointer type muduo::Thread*

// Presumably the author did not want detached threads, and instead lets the thread pool object control the lifetime of the thread objects

1 the focus is on the start function
// Creates numThreads worker threads named name_ followed by their index,
// and starts each one running runInThread.
void ThreadPool::start(int numThreads)
{
    assert(threads_.empty());       // start() expects a pool without threads
    running_ = true;
    threads_.reserve(numThreads);
    int index = 0;
    while (index < numThreads)
    {
        char suffix[32];
        snprintf(suffix, sizeof suffix, "%d", index);
        // Create the Thread object bound to this pool's runInThread ...
        threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + suffix));
        // ... then start it.
        threads_[index].start();
        ++index;
    }
}

runInThread is the task assigned when the thread is started

// Worker loop: while running_ is true, take a task from the queue and run it.
// The catch clauses that follow the try block are omitted in this excerpt.
void ThreadPool::runInThread()
{
    try
    {

        while (running_)
        {
            Task task(take()); // ThreadPool::Task ThreadPool::take()
            if (task) // Skip empty tasks
            {
                task();
            }
        }
    }
	// ......
}

threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + id));

threads_[i].start(); is the key step, and the most elegant touch in muduo's thread pool

Look back at the details of the Thread class

Delete some redundant information

Public types
typedef boost::function< void()> ThreadFunc
Private attributes
bool started_
bool joined_
pthread_t pthreadId_
boost::shared_ptr< pid_t > tid_
ThreadFunc func_
string name_

Documents for this class are generated from the following files:

ThreadFunc is a typedef for a callable that takes no arguments and returns void. It has to be supplied to the constructor of this class

// Stores the callable and the thread name and counts the new Thread object.
// Nothing in this constructor starts a thread.
Thread::Thread(const ThreadFunc &func, const string &n)
    : started_(false),
      joined_(false),   // Listed as a member above; it was left uninitialized
      pthreadId_(0),
      tid_(0),          // NOTE(review): if tid_ is a boost::shared_ptr<pid_t>, this leaves it null - confirm
      func_(func),
      name_(n)
{
    numCreated_.increment();
}

Call method

// "test001" + id would add two character arrays, which does not compile;
// build a muduo::string first and append id to it.
muduo::Thread *th = new muduo::Thread(boost::bind(&ThreadPool::runInThread,this),muduo::string("test001")+id);

The func of this thread class is assigned to tasks by ThreadPool. Different tasks can be bound, including ordinary functions, member functions and static member functions

threads_.push_back(new muduo::Thread(boost::bind(&ThreadPool::runInThread, this), name_ + id)); assigns the task type when the thread object is created. For example, reads and writes can be separated: the read thread is dedicated to read tasks and the write thread to write tasks. Tasks involving database operations can be put into db-type tasks.

Thread start

 // Asserts that the thread has not been started before.
 assert(!started_);
    started_ = true;
    // pthread_create returns an error number (0 on success); it is stored in errno here.
    errno = pthread_create(&pthreadId_, NULL, &startThread, this);
    if (errno != 0)
    {
        // NOTE(review): the fatal log call is commented out, so a failed pthread_create is ignored here.
        //LOG_SYSFATAL << "Failed in pthread_create";
    }
2 example code
#include <muduo/base/ThreadPool.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/CurrentThread.h>

#include <boost/bind.hpp>
#include <stdio.h>

// Task without arguments: prints the id of the thread that runs it.
void print()
{
	printf("tid=%d\n", muduo::CurrentThread::tid());
}

// Task with one argument: prints the running thread's id and the given string.
void printString(const std::string &str)
{
	printf("tid=%d, str=%s\n", muduo::CurrentThread::tid(), str.c_str());
}

// Starts a pool of five threads, submits 102 print tasks plus a latch
// task, waits for the latch task to run, then stops the pool.
int main()
{
	muduo::ThreadPool pool("MainThreadPool");  // Create the thread pool
	pool.start(5);

	pool.run(print);	// Submit the free function print twice
	pool.run(print);
	for (int i = 0; i < 100; ++i)
	{
		char buf[32];
		snprintf(buf, sizeof buf, "task %d", i);
		// Each task binds printString to its own copy of the text.
		pool.run(boost::bind(printString, std::string(buf)));
	}

	muduo::CountDownLatch latch(1);
	// Binding a member function needs the object pointer (&latch) as the first bound argument.
	pool.run(boost::bind(&muduo::CountDownLatch::countDown, &latch));
	latch.wait(); // Returns once the countDown task has run
	pool.stop();
}

pool.run

If the pool has no worker threads (threads_ is empty), the task is executed directly in the calling thread; otherwise the task is appended to the task queue.

// Submits a task: queues it for the workers when the pool has threads,
// otherwise runs it immediately in the calling thread.
void ThreadPool::run(const Task &task)
{
    if (!threads_.empty())
    {
        // Enqueue under the mutex and wake up a waiting worker.
        MutexLockGuard guard(mutex_);
        queue_.push_back(task);
        cond_.notify();
        return;
    }
    task();
}

code

#include <muduo/base/ThreadPool.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/CurrentThread.h>

#include <boost/bind.hpp>
#include <stdio.h>

// Task without arguments: prints the id of the thread that runs it.
void print()
{
	printf("tid=%d\n", muduo::CurrentThread::tid());
}

// Task with one argument: prints the running thread's id and the given string.
void printString(const std::string &str)
{
	printf("tid=%d, str=%s\n", muduo::CurrentThread::tid(), str.c_str());
}

// Starts a pool of five threads, submits 102 print tasks plus a latch
// task, waits for the latch task to run, then stops the pool.
int main()
{
	muduo::ThreadPool pool("MainThreadPool");  // Create the thread pool
	pool.start(5);

	pool.run(print);	// Submit the free function print twice
	pool.run(print);
	for (int i = 0; i < 100; ++i)
	{
		char buf[32];
		snprintf(buf, sizeof buf, "task %d", i);
		// Each task binds printString to its own copy of the text.
		pool.run(boost::bind(printString, std::string(buf)));
	}

	muduo::CountDownLatch latch(1);
	// Binding a member function needs the object pointer (&latch) as the first bound argument.
	pool.run(boost::bind(&muduo::CountDownLatch::countDown, &latch));
	latch.wait(); // Returns once the countDown task has run
	pool.stop();
}

pool.run

If the pool has no worker threads (threads_ is empty), the task is executed directly in the calling thread; otherwise the task is appended to the task queue.

// Submits a task: queues it for the workers when the pool has threads,
// otherwise runs it immediately in the calling thread.
void ThreadPool::run(const Task &task)
{
    if (!threads_.empty())
    {
        // Enqueue under the mutex and wake up a waiting worker.
        MutexLockGuard guard(mutex_);
        queue_.push_back(task);
        cond_.notify();
        return;
    }
    task();
}

Keywords: network

Added by goodluck4287 on Tue, 04 Jan 2022 20:07:43 +0200