muduo网络库源码解析(1):多线程异步日志库(上)
muduo网络库源码解析(2):多线程异步日志库(中)
muduo网络库源码解析(3):多线程异步日志库(下)
muduo网络库源码解析(4):TimerQueue定时机制
muduo网络库源码解析(5):EventLoop,Channel与事件分发机制
muduo网络库源码解析(6):TcpServer与TcpConnection(上)
muduo网络库源码解析(7):TcpServer与TcpConnection(下)
muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread
muduo网络库源码解析(9):Connector与TcpClient
引言
从这一篇中正式进入muduo的核心部分解析,分两篇进行,这一篇中解析TcpServer这个网络库的枢纽,其完美展示了muduo的事件分发机制,接着介绍TcpConnection这个muduo中最庞大的类中管理Tcp连接与断开的部分.
我们最先来看看TcpServer的构造函数,从这里往出扩展.
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg), //标识和生成唯一键值
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), //acceptor中存在套接字,回调进行事件分发
threadPool_(new EventLoopThreadPool(loop, name_)), //IO线程池 每一个元素为一个reactor
connectionCallback_(defaultConnectionCallback), //默认为打印日志
messageCallback_(defaultMessageCallback), //默认重置buffer
nextConnId_(1) ////标识和生成唯一键值
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
} //新连接到来的时候会回调newConnection 产生一个TcpConnection对象
这里首先看看==Acceptor这个类,它管理了所有的连接.==同样先来看看构造函数.
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), //创建一个socket套接字
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true); //设置地址复用
acceptSocket_.setReusePort(reuseport); //根据参数设置端口复用 与上面一条结合在一起取消Time_wait状态
acceptSocket_.bindAddress(listenAddr); //绑定地址
acceptChannel_.setReadCallback( //接收到可读事件的回调 即服务器收到连接后要做的事情`
std::bind(&Acceptor::handleRead, this));
}
成员构造部分只有一个地方要说,就是idleFd_,为什么要打开/dev/null呢,原因是为了防止busy_loop问题,即描述符满的情况,值得一提的是muduo的做法是有一定问题的,多线程下存在race_condition,本文重点不是这个,这是对这个问题的解析.
我们来看看handleRead这个函数,也就是连接到来时执行的回调.
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr); //peerAddr用于获取对端的协议地址
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
//写一个资源管理类更好 这样用户函数中可能并不会close 且很难保证异常安全
newConnectionCallback_(connfd, peerAddr);
}
else
{
sockets::close(connfd);
}
}
else /*accept失败的时候即代表发生了busyloop
这个时候我们只需要关闭idleFd,接着accept,再close,让对端知道已经关闭即可 接着再打开*/
//注意 这种方法看似可行 实则存在race condition.
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
第一个if中就是连接事件到来时所进行的事情,即执行回调,这个回调其实一般是不变的,会在TcpServer中设置为建立一个新连接,后面会提到.如果accept失败的话,就会进行else中的语句,就是处理busy_loop,做法是先close掉idleFd_,这是文件描述符空出来,再accept成功,紧接着close连接,这样的话可以通知客户端.然后再open idleFd_就可以了,这段代码有两个问题,一个是前面提到的race condition,一个是非异常安全,我们可以把open和close写成一个资源管理类(仍解决不了race condition的问题).
我们可以发现在构造函数中并没有进行listen,
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen(); //设置监听套接字
acceptChannel_.enableReading(); //向loop中加入这个事件回调 可读时调用handleRead
}
我们可以看到listen中执行了enableReading,这会把这个套接字加入到此线程的IO multiplexing中,这也符合我们的写法,即主线程有一个IO multiplexing用于处理连接.Acceptor看完了,我们继续来看TcpServer.
看完了acceptor我们在来看Tcpserver的构造函数就容易多了.
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
} //新连接到来的时候会回调newConnection 产生一个TcpConnection对象
显然我们会在有连接到来的时候调用newConnection.这是一个核心成员函数.
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
//事件分发机制 ,因为每个线程的负载能力应该都差不多,所以用round robin进行负载均衡
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf; //以上代码是为了生成每个连接的唯一健值
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd)); //获取客户端地址
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr)); //创建一个TcpConnection
connections_[connName] = conn; //放入map 伴随着一次,即引用计数增加
conn->setConnectionCallback(connectionCallback_); //默认为打印日志
conn->setMessageCallback(messageCallback_); //默认重置缓冲区
conn->setWriteCompleteCallback(writeCompleteCallback_); //写操作完成时触发的回调 默认为空
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); //加入loop事件循环 并延长conn的生命周期
}
第二句就涉及到了一个成员,它负责事件分发,我们来重点看看threadPool这个成员函数,这是TcpServer的核心成员.其实说实话我感觉名字是有点小小的误导性,就像是算法导论和深入理解操作系统一样,前者一点也不是导论,后者一点也不深入.而Threadpool严格来说也不算是一个线程池,实际它是存储IO线程的容器,即其中每一个元素都是一个reactor,这样说应该就清楚了许多,在构造函数中我们已经完成了threadpool_的构造,这一篇我们简单介绍,在第八篇中详细解释.
void TcpServer::setThreadNum(int numThreads) //核心
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)//先get得到0,然后给其赋值为1,意味着这个函数只执行一次
{
threadPool_->start(threadInitCallback_); //启动IO线程池
assert(!acceptor_->listenning());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
我们看到在Tcpserver启动的时候,即start这个二次构造的函数启动的时候,执行了threadPool.start
//启动reactor线程
void EventLoopThreadPool::start(const ThreadInitCallback& cb)
{
assert(!started_);
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t)); //管理线程
loops_.push_back(t->startLoop()); //存储每一个reactor线程的loop指针,用作事件分发
}
if (numThreads_ == 0 && cb) //无reactor线程
{
cb(baseLoop_); //既然没有处理事件的线程当然只能执行回调了
}
}
这我们可以看出想要使你的程序是多线程还是单线程就调用setThreadNum就可以了.
我们可以看到在for循环的最后一句中调用了startloop
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start();
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(); //线程中在调用notify是线程的eventloop对象已经被赋予loop_
}
loop = loop_;
}
return loop;
}
我们可以看到在函数的第二句函数执行,
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
我们可以看到构造函数部分thread_被绑定的函数是EventLoopThread::threadFunc,这我们在上一节分析过,会使得线程开始执行其事件循环,也就是说reactor的真正执行从这里开始了.
再把newConnection的代码拉一遍,方便解释
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
//事件分发机制 ,因为每个线程的负载能力应该都差不多,所以用round robin进行负载均衡
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf; //以上代码是为了生成每个连接的唯一健值
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd)); //获取客户端地址
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr)); //创建一个TcpConnection
connections_[connName] = conn; //放入map 伴随着一次,即引用计数增加
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
//这里保存了主线程的Tcpserver,意味着执行这个函数的时候可以调用主线程的loop 下面会说道
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe //这里放入了
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn)); //加入loop事件循环 并延长conn的生命周期
}
...............................
//typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
这个函数其他部分比较简单,我们从conn对象被创建开始说,首先抛出一个问题,为什么存储连接信息的map中Tcpconnection要用shard_ptr?原因就是为了一种特殊的情况,即一个Tcpconnection正在一个线程池中处理事件,但是恰好在reactor的事件循环中客户端关闭,这就会导致事件执行时对象析构,也就是muduo在第一章中提到的问题.我们不妨来回想如何解决这个问题呢,就是在reactor的回调中加入一个weak_ptr,在他执行回调的时候看下对应的shared_ptr是否存在,存在的话进行拷贝,即增加引用计数,把Tcpconnection的生命周期延长为函数的执行周期,这样就可避免上面的问题,这也是它map中存为shared_ptr的原因,在reactor]事件循环中收到断开事件时,就从map中删除,此时如果没有执行函数,那么这个Tcpconnection析构,如果正在执行函数,那么引用计数不会消失,对象会存在到函数执行完毕,特殊的,如果删除时还未执行但已在等待队列,执行的时候如果发现shared_ptr对象不存在不执行即可.muduo中的做法实在channl对象中加入一个类型为std::weak_ptr< void>的成员tie_,已实现上面所说的功能.
接着分析,剩下的便是注册回调了,我们重点来看看Closecallback
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe //此时loop_为主线程loop runinloop切换线程
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
//参数中自带一个智能指针的计数,TcpConnection生命周期延长到connectDestroyed
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name()); //从主线程的连接集合中删除
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop(); //切换到IO线程执行
ioLoop->queueInLoop( //为什么在这里不使用runinloop呢?因为此时必定在主线程
std::bind(&TcpConnection::connectDestroyed, conn));
/*我们发现conn实际上在删除的整个期间是存在的,意味着引用计数一直为1(用户不持有的情况下),
这其实就是利用智能指针的引用技术在删除conn和conn执行回调之间加上的一个同步关系,这也是为什么没有加锁的原因 牛逼*/
}
正如注释中所写,我们可以发现closecallback实际上会延长con的生命周期知道删除完毕才会把引用计数减为1,这样做有什么好处呢?其实就是利用智能指针的引用技术在删除conn和conn执行回调之间加上的一个同步关系,这样就使得删除整个删除结束前对象仍然存在,在删除结束后不存在,执行删除时执行回调的话会继续延长生命周期,不执行回调的话就在删除结束后对象死亡.
我们还看到上面提到了两个TcpConnection中的函数,即TcpConnection::connectDestroyed和TcpConnection::connectEstablished,
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected); //设置连接状态
channel_->tie(shared_from_this()); //绑定当前对象 为了在执行回调时延长对象生命周期 防止执行时被析构
channel_->enableReading(); //加入IO multiplexing
connectionCallback_(shared_from_this()); //默认为打印日志
}
这是一个很重要的函数,我们发现在newConnection中实际上已经加入了管理连接的map中,这里的enableReading就会把它加入所属的事件循环,同样channel的tie函数上面我们也分析过.
void TcpConnection::connectDestroyed() //通知用户连接已经断开
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this()); //默认为打印日志
}
channel_->remove(); //从事件循环中去除当前连接(析构)
}
这没有什么说的.
到这里Tcpserver涉及的部分就说完了,接下来我们看看TcpConnection,先从构造函数入手
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)), //分配的IO线程
name_(nameArg),
state_(kConnecting), //设置初始状态值
reading_(true),
socket_(new Socket(sockfd)), //sockfd为accept的返回值
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
//创建时需要为channel对象设置回调,在TcpConnection::connectEstablished中加入IO multiplexing
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true); //设置TCP保活机制
}
我们可以看到在初始化时会给channel_对象设置回调,这很好理解,上一篇提到过,就是在事件到来时要触发的回调.
我们一个一个看看,首先看看非常重要的对于可读事件的处理
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); //从套接字向inputbuffer中读数据 具体在下一篇中讨论
if (n > 0) //证明有数据 并不是断开连接
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); //执行数据到来时的回调
}
else if (n == 0)
{
handleClose();//TcpConnection::handleClose
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
这里最需要说的就是第一个if判断中,当出现可读事件的时候我们发现会增加引用计数,什么时候会调用handleRead呢?就是channel的handleEvent中,这样看来,其实就是在时间触发时,如果conn还存在的话才会执行shared_from_this.其实readFd也很重要,但我们放到下一篇中与发数据一起讨论.
void Channel::handleEvent(Timestamp receiveTime) //防止对象执行时被析构
{
std::shared_ptr<void> guard;
if (tied_)
{
//最最重要的一句, 在绑定的TcpConnection对象存在时gaurd为true,并延长TcpConnection的生命周期
guard = tie_.lock();
if (guard)
{
handleEventWithGuard(receiveTime); //正常的执行逻辑 调用注册的回调
}
}
else
{
handleEventWithGuard(receiveTime);
}
}
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
eventHandling_ = true;
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}
if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
最后我们再说一个重要的回调,就是handleClose,这是我们在收到可读事件返回为0的时候触发的回调,
void TcpConnection::handleClose()
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr guardThis(shared_from_this());
connectionCallback_(guardThis); //打印日志
// must be the last line
closeCallback_(guardThis); //干了一切该干的事情
}
我们发现这里的shared_from_this与在TcpServer::newConnection中提到的延长声明周期便对上了.
到这里,我们就把TcpConnection的连接与断开将清楚了.
这里再说一点,就是我们在这些代码里偶尔会看到的state_,它到底是干什么的呢,其实就是标识当前连接的状态,下面列出了它出现的时候,看类相信大家就清楚它的作用了
state_状态转移 | 重置点 |
---|---|
state_(kConnecting) | 构造函数 |
kConnecting->kConnected | connectEstablished |
kConnected->kDisconnected | connectDestroyed,handleClose |
kConnected->kDisconnecting,kDisconnecting->kDisconnecting | shutdown,forceClose,forceCloseWithDelay |
最有意思的是最后一项,那些函数意味着muduo服务端是可以主动关闭的,还提供forceCloseWithDelay这样延长对象声明周期的删除函数.其实和正常的删除流程是一样的,会延长对象的声明周期.我觉得这个状态存在的原因是为了debug,它唯一除了转移以外出现的地方其实就是打日志了.有兴趣的朋友可以在TcpConnection.cc中Ctrl+F看下出现在哪里stateToString的定义和出现的地方就清楚了.