文章目录
日后的学习中必然会有新的认识, 保持更新系列
muduo 的 Reactor 模式
muduo 中构成 reactor 模式的最核心的三个类是 Channel class
, EpollPoller class
, EventLoop class
, 这三个类负责将 IO 复用拿到的各个类型的事件分发给各个文件描述符对应的事件处理函数
本文假设是在默认使用
epoll
的情况下分析的
大概逻辑
首先调用 EventLoop::loop
函数, 该函数循环调用 EpollPoller::poll
函数, 在poll
函数中会调用epoll_wait
函数, 然后EventLoop::loop
拿到更新的就绪事件列表, 然后遍历就绪事件列表(一个vector<Channel *>
)再调用 Channel::handleEvent
函数进行事件的分类处理 (真正进行事件的逻辑分类的是Channel::handleEvent
中调用Channel::handleEventWithGuard
, 并在其中调用相应的回调函数)
这个就绪事件列表就是 Channel *
, 是将epoll_event
中的epoll_data_t
的共用体的指针void* ptr
指向了Channel
, epoll_wait 返回时, 会将 ptr 所指的结构体原封不动的返回, 然后会更新其中的事件类型->epoll_event::events
EventLoop class
EventLoop
类是 Reactor 模式的核心, 一个线程对应一个事件循环, 他的生命周期和所属线程一样, 它主要负责在循环中等待各类事件的触发, 然后调用对应的Channel
, (每一个事件对应一个Channel
),
EventLoop.h
#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H
#include <atomic>
#include <functional>
#include <vector>
#include <boost/any.hpp>
#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>
namespace muduo
{
namespace net
{
//前向声明
class Channel;
class Poller;
class TimerQueue;
///
/// Reactor, at most one per thread.
///
/// This is an interface class, so don't expose too much details.
class EventLoop : noncopyable
{
public:
typedef std::function<void()> Functor; //回调函数
EventLoop();
~EventLoop(); // force out-line dtor, for std::unique_ptr members.
///
/// Loops forever.
///
/// Must be called in the same thread as creation of the object.
///
/*
核心接口
当线程创建一个 EventLoop 实例后, 调用该函数, 启动事件循环
*/
void loop();
/// Quits loop.
///
/// This is not 100% thread safe, if you call through a raw pointer,
/// better to call through shared_ptr<EventLoop> for 100% safety.
void quit();
///
/// Time when poll returns, usually means data arrival.
///
Timestamp pollReturnTime() const { return pollReturnTime_; }
int64_t iteration() const { return iteration_; }
/// Runs callback immediately in the loop thread.
/// It wakes up the loop, and run the cb.
/// If in the same loop thread, cb is run within the function.
/// Safe to call from other threads.
void runInLoop(Functor cb);
/// Queues callback in the loop thread.
/// Runs after finish pooling.
/// Safe to call from other threads.
void queueInLoop(Functor cb);
size_t queueSize() const;
// timers
///
/// Runs callback at 'time'.
/// Safe to call from other threads.
///
TimerId runAt(Timestamp time, TimerCallback cb);
///
/// Runs callback after @c delay seconds.
/// Safe to call from other threads.
///
TimerId runAfter(double delay, TimerCallback cb);
///
/// Runs callback every @c interval seconds.
/// Safe to call from other threads.
///
TimerId runEvery(double interval, TimerCallback cb);
///
/// Cancels the timer.
/// Safe to call from other threads.
///
void cancel(TimerId timerId);
// internal usage
//唤醒IO线程
//向wakeupFd_写入8字节数据,好让其返回触发事件
//会在handleRead函数中读取
void wakeup();
//更新事件集合
void updateChannel(Channel* channel);
void removeChannel(Channel* channel);
bool hasChannel(Channel* channel);
// pid_t threadId() const { return threadId_; }
void assertInLoopThread()
{
if (!isInLoopThread())
{
abortNotInLoopThread();
}
}
bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }
// bool callingPendingFunctors() const { return callingPendingFunctors_; }
bool eventHandling() const { return eventHandling_; }
void setContext(const boost::any& context)
{ context_ = context; }
const boost::any& getContext() const
{ return context_; }
boost::any* getMutableContext()
{ return &context_; }
static EventLoop* getEventLoopOfCurrentThread();
private:
void abortNotInLoopThread();
void handleRead(); // waked up 唤醒, 读取wakeup写入的8字节数据
void doPendingFunctors();
void printActiveChannels() const; // DEBUG
typedef std::vector<Channel*> ChannelList;
bool looping_; /* atomic 是否正在运行*/
std::atomic<bool> quit_;
bool eventHandling_; /* atomic */
bool callingPendingFunctors_; /* atomic */
int64_t iteration_; //8字节的一个元素
const pid_t threadId_;
Timestamp pollReturnTime_;
std::unique_ptr<Poller> poller_; //Poller(父类)成员默认初始为epoll
std::unique_ptr<TimerQueue> timerQueue_; //定时器队列
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
std::unique_ptr<Channel> wakeupChannel_;
boost::any context_;
// scratch variables
ChannelList activeChannels_; //就绪的事件列表
Channel* currentActiveChannel_; //当前正在处理的那一个就绪事件
mutable MutexLock mutex_;
std::vector<Functor> pendingFunctors_ GUARDED_BY(mutex_);
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_EVENTLOOP_H
我们在这里只介绍最关键的EventLoop::loop
函数
其实是因为其他的没看, 不过这不重要, 留待日后更新吧
EventLoop::loop
线程执行该函数, 主循环, 进行事件的分发处理
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";
while (!quit_)
{
//activeChannels该类中vector<Channel*>类型的私有变量,存储就绪事件集合
activeChannels_.clear();
//poller_是unique_ptr<Poller>类的智能指针,在初始化函数中指向子类EpollPoller
//调用EpollPoller类中poll函数,在其中调用epoll_wait,
//kPollTimeMs是epoll_wait的timeout,在上面定义的
//返回时间戳TimeStamp的实例
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_; //不懂??????
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
//currentActiveChannel_是该类里Channel*类型成员
currentActiveChannel_ = channel; //拿到一个就绪事件
//执行相应事件回调函数,又在该函数中调用handleEventWithGuard处理各种类型事件
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
Channel class
负责事件分发的一个类, 每一个Channel class
自始至终只属于一个EventLoop
(EventLoop
可以拥有很多Channel
, 当然的了, 毕竟不可能只处理一个文件描述符上的事件么)
每一个Channel
只负责一个文件描述符上的事件分发, 进行判断事件类型后, 调用其对应的回调函数, 但是他不拥有这个文件描述符, 在Channel
实例析构时不会 close 文件描述符,
Channel.h
#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H
#include <muduo/base/noncopyable.h>
#include <muduo/base/Timestamp.h>
#include <functional>
#include <memory>
namespace muduo
{
namespace net
{
class EventLoop;
///
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : noncopyable
{
public:
typedef std::function<void()> EventCallback;
typedef std::function<void(Timestamp)> ReadEventCallback;
Channel(EventLoop* loop, int fd);
~Channel();
//核心函数,进行事件的分类
void handleEvent(Timestamp receiveTime);
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
void setWriteCallback(EventCallback cb)
{ writeCallback_ = std::move(cb); }
void setCloseCallback(EventCallback cb)
{ closeCallback_ = std::move(cb); }
void setErrorCallback(EventCallback cb)
{ errorCallback_ = std::move(cb); }
/// Tie this channel to the owner object managed by shared_ptr,
/// prevent the owner object being destroyed in handleEvent.
void tie(const std::shared_ptr<void>&);
int fd() const { return fd_; }
int events() const { return events_; }
//设置就绪事件的类型, 在调用 epoll_wait 更新就绪事件列表后调用该函数更新事件类型
void set_revents(int revt) { revents_ = revt; } // used by pollers
// int revents() const { return revents_; }
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() { events_ |= kReadEvent; update(); }
void disableReading() { events_ &= ~kReadEvent; update(); }
void enableWriting() { events_ |= kWriteEvent; update(); }
void disableWriting() { events_ &= ~kWriteEvent; update(); }
void disableAll() { events_ = kNoneEvent; update(); }
bool isWriting() const { return events_ & kWriteEvent; }
bool isReading() const { return events_ & kReadEvent; }
// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
// for debug
string reventsToString() const;
string eventsToString() const;
void doNotLogHup() { logHup_ = false; }
EventLoop* ownerLoop() { return loop_; }
void remove();
private:
static string eventsToString(int fd, int ev);
void update();
void handleEventWithGuard(Timestamp receiveTime);
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_; //所属 EventLoop
const int fd_; //文件描述符,但不负责关闭该文件描述符
int events_; //关注的事件
int revents_; // poll / epoll 返回的事件
int index_; //表示在 poll 的事件数组中的序号
bool logHup_;
std::weak_ptr<void> tie_;
bool tied_;
bool eventHandling_; //是否处于处理事件中
bool addedToLoop_;
ReadEventCallback readCallback_; //各类回调函数
EventCallback writeCallback_;
EventCallback closeCallback_;
EventCallback errorCallback_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_CHANNEL_H
Channel.cc
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <sstream>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
//初始化
Channel::Channel(EventLoop *loop, int fd__)
: loop_(loop), //指向所属EventLoop
fd_(fd__),
events_(0),
revents_(0),
index_(-1),
logHup_(true),
tied_(false),
eventHandling_(false),
addedToLoop_(false)
{
}
Channel::~Channel()
{
assert(!eventHandling_);
assert(!addedToLoop_);
if (loop_->isInLoopThread())
{
assert(!loop_->hasChannel(this));
}
}
void Channel::tie(const std::shared_ptr<void> &obj)
{
tie_ = obj;
tied_ = true;
}
//loop_指向所属的那个EventLoop,所以最终会调用EpollPoller::updateChannel
//该函数在enablereading()函数中被调用
void Channel::update()
{
addedToLoop_ = true;
//这又会调用EpollPoller::updateChannel
loop_->updateChannel(this);
}
//逻辑和上面一样
void Channel::remove()
{
assert(isNoneEvent());
addedToLoop_ = false;
loop_->removeChannel(this);
}
//核心函数, 调用Channel::handleEventWithGuard
void Channel::handleEvent(Timestamp receiveTime)
{
std::shared_ptr<void> guard;
//tied_是该类里bool类型
if (tied_)
{
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime);
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
//处理各种类型事件
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true; //防止在执行期间channel析构,析构函数中会判断该bool值
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
if (closeCallback_)
closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL)) //错误事件处理
{
if (errorCallback_)
errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) //可读事件处理
{
if (readCallback_)
readCallback_(receiveTime);
}
if (revents_ & POLLOUT) //可写事件处理
{
if (writeCallback_)
writeCallback_();
}
eventHandling_ = false;
}
string Channel::reventsToString() const
{
return eventsToString(fd_, revents_);
}
string Channel::eventsToString() const
{
return eventsToString(fd_, events_);
}
string Channel::eventsToString(int fd, int ev)
{
std::ostringstream oss;
oss << fd << ": ";
if (ev & POLLIN)
oss << "IN ";
if (ev & POLLPRI)
oss << "PRI ";
if (ev & POLLOUT)
oss << "OUT ";
if (ev & POLLHUP)
oss << "HUP ";
if (ev & POLLRDHUP)
oss << "RDHUP ";
if (ev & POLLERR)
oss << "ERR ";
if (ev & POLLNVAL)
oss << "NVAL ";
return oss.str();
}
EpollPoller class
Poller class
Poller class
是 muduo 中的 IO 复用的基类, 因为 muduo 同时支持 poll epoll 两种机制, 拥有 PollPoller
和 EpollPoller
两个子类, 如果不设置环境变量的话, 会默认初始化为 epoll,
初始化
在 EventLoop
的初始化函数中:
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)), //基类指针指向了EpollPoller子类
timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_;
if (t_loopInThisThread)
{
LOG_FATAL << "Another EventLoop " << t_loopInThisThread
<< " exists in this thread " << threadId_;
}
else
{
t_loopInThisThread = this;
}
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
我们注意到调用了
Poller::newDefaultPoller(this)
而这个函数如下:
Poller* Poller::newDefaultPoller(EventLoop* loop)
{
//getenv函数获取环境变量, 如果环境变量中没有设置MUDUO_USE_POLL,默认使用epoll,
if (::getenv("MUDUO_USE_POLL")) //没有该环境变量就返回NULL
{
return new PollPoller(loop);
}
else
{
return new EPollPoller(loop);
}
}
所以说, 如果我们没有设置环境变量MUDUO_USE_POLL
, 就会new
一个子类EpollPoller
, 并让EventLoop
中的std::unique_ptr<Poller> poller_
一个Poller(父类)成员指针指向它, 并以此调用Poller
中声明在EpollPoller
中实现的各种virtual
函数
EpollPoller.h
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H
#define MUDUO_NET_POLLER_EPOLLPOLLER_H
#include <muduo/net/Poller.h>
#include <vector>
struct epoll_event;
namespace muduo
{
namespace net
{
///
/// IO Multiplexing with epoll(4).
///
class EPollPoller : public Poller
{
public:
EPollPoller(EventLoop* loop);
~EPollPoller() override;
//核心函数, 返回就绪事件列表
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override;
//更新事件集合
void updateChannel(Channel* channel) override;
void removeChannel(Channel* channel) override;
private:
static const int kInitEventListSize = 16;
static const char* operationToString(int op);
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
//对 epoll_ctl 的封装
void update(int operation, Channel* channel);
typedef std::vector<struct epoll_event> EventList;
int epollfd_;
EventList events_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EpollPoller.cc
主要的函数接口是其中的poll
函数
#include <muduo/net/poller/EPollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
// On Linux, the constants of poll(2) and epoll(4)
// are expected to be the same.
static_assert(EPOLLIN == POLLIN, "epoll uses same flag values as poll");
static_assert(EPOLLPRI == POLLPRI, "epoll uses same flag values as poll");
static_assert(EPOLLOUT == POLLOUT, "epoll uses same flag values as poll");
static_assert(EPOLLRDHUP == POLLRDHUP, "epoll uses same flag values as poll");
static_assert(EPOLLERR == POLLERR, "epoll uses same flag values as poll");
static_assert(EPOLLHUP == POLLHUP, "epoll uses same flag values as poll");
namespace
{
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;
}
EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if (epollfd_ < 0)
{
LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
}
EPollPoller::~EPollPoller()
{
::close(epollfd_);
}
//核心函数, 调用 epoll_wait, 返回就绪事件列表
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
LOG_TRACE << "fd total count " << channels_.size();
//调用epoll_wait
//events是一个EPollPoller中struct epoll_event的vector私有变量
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(), //可以使用events_.data()返回指向第一个成员的指针
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
LOG_TRACE << numEvents << " events happened";
//更新Channel列表
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size()*2); //如果返回的事件数目等于当前事件数组大小,就分配2倍空间
}
}
else if (numEvents == 0)
{
LOG_TRACE << "nothing happened";
}
else
{
// error happens, log uncommon ones
if (savedErrno != EINTR)
{
errno = savedErrno;
LOG_SYSERR << "EPollPoller::poll()";
}
}
return now; //返回时间戳
}
//更新事件列表
void EPollPoller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
assert(implicit_cast<size_t>(numEvents) <= events_.size());
for (int i = 0; i < numEvents; ++i)
{
//epoll_wait会原封不动返回ptr指向的结构体
Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
int fd = channel->fd(); //拿到其中的文件描述符
ChannelMap::const_iterator it = channels_.find(fd); //ChannelMap以文件描述符为key
assert(it != channels_.end());
assert(it->second == channel);
#endif
channel->set_revents(events_[i].events); //将事件类型赋值给Channel类中revents_元素
activeChannels->push_back(channel); //添加进就绪事件合集
}
}
void EPollPoller::updateChannel(Channel* channel)
{
Poller::assertInLoopThread();
const int index = channel->index(); //获得该Channel在poll事件数组中的下标
LOG_TRACE << "fd = " << channel->fd()
<< " events = " << channel->events() << " index = " << index;
if (index == kNew || index == kDeleted)
{
// a new one, add with EPOLL_CTL_ADD
int fd = channel->fd();
if (index == kNew)
{
assert(channels_.find(fd) == channels_.end());
channels_[fd] = channel;
}
else // index == kDeleted
{
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
}
channel->set_index(kAdded);
update(EPOLL_CTL_ADD, channel);
}
else
{
// update existing one with EPOLL_CTL_MOD/DEL
int fd = channel->fd();
(void)fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(index == kAdded);
if (channel->isNoneEvent())
{
update(EPOLL_CTL_DEL, channel);
channel->set_index(kDeleted);
}
else
{
update(EPOLL_CTL_MOD, channel);
}
}
}
void EPollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();
int fd = channel->fd();
LOG_TRACE << "fd = " << fd;
assert(channels_.find(fd) != channels_.end());
assert(channels_[fd] == channel);
assert(channel->isNoneEvent());
int index = channel->index();
assert(index == kAdded || index == kDeleted);
size_t n = channels_.erase(fd);
(void)n;
assert(n == 1);
if (index == kAdded)
{
update(EPOLL_CTL_DEL, channel);
}
channel->set_index(kNew);
}
void EPollPoller::update(int operation, Channel* channel) //对epoll_ctl()的封装
{
struct epoll_event event;
memZero(&event, sizeof event);
event.events = channel->events();
event.data.ptr = channel; //将epoll_event中epoll_data的ptr指向channel
int fd = channel->fd();
LOG_TRACE << "epoll_ctl op = " << operationToString(operation)
<< " fd = " << fd << " event = { " << channel->eventsToString() << " }";
if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
{
if (operation == EPOLL_CTL_DEL)
{
LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
else
{
LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd;
}
}
}
const char* EPollPoller::operationToString(int op)
{
switch (op)
{
case EPOLL_CTL_ADD:
return "ADD";
case EPOLL_CTL_DEL:
return "DEL";
case EPOLL_CTL_MOD:
return "MOD";
default:
assert(false && "ERROR op");
return "Unknown Operation";
}
}
在EPollPoller::poll
以及EPollPoller::fillActiveChannels
函数执行完后, 就绪事件列表更新完成, 还记得EventLoop::loop
函数么, 在更新完就绪事件链表后, 就会进入下面这个 for 循环, 遍历就绪事件列表, 执行相应的回调函数
for (Channel* channel : activeChannels_)
{
//currentActiveChannel_是该类里Channel*类型成员
currentActiveChannel_ = channel; //拿到一个就绪事件
//执行相应事件回调函数,又在该函数中调用handleEventWithGuard处理各种类型事件
currentActiveChannel_->handleEvent(pollReturnTime_);
}
至此, 已经完成了一次事件的监听, 分发, 处理