Acceptor
Acceptor class
用来接收一个新的 TCP 连接
Acceptor.h
#ifndef MUDUO_NET_ACCEPTOR_H
#define MUDUO_NET_ACCEPTOR_H
#include <functional>
#include <muduo/net/Channel.h>
#include <muduo/net/Socket.h>
namespace muduo
{
namespace net
{
class EventLoop;
class InetAddress;
///
/// Acceptor of incoming TCP connections.
///
class Acceptor : noncopyable
{
public:
typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallback;
//在TcpServer中创建时,传入loop也是传给TcpServer构造函数的loop,
//即创建TcpServer所在的线程创建的EventLoop
Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport);
~Acceptor();
//使用该类时由用户主动调用该函数设置回调函数,在handleRead中调用
//如果是在TCPServer中使用,TCPServer会在构造函数时设置该回调函数为TcpServer::newConnection
void setNewConnectionCallback(const NewConnectionCallback& cb)
{ newConnectionCallback_ = cb; }
bool listenning() const { return listenning_; }
//开始监听
void listen();
private:
//监听套接字上的回调函数,处理可读事件
void handleRead();
//所属的EventLoop所在的IO线程
EventLoop* loop_;
//RAII句柄,Acceptor的acceptSocket_是一个listening socket
Socket acceptSocket_;
//用于观察该socket上的可读事件,回调Acceptor::handleRead(),
//该函数调用accept接收新连接,并调用用户的回调函数
//初始化时传入的EventLoop* 参数是传给loop_的同一个
//即所属的那个EventLoop
Channel acceptChannel_;
//设置处理新连接的回调函数
NewConnectionCallback newConnectionCallback_;
bool listenning_;
int idleFd_;
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_ACCEPTOR_H
Acceptor.cc
#include <muduo/net/Acceptor.h>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/SocketsOps.h>
#include <errno.h>
#include <fcntl.h>
//#include <sys/types.h>
//#include <sys/stat.h>
#include <unistd.h>
using namespace muduo;
using namespace muduo::net;
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop), //用户传入一个EventLoop
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())), //创建一个socketfd
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
//设置socket属性
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
//该Channel使用传入的EventLoop创建,与其相关联
//设置Channel可读回调函数为handleRead
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}
Acceptor::~Acceptor()
{
acceptChannel_.disableAll();
acceptChannel_.remove();
::close(idleFd_);
}
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen(); //执行listen (在Socket::listen中调用sockets::listenOrDie)
acceptChannel_.enableReading(); //channel事件类型设置为可读,更新到epoll事件集合中
}
void Acceptor::handleRead()
{
loop_->assertInLoopThread();
InetAddress peerAddr;
//FIXME loop until no more
//接受连接,返回新连接的fd和InetAddress对象peerAddr
if (connfd >= 0)
int connfd = acceptSocket_.accept(&peerAddr);
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr); //之前设置的回调函数
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can't" in libev's doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}
在Acceptor class
里使用了socket
和对地址转换操作的封装, Acceptor
会在构造函数中创建一个监听套接字
当用户只使用该类时, 调用创建EventLoop
, 然后将EventLoop
对象作为参数创建Acceptor
对象, 然后主动调用setNewConnectionCallback
设置接收新连接时执行的回调函数, 然后执行EventLoop::loop
, 当有事件触发式, 分发给Channel
, Channel
执行回调函数, 而其回调函数已经设置为Acceptor的handleRead
在
TCPServer
里, 由EventLoopThreadPool
来创建 IO 线程并让其执行EventLoop::loop
TCPServer
TCPServer
中包含Acceptor
, EventLoopThreadPool, EventLoopThread
TCPServer
调用EventLoopThreadPool
启动 IO 线程, 执行 EventLoop::loop
, 等待事件发生, 而当有新的 TCP 连接到达时, 调用Channel
的回调函数, 使用Acceptor
接收新连接返回给TCPServer
, TCPServer
中设置的回调函数会创建一个TcpConnection class
Channel
的回调函数在Acceptor
的构造函数中已经设置为Acceptor::handleRead
, 而在TCPServer
的构造函数中会将Acceptor::handleRead
中调用的回调函数设置为TcpServer::newConnection
TCPServer.h
#ifndef MUDUO_NET_TCPSERVER_H
#define MUDUO_NET_TCPSERVER_H
#include <muduo/base/Atomic.h>
#include <muduo/base/Types.h>
#include <muduo/net/TcpConnection.h>
#include <map>
namespace muduo
{
namespace net
{
class Acceptor;
class EventLoop;
class EventLoopThreadPool;
///
/// TCP server, supports single-threaded and thread-pool models.
///
/// This is an interface class, so don't expose too much details.
class TcpServer : noncopyable
{
public:
typedef std::function<void(EventLoop*)> ThreadInitCallback;
enum Option
{
kNoReusePort,
kReusePort,
};
//TcpServer(EventLoop* loop, const InetAddress& listenAddr);
TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option = kNoReusePort);
~TcpServer(); // force out-line dtor, for std::unique_ptr members.
const string& ipPort() const { return ipPort_; }
const string& name() const { return name_; }
EventLoop* getLoop() const { return loop_; }
/// Set the number of threads for handling input.
///
/// Always accepts new connection in loop's thread.
/// Must be called before @c start
/// @param numThreads
/// - 0 means all I/O in loop's thread, no thread will created.
/// this is the default value.
/// - 1 means all I/O in another thread.
/// - N means a thread pool with N threads, new connections
/// are assigned on a round-robin basis.
void setThreadNum(int numThreads);
void setThreadInitCallback(const ThreadInitCallback& cb)
{ threadInitCallback_ = cb; }
/// valid after calling start()
std::shared_ptr<EventLoopThreadPool> threadPool()
{ return threadPool_; }
/// Starts the server if it's not listenning.
///
/// It's harmless to call it multiple times.
/// Thread safe.
void start();
/// Set connection callback.
/// Not thread safe.
void setConnectionCallback(const ConnectionCallback& cb)
{ connectionCallback_ = cb; }
/// Set message callback.
/// Not thread safe.
void setMessageCallback(const MessageCallback& cb)
{ messageCallback_ = cb; }
/// Set write complete callback.
/// Not thread safe.
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{ writeCompleteCallback_ = cb; }
private:
/// Not thread safe, but in loop
//新连接到达时Acceptor调用的回调函数
void newConnection(int sockfd, const InetAddress& peerAddr);
/// Thread safe.
void removeConnection(const TcpConnectionPtr& conn);
/// Not thread safe, but in loop
void removeConnectionInLoop(const TcpConnectionPtr& conn);
typedef std::map<string, TcpConnectionPtr> ConnectionMap;
EventLoop* loop_; // the acceptor loop
const string ipPort_;
const string name_; //服务器的名字,用户自己命名
std::unique_ptr<Acceptor> acceptor_; // avoid revealing Acceptor
std::shared_ptr<EventLoopThreadPool> threadPool_; //线程池,每个线程运行一个EventLoop
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
AtomicInt32 started_;
// always in loop thread
int nextConnId_; //每一个连接的唯一ID,
ConnectionMap connections_; //连接表 map存储
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_TCPSERVER_H
TCPServer.cc
#include <muduo/net/TcpServer.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Acceptor.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/EventLoopThreadPool.h>
#include <muduo/net/SocketsOps.h>
#include <stdio.h> // snprintf
using namespace muduo;
using namespace muduo::net;
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr, //一个有给定IP和port创建的InetAddress对象
const string& nameArg, //用户给定的一个string类的服务器名字
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)), //new一个Acceptor对象
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1) //序列号,用来给每一个connections_生成不同的key
{
//将Acceptor的可读回调函数设置为TcpServer::newConnection
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
//析构时关闭所有连接
TcpServer::~TcpServer()
{
loop_->assertInLoopThread();
LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";
for (auto& item : connections_)
{
TcpConnectionPtr conn(item.second); //map里second是TcpConnectionPtr类
item.second.reset();
conn->getLoop()->runInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));//关闭连接
}
}
//设置线程池的线程数目
void TcpServer::setThreadNum(int numThreads)
{
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}
//核心函数
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
//启动线程池创建IO线程
//IO线程执行EventLoop::loop,开始等待事件的发生
//
threadPool_->start(threadInitCallback_);
//确保还没有开始监听
assert(!acceptor_->listenning());
///开始监听,loop_是创建TCPServer所在线程传进来的EventLoop
//执行EventLoop:::runInLoop执行任务,接收新连接
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
//Acceptor调用的回调函数,主要用来创建一个TCPConnection对象,
///并将其加入ConnectionMap,设置好callback
//再调用conn->connectEstablished()
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
//从线程池取一个EventLoop对象
EventLoop* ioLoop = threadPool_->getNextLoop();
char buf[64];
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
//序列号+1
++nextConnId_;
string connName = name_ + buf;
LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
//创建指向TcpConnection类型的智能指针
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));
//connections_是map类
//key是服务器名称(用户给的string)name_,和一个int组成
connections_[connName] = conn;
//设置TcpConncetion的一系列回调函数
conn->setConnectionCallback(connectionCallback_);
conn->setMessageCallback(messageCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
//TCP连接关闭时的回调函数
conn->setCloseCallback(
std::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
//在IO线程中执行建立连接的过程
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
std::bind(&TcpConnection::connectDestroyed, conn));
}