muduo网络库学习

《Linux多线程服务端编程:使用muduo C++网络库》陈硕大佬的这本是一直想看没时间看,这次好好看看。

前置知识

std::bind

std::function

reactor模式

eventfd

从echo服务器例子看起

muduo搭建echo服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
int main()
{
EventLoop loop;
//这个EventLoop就是main EventLoop,即负责循环事件监听处理新用户连接事件的事件循环器。

InetAddress addr(4567);
//InetAddress其实是对socket编程中的sockaddr_in进行封装,使其变为更友好简单的接口而已。

EchoServer server(&loop, addr, "EchoServer-01");
//EchoServer类,自己等一下往下翻一下。

server.start();
//启动TcpServer服务器

loop.loop();
//执行EventLoop::loop()函数,这个函数在概述篇的EventLoop小节有提及,自己去看一下!!
return 0;
}

class EchoServer
{
public:
EchoServer(EventLoop *loop, const InetAddress &addr, const std::string &name)
: server_(loop, addr, name), loop_(loop){
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, std::placeholders::_1)
);
// 将用户定义的连接事件处理函数注册进TcpServer中,TcpServer发生连接事件时会执行onConnection函数。

server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this,
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)
);
//将用户定义的可读事件处理函数注册进TcpServer中,TcpServer发生可读事件时会执行onMessage函数。

server_.setThreadNum(3);
//设置sub reactor数量,你这里设置为3,就和概述篇图2中的EventLoop2 EventLoop3 EventLoop4对应,有三个sub EventLoop。
}
void start(){
server_.start();
}
private:
void onConnection(const TcpConnectionPtr &conn)
{
//用户定义的连接事件处理函数:当服务端接收到新连接建立请求,则打印Connection UP,如果是关闭连接请求,则打印Connection Down
if (conn->connected())
LOG_INFO("Connection UP : %s", conn->peerAddress().toIpPort().c_str());
else
LOG_INFO("Connection DOWN : %s", conn->peerAddress().toIpPort().c_str());
}

void onMessage(const TcpConnectionPtr &conn,
Buffer *buf,
Timestamp time)
{
//用户定义的可读事件处理函数:当一个Tcp连接发生了可读事件就把它这个接收到的消息原封不动的还回去
std::string msg = buf->retrieveAllAsString();
conn->send(msg);
conn->shutdown();
}
EventLoop *loop_;
TcpServer server_;
};

构造echoServer,里面其实是TcpServer,注册ConnextionCallBack和MessageCallback并设置线程数量(reactor数量)。这里注意setConnectionCallback时传入的是this指针,意味着TcpServer调用这个函数时,其实调用的是EchoServer对象的onConnection(也就是用户自定义的连接事件处理函数)。

再看TcpServer的构造函数与start函数和EvenLoop的loop函数。start函数启动threadpool(创建线程与线程对应的eventloop,调用对应eventloop的loop函数)。

紧接着调用runInLoop,传入Acceptor的liste函数。runInLoop函数十分重要,对应one loop per thread语义,一个线程绑定一个eventloop,你有任务交给这个eventloop,那么eventloop对应的线程就会干活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}

void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);

assert(!acceptor_->listening());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}

loop函数可以看到是阻塞在poller调用,那么上述listen就是对应监听连接。有连接到来后,封装在activeChannels中,然后遍历activeChannel,调用handleEvent进行处理事件;最后调用doPendingFuncots来处理一些不太紧急的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}

核心模块

Channel

Pooller

EventLoop

Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
EventLoop* loop_;
const int fd_;
int events_;
int revents_; // it's the received event types of epoll or poll
int index_; // used by Poller.
bool logHup_;

ReadEventCallback readCallback_;
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;

void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }

void handleEvent(Timestamp receiveTime);
void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }

Channel类是文件描述符的保姆,它封装了该fd上的感兴趣事件以及发生的事件,封装了注册修改事件以及发生对应事件的回调函数。

Poller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/// Base class for IO Multiplexing
///
/// This class doesn't own the Channel objects.
class Poller : noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;

Poller(EventLoop* loop);
virtual ~Poller();

/// Polls the I/O events.
/// Must be called in the loop thread.
virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0;

/// Changes the interested I/O events.
/// Must be called in the loop thread.
virtual void updateChannel(Channel* channel) = 0;

/// Remove the channel, when it destructs.
/// Must be called in the loop thread.
virtual void removeChannel(Channel* channel) = 0;

virtual bool hasChannel(Channel* channel) const;

static Poller* newDefaultPoller(EventLoop* loop);

void assertInLoopThread() const
{
ownerLoop_->assertInLoopThread();
}

protected:
typedef std::map<int, Channel*> ChannelMap;
ChannelMap channels_;

private:
EventLoop* ownerLoop_;
};

class PollPoller : public Poller
{
public:

PollPoller(EventLoop* loop);
~PollPoller() override;

Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
void updateChannel(Channel* channel) override;
void removeChannel(Channel* channel) override;

private:
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;

typedef std::vector<struct pollfd> PollFdList;
PollFdList pollfds_;
};

