muduo高性能异步日志

如何构建一个高性能的异步日志库?

日志系统的要求

日志库大致可分为前端和后端两部分,前端负责提供应用程序日志接口并生成日志消息,后端负责接收日志消息并将消息写往某处。

在多线程程序中,多个线程有前端,同时共用一个后端,这是一个典型的多生产者单消费者问题。对前端而言需要做到低延迟、低CPU开销、无阻塞;后端要做到大吞吐量、占用少量资源。

功能:

  • 日志消息多级别
  • 日志多目的地
  • 日志格式可配置
  • 运行时过滤:不同级别消息的日志分目的地

线程安全问题:多个线程并发写日志,日志消息不会交织出现。比如线程1写消息abc,线程2写消息123,不能出现a1b23c,只能是abc123或123abc。

muduo日志使用方式

muduo采用流式日志,使用时非常简单,只需向LOG_INFO写数据即可LOG_INFO << "Hello 0123456789";,默认为终端输出。

1
2
3
4
5
6
#define LOG_TRACE if (muduo::Logger::logLevel() <= muduo::Logger::TRACE) \
muduo::Logger(__FILE__, __LINE__, muduo::Logger::TRACE, __func__).stream()
#define LOG_DEBUG if (muduo::Logger::logLevel() <= muduo::Logger::DEBUG) \
muduo::Logger(__FILE__, __LINE__, muduo::Logger::DEBUG, __func__).stream()
#define LOG_INFO if (muduo::Logger::logLevel() <= muduo::Logger::INFO) \
muduo::Logger(__FILE__, __LINE__).stream()

实际上LOG_INFO为一个宏,只有日志级别小于INFO才生效。这个宏构造了匿名对象Logger,并调用stream方法获取Stream流,然后重载运算符<<。

1
2
3
4
5
6
Logger::OutputFunc Logger::g_output = [](const char* msg, int len) {
fwrite(msg, 1, len, stdout); // ok, thread safe
};
Logger::FlushFunc Logger::g_flush = []() {
fflush(stdout);
};

异步日志使用

  1. 构造后端 AsyncLogging
  2. 设置前端的输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
off_t kRollSize = 500*1000*1000;
muduo::AsyncLogging* g_asyncLog = NULL;

void asyncOutput(const char* msg, int len){
g_asyncLog->append(msg, len);
}

int main(){
muduo::AsyncLogging log(::basename(name), kRollSize);
log.start();
g_asyncLog = &log;

muduo::Logger::setOutput(asyncOutput);
LOG_INFO << "Hello 0123456789";
}

muduo异步日志原理

前述日志库分为前端和后端,前后端之间如何交互呢?如果只靠一个blockingQueue,那么每条日志都会通知后端一次,不太现实。

实际上muduo采用了双缓冲技术:前端写入缓冲区A,后端读取缓冲区B,当A满或者B空时,交换两个缓冲区。

这样做的好处是:

  1. 前端将多条消息合并为一个大buffer发给后端,相当于批处理;
  2. 前端写消息和后端读消息互不阻塞,分开进行。

在实现时,为了消息处理的及时性,即便前端缓冲区未满,每隔3秒也会执行一次缓冲区交换操作。

核心代码实现

在实现时采用了四个缓冲区,前端两个,后端两个。这样做是为了当一个缓冲区耗尽时,能够及时启用下一个缓冲区,进一步减少前端的等待。

前端:

  • currentBufer_
  • nextBuffer_
  • buffer_ 缓冲区的队列

后端:

  • newBuffer1
  • newBuffer2
  • buffersToWrite 缓冲区的队列

前端日志写入

实际上,前端调用(g_output)的还是asyncLogging的接口void AsyncLogging::append(const char* logline, int len)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class AsyncLogging : noncopyable{
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;

const int flushInterval_;
std::atomic<bool> running_;
const string basename_;
const off_t rollSize_;
muduo::Thread thread_;
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_);

BufferPtr currentBuffer_ GUARDED_BY(mutex_);
BufferPtr nextBuffer_ GUARDED_BY(mutex_);
BufferVector buffers_ GUARDED_BY(mutex_);
};

这里的buffers_是bufferPtr的队列,已满的缓冲区会先收集到这个队列中。append函数主要逻辑就是:

  1. 判断curentBuffer_是否已经满
  2. 未满则写入一条日志,结束
  3. 已满,则将cureentBuffer_放入buffers_,并启用nextBuffer_
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void AsyncLogging::append(const char *logLine, int len) {
std::lock_guard<std::mutex> lockGuard(mutex_);
// 当前缓冲区空间足够
if (currentBuffer_->avail() > len) {
currentBuffer_->append(logLine, len);
} else {
buffers_.emplace_back(currentBuffer_.release());
if (nextBuffer_) {
// 使用另一个缓冲区
currentBuffer_ = std::move(nextBuffer_); // now nextBuffer_ == nullptr
} else {
// 两个缓冲区都满了,重新分配一个缓冲区,很少发生
currentBuffer_ = std::make_unique<Buffer>();
}
currentBuffer_->append(logLine, len);
cond_.notify_one();
}
}

