Muduo source code analysis

Muduo asynchronous logging

Let's first look at the definition of the AsyncLogging class:

// Asynchronous logger. Front-end threads call append(), which copies messages
// into in-memory buffers under mutex_. A dedicated back-end thread
// (threadFunc) takes the filled buffers and writes them to a LogFile.
class AsyncLogging : noncopyable
{
 public:

  // basename: base name of the log files (passed to LogFile).
  // rollSize: byte threshold for starting a new file (passed to LogFile).
  // flushInterval: max seconds the back end waits for data before writing anyway.
  AsyncLogging(const string& basename,
               off_t rollSize,
               int flushInterval = 3);

  // Stops the back-end thread if it is still running.
  ~AsyncLogging()
  {
    if (running_)
    {
      stop();
    }
  }

  // Copies len bytes of logline into the current buffer; takes mutex_.
  void append(const char* logline, int len);

  // Starts the back-end thread and blocks until threadFunc() has counted down latch_.
  void start()
  {
    running_ = true;
    thread_.start();
    latch_.wait();
  }

  // Clears running_, wakes the back end and joins its thread.
  void stop() NO_THREAD_SAFETY_ANALYSIS
  {
    running_ = false;
    cond_.notify();
    thread_.join();
  }

 private:

  // Body of the back-end thread.
  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; // kLargeBuffer bytes (4000 * 1000 according to the original note)
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
  typedef BufferVector::value_type BufferPtr;

  // NOTE: members are initialized in declaration order; cond_ is built from mutex_.
  const int flushInterval_;// Max seconds the back end waits for a signal before writing anyway
  std::atomic<bool> running_;// Set by start(), cleared by stop()
  const string basename_;// Log file base name, passed to LogFile
  const off_t rollSize_;// Roll size in bytes, passed to LogFile
  muduo::Thread thread_;// Back-end thread running threadFunc()
  muduo::CountDownLatch latch_;// Lets start() wait until threadFunc() has begun
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);// Signalled when a buffer fills up and by stop()
  BufferPtr currentBuffer_ GUARDED_BY(mutex_);// Buffer the front end is currently filling
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);// Spare buffer; null after the front end takes it, until the back end refills it
  BufferVector buffers_ GUARDED_BY(mutex_);// Filled buffers waiting for the back end
};
}  // namespace muduo

Before looking at the implementation of this class, let's first look at the auxiliary classes it uses, starting with LogFile:

// Writes log messages to a file whose name is derived from basename.
// A new file is started when the current one has more than rollSize bytes
// written, or when a new kRollPerSeconds_ (one-day) period begins.
// The file is flushed once more than flushInterval seconds have passed since
// the last flush. The time-based checks run once every checkEveryN appends.
// The object is optionally guarded by a mutex (threadSafe).
class LogFile : noncopyable
{
 public:
  LogFile(const string& basename,
          off_t rollSize,
          bool threadSafe = true,
          int flushInterval = 3,
          int checkEveryN = 1024);
  ~LogFile();

  // Appends one message; locks mutex_ first when the object is thread-safe.
  void append(const char* logline, int len);
  void flush();
  // Opens a new log file. Returns false (and keeps the current file) unless
  // the current time is later than the time of the last roll.
  bool rollFile();

 private:
  // append() without locking; also performs the roll/flush checks.
  void append_unlocked(const char* logline, int len);

  // Builds "<basename>.<UTC time>.<hostname>.<pid>.log" and stores the time in *now.
  static string getLogFileName(const string& basename, time_t* now);

  const string basename_;
  const off_t rollSize_;// Roll to a new file once more than this many bytes were written to it
  const int flushInterval_;// Flush when more than this many seconds passed since the last flush
  const int checkEveryN_;// Number of appends between time-based roll/flush checks

  int count_;// Appends since the last time-based check

  std::unique_ptr<MutexLock> mutex_;// Null when constructed with threadSafe == false
  time_t startOfPeriod_;// Start of the current roll period (a multiple of kRollPerSeconds_)
  time_t lastRoll_;// Time of the last successful rollFile()
  time_t lastFlush_;// Time of the last flush
  std::unique_ptr<FileUtil::AppendFile> file_;

  const static int kRollPerSeconds_ = 60*60*24;// One day, in seconds
};

The AppendFile class (in the FileUtil namespace) is as follows:

// Appends data to one file through a stdio FILE* whose buffer is the member
// array buffer_. There is no internal locking: write() uses fwrite_unlocked,
// so callers must serialize access.
class AppendFile : noncopyable
{
 public:
  // Opens filename in append mode ("ae").
  explicit AppendFile(StringArg filename);

  ~AppendFile();

  // Writes len bytes of logline, retrying short writes; stops on a stdio error.
  void append(const char* logline, size_t len);

  void flush();

  // Bytes accounted by append() since construction.
  off_t writtenBytes() const { return writtenBytes_; }

 private:

  // One non-locking write attempt; returns the number of bytes written.
  size_t write(const char* logline, size_t len);

  FILE* fp_;
  char buffer_[64*1024];// stdio buffer installed with setbuffer() in the constructor
  off_t writtenBytes_;// Running total maintained by append()
};

}  // namespace FileUtil
}  // namespace muduo

StringArg is a small copyable wrapper that lets one parameter type accept either a C string (`const char*`) or a `string`; it only stores the `const char*` pointer. Its definition is as follows:

// Non-owning handle to a C string, so that one parameter type accepts either
// `const char*` or `string`. The constructors are intentionally implicit.
// The stored pointer is only valid while the source string is alive and
// unmodified.
class StringArg // copyable
{
 public:
  StringArg(const char* str) : str_(str) {}
  StringArg(const string& str) : str_(str.c_str()) {}

  const char* c_str() const
  {
    return str_;
  }

 private:
  const char* str_;  // borrowed; never freed here
};

The constructor of AppendFile is as follows (mode "a" opens for appending; "e" is the glibc extension that sets O_CLOEXEC):

FileUtil::AppendFile::AppendFile(StringArg filename)
  : fp_(::fopen(filename.c_str(), "ae")),  // "a" = append, "e" = O_CLOEXEC (glibc)
    writtenBytes_(0)
{
  // NOTE(review): a failed fopen() is only caught by assert(); in an NDEBUG
  // build a null fp_ would be passed on to setbuffer().
  assert(fp_);
  // Hand the member array to stdio as this stream's buffer.
  ::setbuffer(fp_, buffer_, sizeof(buffer_));
  // posix_fadvise POSIX_FADV_DONTNEED ?
}

setbuffer() installs a caller-supplied buffer of the given size as the stdio buffer of a stream. Consider the following example:


#include<iostream>
#include<cstdio>
#include<cstring>
#include<unistd.h>
using namespace std;

// Demonstrates full buffering: stdout gets a 10 KiB buffer via setbuffer(),
// only 10000 bytes (no newline) are written, and the process never exits,
// so nothing ever reaches the terminal.
int main(){

    const size_t kBufSize = 1024 * 10;
    // Static storage: the buffer must stay valid for as long as stdout uses it.
    static char buf[kBufSize];
    setbuffer(stdout, buf, kBufSize);
    char buffer[10];
    memset(buffer, 'a', sizeof(buffer));
    for (int i = 0; i < 1000; ++i)
        // buffer is not NUL-terminated, so bound the length instead of a plain "%s".
        printf("%.*s", static_cast<int>(sizeof(buffer)), buffer);
    sleep(10);
    // Block forever without busy-waiting; a side-effect-free `while(1);` has
    // undefined behavior under C++'s forward-progress rule.
    for (;;)
        pause();
}

Nothing is ever printed here. Calling setbuffer() with a non-NULL buffer makes stdout fully buffered, with a buffer of 10 * 1024 = 10240 bytes. The program writes only 1000 * 10 = 10000 bytes and no newline, so the buffer never fills. The program also never terminates (it loops forever at the end), so the buffer is never flushed.
By default, stdout is line buffered when it refers to a terminal: data is written out when a newline is output or the buffer fills. When stdout is redirected to a file or pipe, it is fully buffered.
stderr is unbuffered by default, so error messages are printed immediately.

Back to AppendFile: its constructor installs the 64 KiB member array buffer_ as the stream's stdio buffer.
append() writes the len bytes starting at logline:

// Writes all len bytes of logline to the file, retrying short writes.
// Gives up on the remaining bytes (after reporting to stderr) when the stream's
// error indicator is set, and also stops if a write makes no progress.
// writtenBytes_ grows by the number of bytes that stdio actually accepted.
void FileUtil::AppendFile::append(const char* logline, const size_t len)
{
  size_t written = 0;

  while (written != len)
  {
    const size_t remain = len - written;
    const size_t n = write(logline + written, remain);
    written += n;  // count what was accepted, even on a short write
    if (n != remain)
    {
      // Capture errno immediately: ferror() only tells *whether* the error
      // indicator is set (any nonzero value); it is not an errno code.
      const int savedErrno = errno;
      if (ferror(fp_))
      {
        fprintf(stderr, "AppendFile::append() failed %s\n", strerror_tl(savedErrno));
        break;
      }
      if (n == 0)
      {
        // No progress and no error indicator: stop instead of spinning forever.
        break;
      }
    }
  }

  writtenBytes_ += written;
}

The write function is as follows

// One write attempt through the non-locking stdio call; returns how many bytes
// were written. Callers must not use fp_ from several threads at once.
size_t FileUtil::AppendFile::write(const char* logline, size_t len)
{
  // #undef fwrite_unlocked
  const size_t bytesWritten = ::fwrite_unlocked(logline, sizeof(char), len, fp_);
  return bytesWritten;
}

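fwrite_unlocked is the variant of fwrite that does not lock the FILE stream, so by itself it is not thread-safe. It is safe here because callers serialize access: LogFile::append takes its mutex when it has one, and AsyncLogging writes only from its single back-end thread (explained further below).
Next, let's look at the constructor of the LogFile class introduced earlier: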

LogFile::LogFile(const string& basename,
                 off_t rollSize,
                 bool threadSafe,
                 int flushInterval,
                 int checkEveryN)
  : basename_(basename),            // base of every generated file name
    rollSize_(rollSize),            // roll once a file exceeds this many bytes
    flushInterval_(flushInterval),  // seconds between flushes
    checkEveryN_(checkEveryN),      // appends between time-based roll/flush checks
    count_(0),                      // appends since the last check
    mutex_(threadSafe ? new MutexLock : nullptr),  // no lock when the caller serializes access
    startOfPeriod_(0),              // start of the current roll period
    lastRoll_(0),                   // time of the last roll
    lastFlush_(0)                   // time of the last flush
{
  // The base name must not contain a directory separator.
  assert(basename.find('/') == string::npos);
  // Open the first log file.
  rollFile();
}
bool LogFile::rollFile()
{
  time_t now = 0;
  string filename = getLogFileName(basename_, &now);//Get file name
  time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;Namely(now-now%KRollPerSeconds_)

  if (now > lastRoll_)
  {
    lastRoll_ = now;
    lastFlush_ = now;
    startOfPeriod_ = start;
    file_.reset(new FileUtil::AppendFile(filename));//Change a file
    return true;
  }
  return false;
}
// Builds the name of a new log file:
//   <basename>.<YYYYmmdd-HHMMSS>.<hostname>.<pid>.log
// The timestamp is the current time in UTC; that time is also stored in *now.
string LogFile::getLogFileName(const string& basename, time_t* now)
{
  string name;
  name.reserve(basename.size() + 64);  // some room for the suffixes
  name.append(basename);

  *now = time(NULL);
  struct tm utc;
  gmtime_r(now, &utc); // FIXME: localtime_r ?
  char stamp[32];
  strftime(stamp, sizeof stamp, ".%Y%m%d-%H%M%S.", &utc);
  name.append(stamp);

  name.append(ProcessInfo::hostname());

  char pidSuffix[32];
  snprintf(pidSuffix, sizeof pidSuffix, ".%d", ProcessInfo::pid());
  name.append(pidSuffix).append(".log");

  return name;
}

LogFile::append() appends a message to the current log file:

// Appends one message. With a mutex (threadSafe == true) the whole append,
// including any roll/flush, happens under the lock; without one, the caller
// is trusted to serialize calls.
void LogFile::append(const char* logline, int len)
{
  if (!mutex_)
  {
    append_unlocked(logline, len);
    return;
  }

  MutexLockGuard lock(*mutex_);
  append_unlocked(logline, len);
}

When the LogFile is thread-safe, append() holds a MutexLockGuard for the whole call, so append_unlocked() does no locking itself. append_unlocked() calls AppendFile::append(), which in turn calls fwrite_unlocked():

// Writes the message, then decides whether to roll or flush:
//  - size: roll as soon as the file holds more than rollSize_ bytes;
//  - time: once every checkEveryN_ appends (presumably to avoid calling time()
//    on every message), roll when a new period has started, otherwise flush
//    if more than flushInterval_ seconds passed since the last flush.
void LogFile::append_unlocked(const char* logline, int len)
{
  file_->append(logline, len);

  if (file_->writtenBytes() > rollSize_)
  {
    rollFile();
    return;
  }

  ++count_;
  if (count_ < checkEveryN_)
  {
    return;
  }
  count_ = 0;

  const time_t now = ::time(NULL);
  const time_t currentPeriod = now / kRollPerSeconds_ * kRollPerSeconds_;
  if (currentPeriod != startOfPeriod_)
  {
    rollFile();
  }
  else if (now - lastFlush_ > flushInterval_)
  {
    lastFlush_ = now;
    file_->flush();
  }
}

After each append, if the number of bytes written to the current file exceeds rollSize_, a new log file is started (rollFile).
Otherwise the append counter is incremented. The time is checked only once every checkEveryN_ appends.
If the current time falls into a new roll period (a new day, since kRollPerSeconds_ is 24 hours), the logger rolls to a new file. Otherwise, if more than flushInterval_ seconds have passed since the last flush, the file is flushed.

Be careful! When logs are written very quickly, the actual file sizes can differ a lot from rollSize. rollFile() only switches files when the current time is later than the time of the last roll (now > lastRoll_), because file names have one-second resolution and would otherwise repeat. Within a single second every further roll attempt therefore fails, and the current file keeps growing past rollSize.

In multithreaded programs, logging usually does not perform disk I/O directly in the calling thread. Instead, asynchronous logging hands the messages to a separate background thread, which writes them to disk.

Now let's move on to the AsyncLogging class, which uses double buffering:

AsyncLogging::AsyncLogging(const string& basename,
                           off_t rollSize,
                           int flushInterval)
  : flushInterval_(flushInterval),//flush interval
    running_(false),//Whether the thread is running
    basename_(basename),
    rollSize_(rollSize),//The approximate log size is written too fast, resulting in size mismatch
    thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),
    latch_(1),//A necessary step
    mutex_(),
    cond_(mutex_),
    currentBuffer_(new Buffer),
    nextBuffer_(new Buffer),
    buffers_()
{
  currentBuffer_->bzero();
  nextBuffer_->bzero();
  buffers_.reserve(16);
}

The front-end append() function:

// Front end: copies the message into currentBuffer_ under mutex_. When the
// buffer lacks room, the full buffer is queued in buffers_, the spare
// nextBuffer_ (or a freshly allocated one) takes its place, and the back end
// is woken up.
void AsyncLogging::append(const char* logline, int len)
{
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() <= len)
  {
    buffers_.push_back(std::move(currentBuffer_));

    if (nextBuffer_)
      currentBuffer_ = std::move(nextBuffer_);
    else
      currentBuffer_.reset(new Buffer); // Rarely happens

    currentBuffer_->append(logline, len);
    cond_.notify();
    return;
  }

  currentBuffer_->append(logline, len);
}

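Front end: if the current buffer has room for the message, the message is appended directly. Otherwise:
1. The full current buffer is moved into buffers_.
2. The spare nextBuffer_ becomes the current buffer; if the spare is already in use, a new buffer is allocated instead.
3. The message is appended to the new current buffer.
4. The back-end thread is notified.

Back end (the logging thread):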

// Back-end (logging) thread body. Each iteration:
//  1. Under mutex_, swaps the queued buffers out of buffers_, waiting up to
//     flushInterval_ seconds if there are none.
//  2. Writes them to a LogFile outside the lock.
//  3. Recycles two of them as the spare buffers newBuffer1/newBuffer2.
// After stop() ends the loop, it drains whatever was appended since the last swap.
void AsyncLogging::threadFunc()
{
  assert(running_ == true);
  latch_.countDown();
  // threadSafe == false: only this thread ever touches `output`.
  LogFile output(basename_, rollSize_, false);
  BufferPtr newBuffer1(new Buffer);
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();
  newBuffer2->bzero();
  BufferVector buffersToWrite;
  buffersToWrite.reserve(16);
  while (running_)
  {
    assert(newBuffer1 && newBuffer1->length() == 0);
    assert(newBuffer2 && newBuffer2->length() == 0);
    assert(buffersToWrite.empty());

    {
      muduo::MutexLockGuard lock(mutex_);
      // Wait until the front end signals (full buffer or stop()) or until
      // flushInterval_ seconds have passed, whichever comes first.
      if (buffers_.empty())  // unusual usage!
      {
        cond_.waitForSeconds(flushInterval_);
      }
      buffers_.push_back(std::move(currentBuffer_));
      currentBuffer_ = std::move(newBuffer1);
      buffersToWrite.swap(buffers_);
      if (!nextBuffer_)
      {
        nextBuffer_ = std::move(newBuffer2);
      }
    }

    assert(!buffersToWrite.empty());

    // Too much backlog: keep only the first two buffers and report the drop.
    if (buffersToWrite.size() > 25)
    {
      char buf[256];
      // size() yields a size_t, so %zu (not %zd) is the matching conversion.
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zu larger buffers\n",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));
      buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
    }

    for (const auto& buffer : buffersToWrite)
    {
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());
    }

    if (buffersToWrite.size() > 2)
    {
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);
    }

    // Refill whichever spare was handed to the front end, reusing written buffers.
    if (!newBuffer1)
    {
      assert(!buffersToWrite.empty());
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();
    }

    if (!newBuffer2)
    {
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }

    buffersToWrite.clear();
    output.flush();
  }

  // Drain: messages appended after the last swap above (for example between
  // that swap and stop()) would otherwise be lost. newBuffer1 is non-null and
  // empty here, so currentBuffer_ stays valid for any late append().
  {
    muduo::MutexLockGuard lock(mutex_);
    if (currentBuffer_->length() > 0)
    {
      buffers_.push_back(std::move(currentBuffer_));
      currentBuffer_ = std::move(newBuffer1);
    }
    buffersToWrite.swap(buffers_);
  }
  for (const auto& buffer : buffersToWrite)
  {
    output.append(buffer->data(), buffer->length());
  }
  output.flush();
}

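The back end keeps the critical section short by using the local vector buffersToWrite. If buffers_ is empty, it first waits up to flushInterval_ seconds for a signal. Then, while holding the lock:
1. It moves the current buffer into buffers_ and replaces it with the empty newBuffer1.
2. If heavy logging made the front end use up nextBuffer_, it replaces that spare with newBuffer2.
3. It swaps buffers_ with buffersToWrite.

It then releases the lock, so the actual file writing happens outside the critical section. If more than 25 buffers have piled up, all but the first two are dropped and a warning is logged. After writing, whichever of newBuffer1/newBuffer2 was handed to the front end is refilled by resetting and reusing one of the written buffers. This guarantees both spare buffers are available again at the start of the next iteration.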

Added by Shellfishman on Sat, 19 Feb 2022 07:01:50 +0200