Muduo asynchronous log
Let's first look at the definition of the AsyncLogging class
class AsyncLogging : noncopyable { public: AsyncLogging(const string& basename, off_t rollSize, int flushInterval = 3); ~AsyncLogging() { if (running_) { stop(); } } void append(const char* logline, int len); void start() { running_ = true; thread_.start(); latch_.wait(); } void stop() NO_THREAD_SAFETY_ANALYSIS { running_ = false; cond_.notify(); thread_.join(); } private: void threadFunc(); typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; //Buffer size 4000 * 1000 typedef std::vector<std::unique_ptr<Buffer>> BufferVector; typedef BufferVector::value_type BufferPtr; const int flushInterval_;// std::atomic<bool> running_; const string basename_; const off_t rollSize_;//Scroll size muduo::Thread thread_; muduo::CountDownLatch latch_; muduo::MutexLock mutex_; muduo::Condition cond_ GUARDED_BY(mutex_); BufferPtr currentBuffer_ GUARDED_BY(mutex_); BufferPtr nextBuffer_ GUARDED_BY(mutex_); BufferVector buffers_ GUARDED_BY(mutex_); }; } // namespace muduo
Before looking at the specific implementation of this class, first look at the auxiliary classes used in it
class LogFile : noncopyable { public: LogFile(const string& basename, off_t rollSize, bool threadSafe = true, int flushInterval = 3, int checkEveryN = 1024); ~LogFile(); void append(const char* logline, int len); void flush(); bool rollFile(); private: void append_unlocked(const char* logline, int len); static string getLogFileName(const string& basename, time_t* now); const string basename_; const off_t rollSize_;//Write size const int flushInterval_; const int checkEveryN_;//Number of log entries in this file int count_; std::unique_ptr<MutexLock> mutex_; time_t startOfPeriod_; time_t lastRoll_; time_t lastFlush_; std::unique_ptr<FileUtil::AppendFile> file_; const static int kRollPerSeconds_ = 60*60*24; };
The AppendFile class, which lives in the FileUtil namespace, is as follows
/// Append-only file wrapper around a stdio stream with its own 64 KB buffer.
/// Tracks how many bytes have been appended.
class AppendFile : noncopyable
{
 public:
  explicit AppendFile(StringArg filename);

  ~AppendFile();

  /// Appends `len` bytes of `logline` to the file.
  void append(const char* logline, size_t len);

  void flush();

  /// Number of bytes appended so far.
  off_t writtenBytes() const
  {
    return writtenBytes_;
  }

 private:
  size_t write(const char* logline, size_t len);

  FILE* fp_;
  char buffer_[64*1024];  // handed to the stream as its stdio buffer
  off_t writtenBytes_;
};

}  // namespace FileUtil
}  // namespace muduo
StringArg is a small helper class for passing C-style strings: it can be constructed from either a `const char*` or a `string`, and exposes the text as a `const char*`. The definition is as follows
/// Non-owning adapter that lets a function accept either a C string or a
/// `string` and read it back as a `const char*`.
/// The pointed-to characters must outlive the StringArg.
class StringArg  // copyable
{
 public:
  /// Wraps a C string pointer as-is.
  StringArg(const char* str)
    : str_(str)
  {
  }

  /// Wraps the character data owned by `str`.
  StringArg(const string& str)
    : str_(str.c_str())
  {
  }

  /// Returns the wrapped pointer.
  const char* c_str() const
  {
    return str_;
  }

 private:
  const char* str_;
};
The constructor of AppendFile is as follows
/// Opens `filename` in append mode and makes the stream use buffer_ as its
/// stdio buffer.
FileUtil::AppendFile::AppendFile(StringArg filename)
  : fp_(::fopen(filename.c_str(), "ae")),  // 'e' for O_CLOEXEC
    writtenBytes_(0)
{
  // NOTE(review): the assert is the only check on the fopen result.
  assert(fp_);
  ::setbuffer(fp_, buffer_, sizeof(buffer_));
  // posix_fadvise POSIX_FADV_DONTNEED ?
}
setbuffer sets the buffer, and the buffer size, used by the file stream
Consider the following example
#include <iostream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>

// Demonstrates stdio buffering: stdout is given a 10 * 1024 byte buffer,
// 10000 bytes are written without a newline, and the process never exits,
// so the buffered bytes are never flushed.
int main()
{
  const size_t kBufSize = 1024 * 10;
  char* buf = static_cast<char*>(std::malloc(kBufSize));
  if (buf == nullptr)
  {
    return 1;
  }
  setbuffer(stdout, buf, kBufSize);

  char buffer[10];
  std::memset(buffer, 'a', sizeof(buffer));
  for (int i = 0; i < 1000; ++i)
  {
    // buffer has no terminating NUL, so pass its length explicitly.
    std::printf("%.*s", static_cast<int>(sizeof(buffer)), buffer);
  }

  sleep(10);
  // Block forever without exiting, so stdout is never flushed.
  for (;;)
  {
    pause();
  }
}
Nothing is ever printed here. Standard output is buffered, and its buffer has been changed to 10 * 1024 = 10240 bytes. The program writes only 10000 bytes in total, so the buffer never fills up, and no newline is written either. Because the program then enters an endless loop and never exits, the buffer is never flushed and the output never appears
Generally speaking, streams attached to a terminal, such as standard input and standard output, are line buffered: data is written out only when a newline is encountered or the buffer becomes full
Standard error is unbuffered, so error messages are printed immediately
Going back to the constructor of AppendFile: it makes the stream use buffer_ as its buffer, with the size of buffer_ (64 * 1024 bytes)
append writes len bytes of logline
/// Appends `len` bytes starting at `logline`, retrying after short writes.
/// Stops early if the stream's error indicator is set, and adds the bytes
/// actually written to writtenBytes_.
void FileUtil::AppendFile::append(const char* logline, const size_t len)
{
  size_t written = 0;

  while (written != len)
  {
    const size_t remain = len - written;
    const size_t n = write(logline + written, remain);
    // Count what this call wrote before checking for errors, so a partial
    // write that ends in an error is still reflected in writtenBytes_.
    written += n;
    if (n != remain)
    {
      const int err = ferror(fp_);
      if (err)
      {
        // NOTE(review): ferror() returns an error flag, not an errno value,
        // so the text from strerror_tl(err) may not describe the real cause.
        fprintf(stderr, "AppendFile::append() failed %s\n", strerror_tl(err));
        break;
      }
    }
  }

  writtenBytes_ += written;
}
The write function is as follows
size_t FileUtil::AppendFile::write(const char* logline, size_t len) { // #undef fwrite_unlocked return ::fwrite_unlocked(logline, 1, len, fp_); }
fwrite_unlocked is the non-thread-safe (non-locking) version of fwrite. Why it is used here will become clear once the AsyncLogging class has been analyzed
Next, let's look at the constructor of the LogFile class shown earlier
/// Stores the configuration, creates the mutex only when `threadSafe` is
/// set, and opens the first log file through rollFile().
LogFile::LogFile(const string& basename,
                 off_t rollSize,
                 bool threadSafe,
                 int flushInterval,
                 int checkEveryN)
  : basename_(basename),            // file name prefix
    rollSize_(rollSize),            // roll once written bytes exceed this
    flushInterval_(flushInterval),  // seconds between flushes
    checkEveryN_(checkEveryN),      // appends between clock checks
    count_(0),                      // appends since the last clock check
    mutex_(threadSafe ? new MutexLock : nullptr),
    startOfPeriod_(0),              // start of the current roll period
    lastRoll_(0),                   // time of the last roll
    lastFlush_(0)                   // time of the last flush
{
  // basename must not contain a path separator.
  assert(basename.find('/') == string::npos);
  rollFile();
}
bool LogFile::rollFile() { time_t now = 0; string filename = getLogFileName(basename_, &now);//Get file name time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;Namely(now-now%KRollPerSeconds_) if (now > lastRoll_) { lastRoll_ = now; lastFlush_ = now; startOfPeriod_ = start; file_.reset(new FileUtil::AppendFile(filename));//Change a file return true; } return false; }
//File name of the generated log string LogFile::getLogFileName(const string& basename, time_t* now) { string filename; filename.reserve(basename.size() + 64); filename = basename; char timebuf[32]; struct tm tm; *now = time(NULL); gmtime_r(now, &tm); // FIXME: localtime_r ? strftime(timebuf, sizeof timebuf, ".%Y%m%d-%H%M%S.", &tm); filename += timebuf; filename += ProcessInfo::hostname(); char pidbuf[32]; snprintf(pidbuf, sizeof pidbuf, ".%d", ProcessInfo::pid()); filename += pidbuf; filename += ".log"; return filename; }
Add information to the current log. The append function is as follows
/// Appends a log line, holding mutex_ for the duration when one was created.
void LogFile::append(const char* logline, int len)
{
  if (!mutex_)
  {
    // No mutex was created, so append without locking.
    append_unlocked(logline, len);
    return;
  }

  MutexLockGuard lock(*mutex_);
  append_unlocked(logline, len);
}
MutexLockGuard is used here, so append_unlocked does not need to do any locking itself. It is in this function that AppendFile's append is called, which in turn calls the fwrite_unlocked function
/// Appends to the current file without locking, then decides whether to
/// roll or flush: roll when the written size exceeds rollSize_; otherwise,
/// every checkEveryN_ appends, roll on a period change or flush when
/// flushInterval_ seconds have passed.
void LogFile::append_unlocked(const char* logline, int len)
{
  file_->append(logline, len);

  if (file_->writtenBytes() > rollSize_)
  {
    rollFile();
    return;
  }

  ++count_;
  if (count_ < checkEveryN_)
  {
    return;
  }

  // Only look at the clock once every checkEveryN_ appends.
  count_ = 0;
  const time_t now = ::time(NULL);
  const time_t currentPeriod = now / kRollPerSeconds_ * kRollPerSeconds_;
  if (currentPeriod != startOfPeriod_)
  {
    rollFile();
  }
  else if (now - lastFlush_ > flushInterval_)
  {
    lastFlush_ = now;
    file_->flush();
  }
}
When the number of bytes written to the current AppendFile exceeds rollSize_, a new log file is created (the log is rolled).
Otherwise, the count of appended entries is incremented. Once the count reaches checkEveryN_, it is reset to zero and the current time is checked.
If the current time falls in a different period (day) than startOfPeriod_, the file is rolled; otherwise, if more than flushInterval_ seconds have passed since the last flush, the file is flushed to disk
Be careful! When logs are written very quickly, the actual file size can differ from the expected size. Although rollFile is called on every append once the size limit has been exceeded, the generated filename (which has one-second resolution) may be the same several times in a row, so no new file is opened and the file can end up much larger than expected
Logging in multithreaded programs usually does not perform disk IO directly on the calling thread; instead, an asynchronous logging scheme is used, in which a separate thread is started to write the log
Now let's move on to the AsyncLogging class, which uses double buffering
AsyncLogging::AsyncLogging(const string& basename, off_t rollSize, int flushInterval) : flushInterval_(flushInterval),//flush interval running_(false),//Whether the thread is running basename_(basename), rollSize_(rollSize),//The approximate log size is written too fast, resulting in size mismatch thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"), latch_(1),//A necessary step mutex_(), cond_(mutex_), currentBuffer_(new Buffer), nextBuffer_(new Buffer), buffers_() { currentBuffer_->bzero(); nextBuffer_->bzero(); buffers_.reserve(16); }
append function
void AsyncLogging::append(const char* logline, int len) { muduo::MutexLockGuard lock(mutex_); if (currentBuffer_->avail() > len) { currentBuffer_->append(logline, len); } else { buffers_.push_back(std::move(currentBuffer_)); if (nextBuffer_) { currentBuffer_ = std::move(nextBuffer_); } else { currentBuffer_.reset(new Buffer); // Rarely happens } currentBuffer_->append(logline, len); cond_.notify(); } }
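Front-end operation: if the current buffer has room for the data, it is appended directly. Otherwise the current buffer is moved into buffers_, the next buffer (or a newly allocated buffer, if the next buffer has already been used) becomes the current buffer, the data is appended to it, and the back-end thread is notified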
Back end operation
void AsyncLogging::threadFunc() { assert(running_ == true); latch_.countDown(); LogFile output(basename_, rollSize_, false); BufferPtr newBuffer1(new Buffer); BufferPtr newBuffer2(new Buffer); newBuffer1->bzero(); newBuffer2->bzero(); BufferVector buffersToWrite; buffersToWrite.reserve(16); while (running_) { assert(newBuffer1 && newBuffer1->length() == 0); assert(newBuffer2 && newBuffer2->length() == 0); assert(buffersToWrite.empty()); { muduo::MutexLockGuard lock(mutex_); //One of the two conditions can be met if (buffers_.empty()) // unusual usage! { cond_.waitForSeconds(flushInterval_); } buffers_.push_back(std::move(currentBuffer_)); currentBuffer_ = std::move(newBuffer1); buffersToWrite.swap(buffers_); if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } } assert(!buffersToWrite.empty()); if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n", Timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); } for (const auto& buffer : buffersToWrite) { // FIXME: use unbuffered stdio FILE ? or use ::writev ? output.append(buffer->data(), buffer->length()); } if (buffersToWrite.size() > 2) { // drop non-bzero-ed buffers, avoid trashing buffersToWrite.resize(2); } if (!newBuffer1) { assert(!buffersToWrite.empty()); newBuffer1 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer1->reset(); } if (!newBuffer2) { assert(!buffersToWrite.empty()); newBuffer2 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer2->reset(); } buffersToWrite.clear(); output.flush(); } output.flush(); }
The back end uses buffersToWrite to shorten the critical section: while holding the lock it adds the current buffer to buffers_ and moves newBuffer1 into currentBuffer_. If nextBuffer_ has also been used up because the front end wrote too much data, newBuffer2 is moved into nextBuffer_. It then swaps buffers_ with buffersToWrite. After the buffers have been written out, it makes sure that two spare buffers are available again, namely newBuffer1 and newBuffer2