Pooler是Poll或Epoll的封装,提供注册修改Channel的接口,Channel用一个map保存(channels_)。其中,poll是IO复用的重要封装,它将返回的事件填充到activeChannels中。

理解muduo

  • 连接建立
  • 消息处理(读/写)
  • 连接断开
  • one loop per thread

首先就是要理解mainReactor和subReactor,它们都是对应有eventLoop,分别称为mainEventloop和subEventloop。mainEventloop主要处理连接事件,包含了acceptor这个重要组件。subReactor有自己的监听器,处理读写事件。

连接建立

上图中可能不太准确,5改为2,6改为3。

在muduo启动后,acceptor处理可读事件,调用TcpServer::newConnection。它获取一个subEventLoop,构造TcpConnection,然后在subEventLoop所在的线程中进行连接的建立(runInLoop、connectEstablished)。

connectEstablished会将fd注册到subEventloop中的Poller中 channel_->enableReading();

这里获取一个eventLoop然后调用runInLoop就能在所在线程执行任务是一个很关键的设计点,后续会讲。

消息读取

消息读取的主要逻辑就在loop函数中,对于每个channle依次调用对应事件的处理函数。读取处理分为两步:

  1. 读取消息至inputbuffer中
  2. 调用用户的messageCallBack
1
2
3
4
5
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void TcpConnection::handleRead(TimeStamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if(n > 0) //从fd读到了数据,并且放在了inputBuffer_上
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if(n == 0)
handleClose();
else
{
errno = savedErrno;
LOG_ERROR("TcpConnection::handleRead");
handleError();
}
}

消息发送

调用TcpConnetion::send(buf)函数,将buf内的数据发送给对应客户端。

这里对于写是多段处理的:如果TCP的缓冲区放不下buf的数据,那么剩余未发送的数据会存放到TcpConnection::outputBuffer_中。然后注册可写事件,在监听到可写时,由TcpConnection::handleWrite( )函数把TcpConnection::outputBuffer_中剩余的数据发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}

assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}

连接关闭

muduo处理连接关闭的方式只有一种——被动关闭。等待客户端关闭连接,服务端调用read返回0后被动关闭连接。

连接关闭既然从本地对应的subEventloop中删除fd,也要在mainEventLoop的TcpServer中删除该fd。

这里交互的关键是queueInLoop,后续会讲。

One Loop Per Thread

  • runInLoop
  • queueInLoop

前置知识:eventfd相当于一个计数器,可以随时对它写,但只能在计数器有值时可读。写入为累加操作,读取可每次读1或者读全部,随后减去读取的值。

那么eventfd有什么作用?事件通知/事件驱动。

线程b监听一个eventfd,线程a如果想要通知线程b,只需要向eventfd中写值就行了。

引用eventfs的Manual中NOTE段落的第一句话:

Applications can use an eventfd file descriptor instead of a pipe in all cases where a pipe is used simply to signal events.

在信号通知的场景下,相比pipe有非常大的资源和性能优势。其根本在于counter(计数器)和channel(数据信道)的区别。

  • 第一,是打开文件数量的巨大差别。由于pipe是半双工的传统IPC方式,所以两个线程通信需要两个pipe文件,而用eventfd只要打开一个文件。众所周知,文件描述符可是系统中非常宝贵的资源,linux的默认值也只有1024而已。那开发者可能会说,1相比2也只节省了一半嘛。要知道pipe只能在两个进程/线程间使用,并且是面向连接(类似TCP socket)的,即需要之前准备好两个pipe;而eventfd是广播式的通知,可以多对多的。如上面的NxM的生产者-消费者例子,如果需要完成全双工的通信,需要NxMx2个的pipe,而且需要提前建立并保持打开,作为通知信号实在太奢侈了,但如果用eventfd,只需要在发通知的时候瞬时创建、触发并关闭一个即可。
  • 第二,是内存使用的差别。eventfd是一个计数器,内核维护几乎成本忽略不计,大概是自旋锁+唤醒队列(后续详细介绍),8个字节的传输成本也微乎其微。但pipe可就完全不是了,一来一回数据在用户空间和内核空间有多达4次的复制,而且更糟糕的是,内核还要为每个pipe分配至少4K的虚拟内存页,哪怕传输的数据长度为0。
  • 第三,对于timerfd,还有精准度和实现复杂度的巨大差异。由内核管理的timerfd底层是内核中的hrtimer(高精度时钟定时器),可以精确至纳秒(1e-9秒)级,完全胜任实时任务。而用户态要想实现一个传统的定时器,通常是基于优先队列/二叉堆,不仅实现复杂维护成本高,而且运行时效率低,通常只能到达毫秒级。

所以,第一个最佳实践法则:当pipe只用来发送通知(传输控制信息而不是实际数据),放弃pipe,放心地用eventfd/timerfd,”in all cases”。

另外一个重要优势就是eventfd/timerfd被设计成与epoll完美结合,比如支持非阻塞的读取等。事实上,二者就是为epoll而生的(但是pipe就不是,它在Unix的史前时代就有了,那时不仅没有epoll连Linux都还没诞生)。应用程序可以在用epoll监控其他文件描述符的状态的同时,可以“顺便“”一起监控实现了eventfd的内核通知机制,何乐而不为呢?

