如何构建一个高性能的异步日志库?
日志系统的要求
日志库大致可分为前端和后端两部分,前端负责提供应用程序日志接口并生成日志消息,后端负责接收日志消息并将消息写往某处。
在多线程程序中,多个线程有前端,同时共用一个后端,这是一个典型的多生产者单消费者问题。对前端而言需要做到低延迟、低CPU开销、无阻塞;后端要做到大吞吐量、占用少量资源。
功能:
- 日志消息多级别
- 日志多目的地
- 日志格式可配置
- 运行时过滤:不同级别消息的日志分目的地
线程安全问题:多个线程并发写日志,日志消息不会交织出现。比如线程1写消息abc,线程2写消息123,不能出现a1b23c,只能是abc123或123abc。
muduo日志使用方式
muduo采用流式日志,使用时非常简单,只需向LOG_INFO写数据即可LOG_INFO << "Hello 0123456789";
,默认为终端输出。
1 2 3 4 5 6
| #define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \ muduo::Logger(__FILE__, __LINE__, muduo::Logger::TRACE, __func__).stream() #define LOG_DEBUG if (muduo::Logger::logLevel() <= muduo::Logger::DEBUG) \ muduo::Logger(__FILE__, __LINE__, muduo::Logger::DEBUG, __func__).stream() #define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \ muduo::Logger(__FILE__, __LINE__).stream()
|
实际上LOG_INFO为一个宏,只有日志级别小于INFO才生效。这个宏构造了匿名对象Logger,并调用stream方法获取Stream流,然后重载运算符<<。
1 2 3 4 5 6
| Logger::OutputFunc Logger::g_output = [](const char* msg, int len) { fwrite(msg, 1, len, stdout); }; Logger::FlushFunc Logger::g_flush = []() { fflush(stdout); };
|
异步日志使用
- 构造后端 AsyncLogging
- 设置前端的输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| off_t kRollSize = 500*1000*1000; muduo::AsyncLogging* g_asyncLog = NULL;
void asyncOutput(const char* msg, int len){ g_asyncLog->append(msg, len); }
int main(){ muduo::AsyncLogging log(::basename(name), kRollSize); log.start(); g_asyncLog = &log; muduo::Logger::setOutput(asyncOutput); LOG_INFO << "Hello 0123456789"; }
|
muduo异步日志原理
前述日志库分为前端和后端,前后端之间如何交互呢?如果只靠一个blockingQueue,那么每条日志都会通知后端一次,不太现实。
实际上muduo采用了双缓冲技术:前端写入缓冲区A,后端读取缓冲区B,当A满或者B空时,交换两个缓冲区。
这样做的好处是:
- 前端将多条消息合并为一个大buffer发给后端,相当于批处理;
- 前端写消息和后端读消息互不阻塞,分开进行。
在实现时,为了消息处理的及时性,即便前端缓冲区未满,每隔3秒也会执行一次缓冲区交换操作。
核心代码实现
在实现时采用了四个缓冲区,前端两个,后端两个。这样做是为了当一个缓冲区耗尽时,能够及时启用下一个缓冲区,进一步减少前端的等待。
前端:
- currentBufer_
- nextBuffer_
- buffer_ 缓冲区的队列
后端:
- newBuffer1
- newBuffer2
- buffersToWrite 缓冲区的队列
前端日志写入
实际上,前端调用(g_output)的还是asyncLogging的接口void AsyncLogging::append(const char* logline, int len)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class AsyncLogging : noncopyable{ private: void threadFunc(); typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer; typedef std::vector<std::unique_ptr<Buffer>> BufferVector; typedef BufferVector::value_type BufferPtr;
const int flushInterval_; std::atomic<bool> running_; const string basename_; const off_t rollSize_; muduo::Thread thread_; muduo::CountDownLatch latch_; muduo::MutexLock mutex_; muduo::Condition cond_ GUARDED_BY(mutex_); BufferPtr currentBuffer_ GUARDED_BY(mutex_); BufferPtr nextBuffer_ GUARDED_BY(mutex_); BufferVector buffers_ GUARDED_BY(mutex_); };
|
这里的buffers_
是bufferPtr的队列,已满的缓冲区会先收集到这个队列中。append函数主要逻辑就是:
- 判断curentBuffer_是否已经满
- 未满则写入一条日志,结束
- 已满,则将cureentBuffer_放入buffers_,并启用nextBuffer_
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void AsyncLogging::append(const char *logLine, int len) { std::lock_guard<std::mutex> lockGuard(mutex_); if (currentBuffer_->avail() > len) { currentBuffer_->append(logLine, len); } else { buffers_.emplace_back(currentBuffer_.release()); if (nextBuffer_) { currentBuffer_ = std::move(nextBuffer_); } else { currentBuffer_ = std::make_unique<Buffer>(); } currentBuffer_->append(logLine, len); cond_.notify_one(); } }
|
后端日志落盘
AsyncLogging::threadFunc()
:后端日志落盘线程的执行函数。
这里准备了三块缓冲区newBuffer1和newBuffer2前面介绍过,用于和前端的缓冲区交换,buffersToWrite
关键看临界区内的代码,并没有采用while循环,而是采用了带时间间隔的条件变量的唤醒。
异常处理:当已满缓冲队列中的数据堆积(> 默认缓冲数25),就会丢弃多余缓冲,只保留最开始2个。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| void AsyncLogging::theadFunc() { assert(running_ == true); LogFile output(basename_, rollSize_); BufferPtr newBuffer1 = std::make_unique<Buffer>(); BufferPtr newBuffer2 = std::make_unique<Buffer>(); newBuffer1->bzero(); newBuffer2->bzero(); BufferVector buffersToWrite; buffersToWrite.reserve(16);
while (running_) { assert(newBuffer1 && newBuffer1->length() == 0); assert(newBuffer2 && newBuffer2->length() == 0); assert(buffersToWrite.empty());
{ std::unique_lock<std::mutex> uniqueLock(mutex_); if (buffers_.empty()) { cond_.wait_for(uniqueLock, std::chrono::seconds(flushInterval_)); } buffers_.emplace_back(currentBuffer_.release()); currentBuffer_ = std::move(newBuffer1); buffersToWrite.swap(buffers_); if (!nextBuffer_) { nextBuffer_ = std::move(newBuffer2); } } if (buffersToWrite.size() > 25) { char buf[256]; snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n", Timestamp::now().toFormattedString().c_str(), buffersToWrite.size()-2); fputs(buf, stderr); output.append(buf, static_cast<int>(strlen(buf))); buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end()); } for (const auto& b : buffersToWrite) { output.append(b->data(), b->length()); } if (buffersToWrite.size() > 2) { buffersToWrite.resize(2); } if (!newBuffer1) { newBuffer1 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer1->reset(); } if (!newBuffer2) { newBuffer2 = std::move(buffersToWrite.back()); buffersToWrite.pop_back(); newBuffer2->reset(); } buffersToWrite.clear(); output.flush(); } output.flush(); }
|
前端
前述前端是调用g_output将消息发送过去,那么整个过程是什么样的呢?
Logger -> Impl -> LogStream -> operator<< -> LogStream的FixBuffer内 -> ~Logger -> g_output
1 2 3 4 5 6 7 8 9 10 11
| Logger::~Logger() { impl_.finish(); const LogStream::Buffer& buf(stream().buffer()); g_output(buf.data(), buf.length()); if (impl_.level_ == FATAL) { g_flush(); abort(); } }
|
- Logger 提供用户接口,一个pointer to implement实现,提供日志等级、文件行数代码详细信息
- Impl Logger的详细实现
- LogStream 重载operator<<,将用户输入保存在类中的small buffer中
- FixedBuffer 固定大小的缓冲区
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| class Logger { public: enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS, };
Logger(const char* file, int line); Logger(const char* file, int line, LogLevel level); ~Logger(); LogStream& stream();
public: static LogLevel logLevel() { return g_logLevel; } static void setLogLevel(LogLevel level) { g_logLevel = level; }
typedef void (*OutputFunc)(const char* msg, int len); typedef void (*FlushFunc)(); static void setOutput(OutputFunc o) { g_output = o; } static void setFlush(FlushFunc f) { g_flush = f; }
private: static LogLevel g_logLevel; static OutputFunc g_output; static FlushFunc g_flush;
class Impl; std::unique_ptr<Impl> impl; };
class Logger::Impl { public: typedef Logger::LogLevel LogLevel; Impl(LogLevel level, int old_errno, const char* file, int line) : time_(Timestamp::now()), stream_(), level_(level), line_(line), basename_(file) { formatTime(); CurrentThread::tid(); stream_.append(CurrentThread::tidString(), CurrentThread::t_tidLen); stream_.append(LogLevelName[level], 8); if (old_errno != 0) { stream_ << strerror_tl(old_errno) << " (errno=" << old_errno << ") "; } stream_.append(basename_.data(), basename_.size()); stream_ << ':' << line_ << " - "; }
void formatTime();
Timestamp time_; LogStream stream_; LogLevel level_; int line_; SourceFile basename_; };
|
参考
https://xiaodongfan.com/Muduo网络库实现-一-异步日志.html
https://www.cnblogs.com/fortunely/p/15973948.html