后端日志落盘

AsyncLogging::threadFunc():后端日志落盘线程的执行函数。

这里准备了三块缓冲区newBuffer1和newBuffer2前面介绍过,用于和前端的缓冲区交换,buffersToWrite

关键看临界区内的代码,并没有采用while循环,而是采用了带时间间隔的条件变量的唤醒。

异常处理:当已满缓冲队列中的数据堆积(> 默认缓冲数25),就会丢弃多余缓冲,只保留最开始2个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void AsyncLogging::theadFunc() {
assert(running_ == true);
LogFile output(basename_, rollSize_);
BufferPtr newBuffer1 = std::make_unique<Buffer>();
BufferPtr newBuffer2 = std::make_unique<Buffer>();
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);

while (running_) {
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());

{
std::unique_lock<std::mutex> uniqueLock(mutex_);
if (buffers_.empty()) {
cond_.wait_for(uniqueLock, std::chrono::seconds(flushInterval_));
}
buffers_.emplace_back(currentBuffer_.release());
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_);
if (!nextBuffer_) {
nextBuffer_ = std::move(newBuffer2);
}
}
// 丢弃过多的日志
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
}
// 日志落盘,写入文件中
for (const auto& b : buffersToWrite) {
output.append(b->data(), b->length());
}
if (buffersToWrite.size() > 2) {
buffersToWrite.resize(2);
}
if (!newBuffer1) {
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2) {
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush();
}
output.flush();
}

前端

前述前端是调用g_output将消息发送过去,那么整个过程是什么样的呢?

Logger -> Impl -> LogStream -> operator<< -> LogStream的FixBuffer内 -> ~Logger -> g_output

1
2
3
4
5
6
7
8
9
10
11
Logger::~Logger()
{
impl_.finish(); // 往Small Buffer添加后缀 文件名:行数 换行符
const LogStream::Buffer& buf(stream().buffer());
g_output(buf.data(), buf.length()); // 回调保存的g_output, 输出Small Buffer到指定文件流
if (impl_.level_ == FATAL) // 发生致命错误, 输出log并终止程序
{
g_flush(); // 回调冲刷
abort();
}
}
  • Logger 提供用户接口,一个pointer to implement实现,提供日志等级、文件行数代码详细信息
  • Impl Logger的详细实现
  • LogStream 重载operator<<,将用户输入保存在类中的small buffer中
  • FixedBuffer 固定大小的缓冲区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class Logger {
public:
enum LogLevel {
TRACE,
DEBUG,
INFO,
WARN,
ERROR,
FATAL,
NUM_LOG_LEVELS,
};

Logger(const char* file, int line);
Logger(const char* file, int line, LogLevel level);
~Logger();
LogStream& stream();

public:
static LogLevel logLevel() { return g_logLevel; }
static void setLogLevel(LogLevel level) { g_logLevel = level; }

typedef void (*OutputFunc)(const char* msg, int len);
typedef void (*FlushFunc)();
static void setOutput(OutputFunc o) { g_output = o; }
static void setFlush(FlushFunc f) { g_flush = f; }

private:
static LogLevel g_logLevel;
static OutputFunc g_output;
static FlushFunc g_flush;

class Impl;
std::unique_ptr<Impl> impl;
};

class Logger::Impl {
public:
typedef Logger::LogLevel LogLevel;
Impl(LogLevel level, int old_errno, const char* file, int line)
: time_(Timestamp::now()),
stream_(),
level_(level),
line_(line),
basename_(file)
{
formatTime();
CurrentThread::tid();
stream_.append(CurrentThread::tidString(), CurrentThread::t_tidLen);
stream_.append(LogLevelName[level], 8);
if (old_errno != 0) {
stream_ << strerror_tl(old_errno) << " (errno=" << old_errno << ") ";
}
stream_.append(basename_.data(), basename_.size());
stream_ << ':' << line_ << " - ";
}

void formatTime();

Timestamp time_;
LogStream stream_;
LogLevel level_;
int line_;
SourceFile basename_;
};

参考

https://xiaodongfan.com/Muduo网络库实现-一-异步日志.html

https://www.cnblogs.com/fortunely/p/15973948.html

作者

Desirer

发布于

2024-11-15

更新于

2024-11-15

许可协议