《Linux多线程服务端编程:使用muduo C++网络库》陈硕大佬的这本是一直想看没时间看,这次好好看看。
前置知识 std::bind
std::function
reactor模式
eventfd
从echo服务器例子看起 muduo搭建echo服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 int main () { EventLoop loop; InetAddress addr (4567 ) ; EchoServer server (&loop, addr, "EchoServer-01" ) ; server.start (); loop.loop (); return 0 ; } class EchoServer { public : EchoServer (EventLoop *loop, const InetAddress &addr, const std::string &name) : server_ (loop, addr, name), loop_ (loop){ server_.setConnectionCallback ( std::bind (&EchoServer::onConnection, this , std::placeholders::_1) ); server_.setMessageCallback ( std::bind (&EchoServer::onMessage, this , std::placeholders::_1, std::placeholders::_2, std::placeholders::_3) ); server_.setThreadNum (3 ); } void start () { server_.start (); } private : void onConnection (const TcpConnectionPtr &conn) { if (conn->connected ()) LOG_INFO ("Connection UP : %s" , conn->peerAddress ().toIpPort ().c_str ()); else LOG_INFO ("Connection DOWN : %s" , conn->peerAddress ().toIpPort ().c_str ()); } void onMessage (const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg = buf->retrieveAllAsString (); conn->send (msg); conn->shutdown (); } EventLoop *loop_; TcpServer server_; };
构造echoServer,里面其实是TcpServer,注册ConnextionCallBack和MessageCallback并设置线程数量(reactor数量)。这里注意setConnectionCallback时传入的是this指针,意味着TcpServer调用这个函数时,其实调用的是EchoServer对象的onConnection(也就是用户自定义的连接事件处理函数)。
再看TcpServer的构造函数与start函数和EvenLoop的loop函数。start函数启动threadpool(创建线程与线程对应的eventloop,调用对应eventloop的loop函数)。
紧接着调用runInLoop,传入Acceptor的liste函数。runInLoop函数十分重要,对应one loop per thread语义,一个线程绑定一个eventloop,你有任务交给这个eventloop,那么eventloop对应的线程就会干活。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string & nameArg, Option option) : loop_(CHECK_NOTNULL(loop)), ipPort_(listenAddr.toIpPort()), name_(nameArg), acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), threadPool_(new EventLoopThreadPool(loop, name_)), connectionCallback_(defaultConnectionCallback), messageCallback_(defaultMessageCallback), nextConnId_(1 ) { acceptor_->setNewConnectionCallback( std ::bind(&TcpServer::newConnection, this, _1, _2)); } void TcpServer::start () { if (started_.getAndSet(1 ) == 0 ) { threadPool_->start(threadInitCallback_); assert(!acceptor_->listening()); loop_->runInLoop( std ::bind(&Acceptor::listen, get_pointer(acceptor_))); } }
loop函数可以看到是阻塞在poller调用,那么上述listen就是对应监听连接。有连接到来后,封装在activeChannels中,然后遍历activeChannel,调用handleEvent进行处理事件;最后调用doPendingFuncots来处理一些不太紧急的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void EventLoop::loop () { assert(!looping_); assertInLoopThread(); looping_ = true ; quit_ = false ; LOG_TRACE << "EventLoop " << this << " start looping" ; while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } eventHandling_ = true ; for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL ; eventHandling_ = false ; doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping" ; looping_ = false ; }
核心模块 Channel
Pooller
EventLoop
Channel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 EventLoop* loop_; const int fd_;int events_;int revents_; int index_; bool logHup_;ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; void enableReading () { events_ |= kReadEvent; update(); } void disableReading () { events_ &= ~kReadEvent; update(); }void enableWriting () { events_ |= kWriteEvent; update(); }void disableWriting () { events_ &= ~kWriteEvent; update(); }void handleEvent (Timestamp receiveTime) ;void setReadCallback (ReadEventCallback cb) { readCallback_ = std ::move(cb); }void setWriteCallback (EventCallback cb) { writeCallback_ = std ::move(cb); }void setCloseCallback (EventCallback cb) { closeCallback_ = std ::move(cb); }void setErrorCallback (EventCallback cb) { errorCallback_ = std ::move(cb); }
Channel类是文件描述符的保姆,它封装了该fd上的感兴趣事件以及发生的事件,封装了注册修改事件以及发生对应事件的回调函数。
Poller 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 class Poller : noncopyable{ public: typedef std ::vector <Channel*> ChannelList; Poller(EventLoop* loop); virtual ~Poller(); virtual Timestamp poll (int timeoutMs, ChannelList* activeChannels) = 0 ; virtual void updateChannel (Channel* channel) = 0 ; virtual void removeChannel (Channel* channel) = 0 ; virtual bool hasChannel (Channel* channel) const ; static Poller* newDefaultPoller (EventLoop* loop) ; void assertInLoopThread () const { ownerLoop_->assertInLoopThread(); } protected: typedef std ::map <int , Channel*> ChannelMap; ChannelMap channels_; private: EventLoop* ownerLoop_; }; class PollPoller : public Poller{ public: PollPoller(EventLoop* loop); ~PollPoller() override; Timestamp poll (int timeoutMs, ChannelList* activeChannels) override; void updateChannel (Channel* channel) override; void removeChannel (Channel* channel) override; private: void fillActiveChannels (int numEvents, ChannelList* activeChannels) const ; typedef std ::vector <struct pollfd > PollFdList; PollFdList pollfds_; };
Pooler是Poll或Epoll的封装,提供注册修改Channel的接口,Channel用一个map保存(channels_)。其中,poll是IO复用的重要封装,它将返回的事件填充到activeChannels中。
理解muduo
连接建立
消息处理(读/写)
连接断开
one loop per thread
首先就是要理解mainReactor和subReactor,它们都是对应有eventLoop,分别称为mainEventloop和subEventloop。mainEventloop主要处理连接事件,包含了acceptor这个重要组件。subReactor有自己的监听器,处理读写事件。
连接建立
上图中可能不太准确,5改为2,6改为3。
在muduo启动后,acceptor处理可读事件,调用TcpServer::newConnection。它获取一个subEventLoop,构造TcpConnection,然后在subEventLoop所在的线程中进行连接的建立(runInLoop、connectEstablished)。
connectEstablished会将fd注册到subEventloop中的Poller中 channel_->enableReading();
这里获取一个eventLoop然后调用runInLoop就能在所在线程执行任务是一个很关键的设计点,后续会讲。
消息读取
消息读取的主要逻辑就在loop函数中,对于每个channle依次调用对应事件的处理函数。读取处理分为两步:
读取消息至inputbuffer中
调用用户的messageCallBack
1 2 3 4 5 for (Channel* channel : activeChannels_){ currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void TcpConnection::handleRead (TimeStamp receiveTime) { int savedErrno = 0 ; ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0 ) { messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0 ) handleClose(); else { errno = savedErrno; LOG_ERROR("TcpConnection::handleRead" ); handleError(); } }
消息发送 调用TcpConnetion::send(buf)
函数,将buf内的数据发送给对应客户端。
这里对于写是多段处理的:如果TCP的缓冲区放不下buf的数据,那么剩余未发送的数据会存放到TcpConnection::outputBuffer_
中。然后注册可写事件,在监听到可写时,由TcpConnection::handleWrite( )
函数把TcpConnection::outputBuffer_
中剩余的数据发送出去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 void TcpConnection::sendInLoop (const void * data, size_t len) { loop_->assertInLoopThread(); ssize_t nwrote = 0 ; size_t remaining = len; bool faultError = false ; if (state_ == kDisconnected) { LOG_WARN << "disconnected, give up writing" ; return ; } if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0 ) { nwrote = sockets::write(channel_->fd(), data, len); if (nwrote >= 0 ) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { loop_->queueInLoop(std ::bind(writeCompleteCallback_, shared_from_this())); } } else { nwrote = 0 ; if (errno != EWOULDBLOCK) { LOG_SYSERR << "TcpConnection::sendInLoop" ; if (errno == EPIPE || errno == ECONNRESET) { faultError = true ; } } } } assert(remaining <= len); if (!faultError && remaining > 0 ) { size_t oldLen = outputBuffer_.readableBytes(); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop(std ::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append(static_cast<const char *>(data)+nwrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); } } }
连接关闭 muduo处理连接关闭的方式只有一种——被动关闭。等待客户端关闭连接,服务端调用read返回0后被动关闭连接。
连接关闭既然从本地对应的subEventloop中删除fd,也要在mainEventLoop的TcpServer中删除该fd。
这里交互的关键是queueInLoop,后续会讲。
One Loop Per Thread
前置知识:eventfd相当于一个计数器,可以随时对它写,但只能在计数器有值时可读。写入为累加操作,读取可每次读1或者读全部,随后减去读取的值。
那么eventfd有什么作用?事件通知/事件驱动。
线程b监听一个eventfd,线程a如果想要通知线程b,只需要向eventfd中写值就行了。
引用eventfs的Manual中NOTE段落的第一句话:
Applications can use an eventfd file descriptor instead of a pipe in all cases where a pipe is used simply to signal events.
在信号通知的场景下,相比pipe有非常大的资源和性能优势。其根本在于counter(计数器)和channel(数据信道)的区别。
第一,是打开文件数量的巨大差别。由于pipe是半双工的传统IPC方式,所以两个线程通信需要两个pipe文件,而用eventfd只要打开一个文件。众所周知,文件描述符可是系统中非常宝贵的资源,linux的默认值也只有1024而已。那开发者可能会说,1相比2也只节省了一半嘛。要知道pipe只能在两个进程/线程间使用,并且是面向连接(类似TCP socket)的,即需要之前准备好两个pipe;而eventfd是广播式的通知,可以多对多的。如上面的NxM的生产者-消费者例子,如果需要完成全双工的通信,需要NxMx2个的pipe,而且需要提前建立并保持打开,作为通知信号实在太奢侈了,但如果用eventfd,只需要在发通知的时候瞬时创建、触发并关闭一个即可。
第二,是内存使用的差别。eventfd是一个计数器,内核维护几乎成本忽略不计,大概是自旋锁+唤醒队列(后续详细介绍),8个字节的传输成本也微乎其微。但pipe可就完全不是了,一来一回数据在用户空间和内核空间有多达4次的复制,而且更糟糕的是,内核还要为每个pipe分配至少4K的虚拟内存页,哪怕传输的数据长度为0。
第三,对于timerfd,还有精准度和实现复杂度的巨大差异。由内核管理的timerfd底层是内核中的hrtimer(高精度时钟定时器),可以精确至纳秒(1e-9秒)级,完全胜任实时任务。而用户态要想实现一个传统的定时器,通常是基于优先队列/二叉堆,不仅实现复杂维护成本高,而且运行时效率低,通常只能到达毫秒级。
所以,第一个最佳实践法则:当pipe只用来发送通知(传输控制信息而不是实际数据),放弃pipe,放心地用eventfd/timerfd,”in all cases”。
另外一个重要优势就是eventfd/timerfd被设计成与epoll完美结合,比如支持非阻塞的读取等。事实上,二者就是为epoll而生的(但是pipe就不是,它在Unix的史前时代就有了,那时不仅没有epoll连Linux都还没诞生)。应用程序可以在用epoll监控其他文件描述符的状态的同时,可以“顺便“”一起监控实现了eventfd的内核通知机制,何乐而不为呢?
https://zhuanlan.zhihu.com/p/40572954
(1)t_loopInThisThread 线程局部变量
该变量保证了一个线程只对应一个eventLoop。如何保证的?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 __thread EventLoop *t_loopInThisThread = nullptr; EventLoop::EventLoop() : threadId_(CurrentThread::tid()), wakeupFd_(createEventfd()), wakeupChannel_(new Channel(this, wakeupFd_)) { LOG_DEBUG("EventLoop created %p in thread %d \n" , this, threadId_); if (t_loopInThisThread) LOG_FATAL("Another EventLoop %p exits in this thread %d \n" , t_loopInThisThread, threadId_); else t_loopInThisThread = this; wakeupChannel_->setReadCallback(std ::bind(&EventLoop::handleRead, this)); wakeupChannel_->enableReading(); }
eventLoop的构造函数会检查这个线程局部变量,如果已经存在,那么构造失败,否则将这个变量设置为自己。另外在许多函数中,都有 loop_->assertInLoopThread();
来检查当前线程id是否与eventloop构造时存储的线程id一致。
(2)runInLoop & queueInLoop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void EventLoop::runInLoop (Functor cb) { if (isInLoopThread()){ cb(); } else { queueInLoop(std ::move(cb)); } } void EventLoop::queueInLoop (Functor cb) { { MutexLockGuard lock (mutex_) ; pendingFunctors_.push_back(std ::move(cb)); } if (!isInLoopThread() || callingPendingFunctors_){ wakeup(); } }
runInLoop检测是否在当前对应线程,如果在对应线程直接执行,否则queueInLoop将当前cb加入到eventloop的等待队列pendingFunctors,然后执行唤醒操作。唤醒操作也很简单,直接往eventfd写一个数据。
1 2 3 4 5 6 7 void EventLoop::wakeup () { uint64_t one = 1 ; ssize_t n = write(wakeupFd_, &one, sizeof (one)); if (n != sizeof (n)) LOG_ERROR("EventLoop::wakeup() writes %lu bytes instead of 8 \n" , n); }
eventLoop会在处理完所有activeChannl的事件后处理pendingFunctor队列里积累的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void EventLoop::loop () { looping_ = true ; quit_ = false ; LOG_INFO("EventLoop %p start looping \n" , this); while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); for (Channel *channel : activeChannels_) channel->HandlerEvent(pollReturnTime_); doPendingFunctors(); } } void EventLoop::doPendingFunctors () { std ::vector <Functor> functors; callingPendingFunctors_ = true ; { unique_lock<mutex> lock (mutex_) ; functors.swap(pendingFunctors_); } for (const Functor &functor:functors) { functor(); } callingPendingFunctors_ = false ; }
其余模块 Acceptor
Socket
Buffer
TcpConnection
TcpServer
线程类:ThreadPool、ThreadData、Thread
TimerQueue
TimerQueue 定时器的实现比较有趣,时间轮:当连接有事件发生时,将连接注册到当前指针指向的格子中;指针每隔固定时间转动一格,当指向某格时,执行当前格里所有回调函数。
代码实现:不是真的把一个连接从一个格子移动到另一个格子中,而是利用引用计数方式。
格子以unordered_set的方式管理entry,一个entry就是一个连接的共享指针。
时间轮以环形队列管理
注册每秒的事件:往队尾添加一个空的Bucket。这样队头的Bucket就会弹出自动析构。
在接收到消息时,将Entry放到时间轮的队尾(如此,引用计数递增)
采用共享指针可以确保连接出现在格子中时,引用计数不为零。而当引用计数减为零时,说明连接没有在任何一个格子中出现,那么连接超时,Entry的析构函数会断开连接。
1 2 3 4 typedef std ::shared_ptr <Entry> EntryPtr;typedef std ::weak_ptr<Entry> WeakEntryPtr;typedef std ::unordered_set <EntryPtr> Bucket;typedef boost::circular_buffer<Bucket> WeakConnectionList;
Buffer
Buffer 封装了一段缓冲区,用于消息读取和写入。在muduo里,你不必自己去read或write某个socket,只会操作TcpConnection的input buffer和output buffer。
特性表现:
对外表现为连续内存,大小可以自动增长
表现为queue,从末尾写入数据,从头部读出数据
线程不安全
值的借鉴的设计有:缓冲区前增加一段8字节空间(prependable space),可以让程序以极低的代价在数据前面添加几个字节,比如说消息的长度。
代码设计:
两个游标readerIndex和writerIndex
size指向实际使用的空间,capacity指向扩容的总空间
内部腾挪:当writable空间不足时,会将已有数据整体前挪,再根据判断是否触发扩容
参考 长文梳理Muduo库核心代码及优秀编程细节剖析-CSDN博客