https://zhuanlan.zhihu.com/p/40572954

(1)t_loopInThisThread 线程局部变量

该变量保证了一个线程只对应一个eventLoop。如何保证的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/***** EventLoop.cc *****/
__thread EventLoop *t_loopInThisThread = nullptr;

EventLoop::EventLoop() :
threadId_(CurrentThread::tid()),
wakeupFd_(createEventfd()), //生成一个eventfd,每个EventLoop对象,都会有自己的eventfd
wakeupChannel_(new Channel(this, wakeupFd_))
{
LOG_DEBUG("EventLoop created %p in thread %d \n", this, threadId_);
if(t_loopInThisThread) //如果当前线程已经绑定了某个EventLoop对象了,那么该线程就无法创建新的EventLoop对象了
LOG_FATAL("Another EventLoop %p exits in this thread %d \n", t_loopInThisThread, threadId_);
else
t_loopInThisThread = this;
wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));
wakeupChannel_->enableReading();
}

eventLoop的构造函数会检查这个线程局部变量,如果已经存在,那么构造失败,否则将这个变量设置为自己。另外在许多函数中,都有 loop_->assertInLoopThread(); 来检查当前线程id是否与eventloop构造时存储的线程id一致。

(2)runInLoop & queueInLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void EventLoop::runInLoop(Functor cb){
if (isInLoopThread()){
cb();
}
else {
queueInLoop(std::move(cb));
}
}

void EventLoop::queueInLoop(Functor cb){
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(std::move(cb));
}

if (!isInLoopThread() || callingPendingFunctors_){
wakeup();
}
}

runInLoop检测是否在当前对应线程,如果在对应线程直接执行,否则queueInLoop将当前cb加入到eventloop的等待队列pendingFunctors,然后执行唤醒操作。唤醒操作也很简单,直接往eventfd写一个数据。

1
2
3
4
5
6
7
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof(one));
if(n != sizeof(n))
LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n", n);
}

eventLoop会在处理完所有activeChannl的事件后处理pendingFunctor队列里积累的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void EventLoop::loop()
{ //EventLoop 所属线程执行
looping_ = true;
quit_ = false;
LOG_INFO("EventLoop %p start looping \n", this);
while(!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//此时activeChannels已经填好了事件发生的channel
for(Channel *channel : activeChannels_)
channel->HandlerEvent(pollReturnTime_);
doPendingFunctors(); //执行当前EventLoop事件循环需要处理的回调操作。
}
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
unique_lock<mutex> lock(mutex_);
functors.swap(pendingFunctors_); //这里的swap其实只是交换的vector对象指向的内存空间的指针而已。
}
for(const Functor &functor:functors)
{
functor();
}
callingPendingFunctors_ = false;
}

其余模块

Acceptor

Socket

Buffer

TcpConnection

TcpServer

线程类:ThreadPool、ThreadData、Thread

TimerQueue

TimerQueue

定时器的实现比较有趣,时间轮:当连接有事件发生时,将连接注册到当前指针指向的格子中;指针每隔固定时间转动一格,当指向某格时,执行当前格里所有回调函数。

代码实现:不是真的把一个连接从一个格子移动到另一个格子中,而是利用引用计数方式。

  • 格子以unordered_set的方式管理entry,一个entry就是一个连接的共享指针。
  • 时间轮以环形队列管理
  • 注册每秒的事件:往队尾添加一个空的Bucket。这样队头的Bucket就会弹出自动析构。
  • 在接收到消息时,将Entry放到时间轮的队尾(如此,引用计数递增)

采用共享指针可以确保连接出现在格子中时,引用计数不为零。而当引用计数减为零时,说明连接没有在任何一个格子中出现,那么连接超时,Entry的析构函数会断开连接。

1
2
3
4
typedef std::shared_ptr<Entry> EntryPtr;
typedef std::weak_ptr<Entry> WeakEntryPtr;
typedef std::unordered_set<EntryPtr> Bucket;
typedef boost::circular_buffer<Bucket> WeakConnectionList;

Buffer

Buffer 封装了一段缓冲区,用于消息读取和写入。在muduo里,你不必自己去read或write某个socket,只会操作TcpConnection的input buffer和output buffer。

特性表现:

  • 对外表现为连续内存,大小可以自动增长
  • 表现为queue,从末尾写入数据,从头部读出数据
  • 线程不安全

值的借鉴的设计有:缓冲区前增加一段8字节空间(prependable space),可以让程序以极低的代价在数据前面添加几个字节,比如说消息的长度。

代码设计:

  • 两个游标readerIndex和writerIndex
  • size指向实际使用的空间,capacity指向扩容的总空间
  • 内部腾挪:当writable空间不足时,会将已有数据整体前挪,再根据判断是否触发扩容

参考

长文梳理Muduo库核心代码及优秀编程细节剖析-CSDN博客

作者

Desirer

发布于

2024-11-15

更新于

2024-11-15

许可协议