Muduo 异步log的实现
1. logging
日志(logging) 有两个意思:
- 诊断日志: 即是我们日常debug 使用的文本文件记录trace。常用的log 有log4j, logback, log4cpp, ezlogger等常用的日志库。
- 交易日志: 即是数据库的write-ahead log, 文件系统的journaling 等, 用于记录状态的变更,通过回访日志可以逐步恢复每一次修改之后的状态。
这篇文章我们主要介绍下muduo的log的实现,我觉得挺有意思的。muduo的日志库是C++ stream 风格的, 不需要保持格式字符串和参数类型的一致性, 可以随用随写,而且是类型安全的。我们需要的日志功能主要有下面几个需求:
- 日志消息有多种级别(level), 如TRACE, DEBUG, INFO, WARN, FATAL 等。
- 调整日志级别不需要重新编译,也不需要重启进程, 只要调用setloglevel() 就能即时6生效。
- 往本地存储写文件时,考虑日志文件的滚动。文件大小(例如每写满1GB就换下一个文件)和时间(例如每天零点新建一个日志文件,不论前一个文件有没有写满)
注意: 往文件中写日志的一个常见的问题时,万一程序崩溃,那么最后若干条日志就丢失了,因为日志库不能每条消息都flush硬盘,更不能每条日志都open/close文件,这样性能开销太大。 muduo采用两个办法来应对这一点:
- 定期(默认3s)将缓冲区内的日志消息flush到硬盘;
- 每条内存的日志消息都带有cookie, 其值为某个函数的地址,这样通过core dump 文件中查找cookie就能找到尚未来得及写入磁盘的消息。
日志消息的格式有几个要点:
- 尽量每条日志占一行。
- 时间戳精确到微秒。 muduo的每条消息时通过gettimeofday(2) 获得当前时间, 这个函数不是系统调用, 不会有什么性能损失。
- 打印线程的id。
- 打印日志的级别
- 打印源文件名和行号。
推荐一个零声学院免费教程,个人觉得老师讲得不错,分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,
TCP/IP,协程,DPDK等技术内容,点击立即学习:链接
2. 性能需求
编写linux 服务端程序的时候,我们需要一个高效的日志库。
- 每秒写几千上万条日志的时候没有明显的i性能损失
- 能应对一个进程产生大量的日志数据的场景, 例如 1GM/min
- 不阻塞正常的执行流程
- 在多线程中,不造成race condition.
为了满足这样的性能指标, muduo日志库的实现有几个优化措施。
- 时间戳字符串中的日期和时间两部分时缓存的, 一秒之内的多条日志只需重新格式化微秒部分。
- 日志消息的前4个字段时定长的,避免在运行期求字符串的长度。
- 线程id时格式化为字符串。
- 每行日志消息的源文件名部分采用了编译器计算来获得basename, 避免运行期strchr开销。
3. muduo的日志库分析
muduo的日志库采用的是双缓冲(double buffering)的技术. 准备两块buffer: A和B, 前端负责往buffer A填充数据, 后端负责将buffer B的数据写入文件。
当buffer A写满之后,交换A和B, 让后端将buffer A的数据写入文件,而前端则往buffer B填入新的日志消息,如此往复。用两个buffer的好处是在新建日志消息的时候不必等待磁盘文件操作, 也避免每条新日志消息都触发后端日志线程。换言之,前端将多条日志消息拼成一个大的buffer传送给后端,相当于批处理,减少了线程唤醒的频度,降低开销。muduo异步日志的性能开销大约是前端每写一条日志消息耗时1.0us ~ 1.6us.
在介绍AsyncLogging
文件之前,我们先看下LogFile
和 Logger
, LogStream
,FixBuffer
的实现。
LogFile
logfile
比较简单,主要是rollfile的实现比较有意思。 我们先看下头文件。
class LogFile : noncopyable // 不可被复制
{public:LogFile(const string& basename, // basename 文件名off_t rollSize, // rollSize 设置roll的大小的阈值,大于这个size的大小就开始roll. bool threadSafe = true, // 是否为线程安全, 为mutex 服务int flushInterval = 3, // 几秒一次flushint checkEveryN = 1024); // 检查roll 或者 flush的 次数~LogFile();void append(const char* logline, int len); // 给外部调用的只有这三个函数void flush();bool rollFile();private:void append_unlocked(const char* logline, int len);static string getLogFileName(const string& basename, time_t* now);const string basename_;const off_t rollSize_;const int flushInterval_;const int checkEveryN_;int count_;std::unique_ptr<MutexLock> mutex_;time_t startOfPeriod_;time_t lastRoll_;time_t lastFlush_;std::unique_ptr<FileUtil::AppendFile> file_; // 文件操作const static int kRollPerSeconds_ = 60*60*24;
};
cpp文件, 构造函数比较简单。
LogFile::LogFile(const string& basename,off_t rollSize,bool threadSafe,int flushInterval,int checkEveryN): basename_(basename),rollSize_(rollSize),flushInterval_(flushInterval),checkEveryN_(checkEveryN),count_(0),mutex_(threadSafe ? new MutexLock : NULL),startOfPeriod_(0), // 记录每一次roll的时间, roll 完会更新lastRoll_(0), // 记录上一次roll的时间lastFlush_(0) // 记录上一次的flush/roll的时间
{assert(basename.find('/') == string::npos);rollFile();
}
我们看下append的实现,append 需要输入append的内容和长度, 如果是线程的安全的,加锁去append。
void LogFile::append(const char* logline, int len)
{if (mutex_){MutexLockGuard lock(*mutex_);append_unlocked(logline, len);}else{append_unlocked(logline, len);}
}
这里调用了append_unlocked
, 我们看下这个函数的实现。我们调用文件操作append写入len字节的logline, writtenBytes
会返回append
的大小,如果 writtenBytes
大于设置的rollSize_
, 我们就开始’roolsize
的操作, 否则就判断count的次数, 我们默认的是1024次。如果上一次lastFlush_
超过了3s, 我们就写到文件中。
void LogFile::append_unlocked(const char* logline, int len)
{file_->append(logline, len);if (file_->writtenBytes() > rollSize_){rollFile();}else{++count_;if (count_ >= checkEveryN_){count_ = 0;time_t now = ::time(NULL);time_t thisPeriod_ = now / kRollPerSeconds_ * kRollPerSeconds_;if (thisPeriod_ != startOfPeriod_){rollFile();}else if (now - lastFlush_ > flushInterval_){lastFlush_ = now;file_->flush();}}}
}
我们 看下真正rollsize
的实现, 我们创建文件名+时间+主机名+ pid.log的文件名, 返回文件名和时间,如果此刻大于我们的上一次的roll的时间, 我们就新建一个文件。
bool LogFile::rollFile()
{time_t now = 0;string filename = getLogFileName(basename_, &now);time_t start = now / kRollPerSeconds_ * kRollPerSeconds_;if (now > lastRoll_){lastRoll_ = now;lastFlush_ = now;startOfPeriod_ = start;file_.reset(new FileUtil::AppendFile(filename));return true;}return false;
}
string LogFile::getLogFileName(const string& basename, time_t* now)
{string filename;filename.reserve(basename.size() + 64);filename = basename;char timebuf[32];struct tm tm;*now = time(NULL);gmtime_r(now, &tm); // FIXME: localtime_r ?strftime(timebuf, sizeof timebuf, ".%Y%m%d-%H%M%S.", &tm);filename += timebuf;filename += ProcessInfo::hostname();char pidbuf[32];snprintf(pidbuf, sizeof pidbuf, ".%d", ProcessInfo::pid());filename += pidbuf;filename += ".log";return filename;
}
Logger
我们看到的loglevel 有六个级别,用枚举类型来定义的, setLogLevel
是static
类型的, 所以在多线程的写多个文件, 也会全局生效。
class Logger
{public:enum LogLevel{TRACE,DEBUG,INFO,WARN,ERROR,FATAL,NUM_LOG_LEVELS,};// compile time calculation of basename of source fileclass SourceFile{public:Logger(SourceFile file, int line);Logger(SourceFile file, int line, LogLevel level);Logger(SourceFile file, int line, LogLevel level, const char* func);Logger(SourceFile file, int line, bool toAbort);~Logger();LogStream& stream() { return impl_.stream_; }static LogLevel logLevel();static void setLogLevel(LogLevel level);typedef void (*OutputFunc)(const char* msg, int len);typedef void (*FlushFunc)();static void setOutput(OutputFunc);static void setFlush(FlushFunc);static void setTimeZone(const TimeZone& tz);private:class Impl{public:typedef Logger::LogLevel LogLevel;Impl(LogLevel level, int old_errno, const SourceFile& file, int line);void formatTime();void finish();Timestamp time_;LogStream stream_;LogLevel level_;int line_;SourceFile basename_;};Impl impl_;
};extern Logger::LogLevel g_logLevel;
inline Logger::LogLevel Logger::logLevel()
{return g_logLevel;
}
muduo使用预定义的方式定义了八种log 的级别, 这样我们调用LOG_TRACE
就会调用到logstream
流里面。
#define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \\muduo::Logger(__FILE__, __LINE__, muduo::Logger::TRACE, __func__).stream()
#define LOG_DEBUG if (muduo::Logger::logLevel() <= muduo::Logger::DEBUG) \\muduo::Logger(__FILE__, __LINE__, muduo::Logger::DEBUG, __func__).stream()
#define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \\muduo::Logger(__FILE__, __LINE__).stream()
#define LOG_WARN muduo::Logger(__FILE__, __LINE__, muduo::Logger::WARN).stream()
#define LOG_ERROR muduo::Logger(__FILE__, __LINE__, muduo::Logger::ERROR).stream()
#define LOG_FATAL muduo::Logger(__FILE__, __LINE__, muduo::Logger::FATAL).stream()
#define LOG_SYSERR muduo::Logger(__FILE__, __LINE__, false).stream()
#define LOG_SYSFATAL muduo::Logger(__FILE__, __LINE__, true).stream()
我们再看下setLogLevel()
, 就是把level 设置给全局的loglevel, 比较简单。
void Logger::setLogLevel(Logger::LogLevel level)
{g_logLevel = level;
}
LogStream
在logstream 类里面定义了两个类, 一个是FixedBuffer, 设置我们在stream 定义的缓存的大小,定义了小缓存为kSmallBuffer = 4k, 最大的缓存为4MB.
FixedBuffer
的定义了一个data_[size]
的数组和一个指向当前写入的字节的指针, 作用很简单, 就是预先开辟出一块内存,然后往里面写入(append) len 个字节, cur 会指向当前已写入的字节的多少。
const int kSmallBuffer = 4000;
const int kLargeBuffer = 4000*1000;template<int SIZE>
class FixedBuffer : noncopyable
{public:FixedBuffer(): cur_(data_){setCookie(cookieStart);}~FixedBuffer(){setCookie(cookieEnd);}void append(const char* /*restrict*/ buf, size_t len){// FIXME: append partiallyif (implicit_cast<size_t>(avail()) > len){memcpy(cur_, buf, len);cur_ += len;}}const char* data() const { return data_; }int length() const { return static_cast<int>(cur_ - data_); }// write to data_ directlychar* current() { return cur_; }int avail() const { return static_cast<int>(end() - cur_); }void add(size_t len) { cur_ += len; }void reset() { cur_ = data_; }void bzero() { memZero(data_, sizeof data_); }// for used by GDBconst char* debugString();void setCookie(void (*cookie)()) { cookie_ = cookie; }// for used by unit teststring toString() const { return string(data_, length()); }StringPiece toStringPiece() const { return StringPiece(data_, length()); }private:const char* end() const { return data_ + sizeof data_; }// Must be outline function for cookies.static void cookieStart();static void cookieEnd();void (*cookie_)();char data_[SIZE];char* cur_;
};
这里最重要的是logstream的类。定义了Buffer的大小为4k, 并且重载了若干个operator<<
, 这样我们写log文件的时候可以使用LOGTRACE<<
来写入, 这里也很简单, 就是把字符串,数字等写入缓存buffer内。
class LogStream : noncopyable
{typedef LogStream self;public:typedef detail::FixedBuffer<detail::kSmallBuffer> Buffer;self& operator<<(bool v){buffer_.append(v ? "1" : "0", 1);return *this;}self& operator<<(short);self& operator<<(unsigned short);self& operator<<(int);self& operator<<(unsigned int);self& operator<<(long);self& operator<<(unsigned long);self& operator<<(long long);self& operator<<(unsigned long long);self& operator<<(const void*);self& operator<<(float v){*this << static_cast<double>(v);return *this;}self& operator<<(double);// self& operator<<(long double);self& operator<<(char v){buffer_.append(&v, 1);return *this;}self& operator<<(const char* str){if (str){buffer_.append(str, strlen(str));}else{buffer_.append("(null)", 6);}return *this;}self& operator<<(const unsigned char* str){return operator<<(reinterpret_cast<const char*>(str));}self& operator<<(const string& v){buffer_.append(v.c_str(), v.size());return *this;}self& operator<<(const StringPiece& v){buffer_.append(v.data(), v.size());return *this;}self& operator<<(const Buffer& v){*this << v.toStringPiece();return *this;}void append(const char* data, int len) { buffer_.append(data, len); }const Buffer& buffer() const { return buffer_; }void resetBuffer() { buffer_.reset(); }private:void staticCheck();template<typename T>void formatInteger(T);Buffer buffer_;static const int kMaxNumericSize = 48;
};
接下来就是我们最重要的异步log的实现。
AsyncLogging
在AsyncLogging 中, 采用了四个缓冲区,这样可以进一步减少或者避免等待。
class AsyncLogging : noncopyable
{public:AsyncLogging(const string& basename,off_t rollSize,int flushInterval = 3);~AsyncLogging(){if (running_){stop();}}void append(const char* logline, int len);void start(){running_ = true;thread_.start();latch_.wait();}void stop() NO_THREAD_SAFETY_ANALYSIS{running_ = false;cond_.notify();thread_.join();}private:void threadFunc();typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; // 4MB的缓冲typedef std::vector<std::unique_ptr<Buffer>> BufferVector; // 缓冲数组typedef BufferVector::value_type BufferPtr; // 缓冲的指针const int flushInterval_;std::atomic<bool> running_;const string basename_;const off_t rollSize_;muduo::Thread thread_;muduo::CountDownLatch latch_;muduo::MutexLock mutex_; // 线程安全muduo::Condition cond_ GUARDED_BY(mutex_);BufferPtr currentBuffer_ GUARDED_BY(mutex_); // 当前缓冲BufferPtr nextBuffer_ GUARDED_BY(mutex_); // 预备缓冲BufferVector buffers_ GUARDED_BY(mutex_); //待写入文件的已填满的缓冲
};
先来看发送方的代码。
AsyncLogging::AsyncLogging(const string& basename,off_t rollSize,int flushInterval): flushInterval_(flushInterval),running_(false),basename_(basename),rollSize_(rollSize),thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"),latch_(1),mutex_(),cond_(mutex_),currentBuffer_(new Buffer),nextBuffer_(new Buffer),buffers_()
{currentBuffer_->bzero();nextBuffer_->bzero();buffers_.reserve(16);
}void AsyncLogging::append(const char* logline, int len)
{muduo::MutexLockGuard lock(mutex_);if (currentBuffer_->avail() > len) // 缓存没满, 写入缓冲中{currentBuffer_->append(logline, len);}else // 缓存满了{buffers_.push_back(std::move(currentBuffer_)); // 放入待写入的缓存中if (nextBuffer_) // 准备好另一块缓冲{currentBuffer_ = std::move(nextBuffer_); // 把当前缓存的指针指向空buffer中}else{currentBuffer_.reset(new Buffer); // Rarely happens}currentBuffer_->append(logline, len); // 当前缓冲继续append, 并通知cond_.notify();}
}
前端生成一条日志消息的时候会调用append()。 在这个函数中,如果当前缓冲(currentBuffer_)剩余空间足够大,则会直接把消息追加在到缓存中。否则说明当前缓冲区已满/不够,就把它送入buffers, 并把另一块备用的buffer设置为当前的缓冲区,然后追加消息并通知后端写入日志数据。如果写入速度太快,一下子把两块缓存都用完了,那么只好分配新的缓存作为当前缓存。
再看下接收方实现。
void AsyncLogging::threadFunc()
{assert(running_ == true);latch_.countDown();LogFile output(basename_, rollSize_, false); // LogfIle是真正写入的BufferPtr newBuffer1(new Buffer); // 准备好第一块bufferBufferPtr newBuffer2(new Buffer); // 准备好第二块buffernewBuffer1->bzero();newBuffer2->bzero();BufferVector buffersToWrite;buffersToWrite.reserve(16);while (running_){assert(newBuffer1 && newBuffer1->length() == 0);assert(newBuffer2 && newBuffer2->length() == 0);assert(buffersToWrite.empty());{muduo::MutexLockGuard lock(mutex_);if (buffers_.empty()) // unusual usage!{cond_.waitForSeconds(flushInterval_); // 等待条件触发}buffers_.push_back(std::move(currentBuffer_)); // 条件满足时, 先将当前缓冲区移入bufferscurrentBuffer_ = std::move(newBuffer1); // 并立刻将空闲的newBuffer1 作为新的缓冲buffersToWrite.swap(buffers_); // 将空的buffersToWrite 填充if (!nextBuffer_){nextBuffer_ = std::move(newBuffer2); // 如果nextBuffer为空, 将buffer2 作为新的nextBuffer, 这样前端始终有一个buffer可供调配}}assert(!buffersToWrite.empty());if (buffersToWrite.size() > 25) // 如果长度超过25, 丢弃一部分{char buf[256];snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\\n",Timestamp::now().toFormattedString().c_str(),buffersToWrite.size()-2);fputs(buf, stderr);output.append(buf, static_cast<int>(strlen(buf)));buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());}for (const auto& buffer : buffersToWrite) // 遍历数组,真正写入内存中{// FIXME: use unbuffered stdio FILE ? or use ::writev ?output.append(buffer->data(), buffer->length());}if (buffersToWrite.size() > 2){// drop non-bzero-ed buffers, avoid trashingbuffersToWrite.resize(2); // buffersToWrite的大小设置为2}if (!newBuffer1) // 将buffersToWrite内的buffer重新填充为newBuffer1 {assert(!buffersToWrite.empty());newBuffer1 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer1->reset();}if (!newBuffer2) // 将buffersToWrite内的buffer重新填充为newBuffer2{assert(!buffersToWrite.empty());newBuffer2 = std::move(buffersToWrite.back());buffersToWrite.pop_back();newBuffer2->reset();}buffersToWrite.clear();output.flush();}output.flush();
}
我们看下运行的图示:
一开始我们先分配好四个缓存, A, B ,C ,D. 初始时都是空的。
- 缓冲A, B都没满, 但是超时。写入频度不高, 后端3s 超市后将" 当前缓冲currentBuffer" 写入文件。在2.9s的时候, currBuffer使用了80%, 在第三秒的时候后端线程先把curr送到buffers, 再把newBuffer1设置为curr。 随后3s+, 交换buffers 和buffersToWrite, 后端开始将buffer A写入文件,写完再把new1再填上,供下次cond返回。
- 缓冲A满了,B未满。在3s超时之前已经写满了当前缓冲,于是唤醒后端线程开始写入文件。在第1.5s的时候, currBuffer使用了80%, 第1.8s, curr写满,将当前缓冲送入buffers_, 并将nextbuffer_ 设置为当前缓冲,然后开始写入。当后端线程唤醒之后(1.8s+), 先将curr送入buffers_, 再将new1, new2设置为当前缓冲。
- 缓冲AB都满。前端在短时间内写入大量的消息,用完了两个缓冲,并重新分配了一块新的缓冲。在第1.8s的时候, A已经写满, B也接近写满。并且已经notify() 后端线程,但是由于种种原因, 后端线程并没有立刻开始工作, 直到1.9s的之后, B也写满了, 前端线程重新分配了缓冲E, 到了1.8s+, 后端线程开始写入,将CD 两块缓冲交给前端,并开始将ABE写入文件。完成之后, 用AB重新填充那两块缓冲,释放缓冲E。
- 文件写入速度慢,导致前端耗尽了两个缓冲。 前1.8s+ 和前面的第二种相同, 前端写满了一个缓冲,唤醒后后端线程开始写入文件。之后,后端花了较长时间才将数据写完。这期间前端又用完了两个缓冲(CD),并分配了新的缓冲(E), 这期间前端的notify已经丢失。当后端写完之后,发现buffers_ 不为空, 立刻进入下一循环。替换前端的两个缓冲,并开始一次将CDE写入。如果buffersToWrite的大小超过了两个, 将重新把buffersToWrite的大小设置为2个, 再将new1, new2填充缓冲。