muduo网络库源码解析(1):多线程异步日志库(上)
muduo网络库源码解析(2):多线程异步日志库(中)
muduo网络库源码解析(3):多线程异步日志库(下)
muduo网络库源码解析(4):TimerQueue定时机制
muduo网络库源码解析(5):EventLoop,Channel与事件分发机制
muduo网络库源码解析(6):TcpServer与TcpConnection(上)
muduo网络库源码解析(7):TcpServer与TcpConnection(下)
muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread
muduo网络库源码解析(9):Connector与TcpClient
引言
前几篇文章主要对服务端的一系列问题做了分析,其中的一些组件可以Connector与TcpClient中,这两个类是编写服务端的基础,就是干了一件事情而已,即连接.
我们先来看看Connector的构造函数.
const int Connector::kMaxRetryDelayMs; //最大重连间隔
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop), //本线程事件循环
serverAddr_(serverAddr), //要连接的服务端地址
connect_(false), //是否开始连接 start中触发
state_(kDisconnected), //设置连接状态
retryDelayMs_(kInitRetryDelayMs) //默认重连的时间参数
{
LOG_DEBUG << "ctor[" << this << "]";
}
有一点值得说,就是我们发现在数据成员中其实是有Channel对象的,但却并没有初始化,这也很好解释,在连接成功的时候才能有一个有效的fd,那时才可以创建一个有效的Channel.
我们来看看提供给外界的接口start.
void Connector::start()
{
connect_ = true;
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect();
}
else
{
LOG_DEBUG << "do not connect";
}
}
这里我们可以看到其实核心在于connect.
void Connector::connect()
{
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family()); //创建一个一个非阻塞套接字
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS: //正在连接
case EINTR: //当阻塞于某个慢系统调用的一个进程捕获某个信号且相应信号处理函数返回时,该系统调用可能返回一个EINTR错误。
case EISCONN: //连接成功
connecting(sockfd);
break;
case EAGAIN: //临时端口(ephemeral port)不足
case EADDRINUSE: //监听的端口已经被使用
case EADDRNOTAVAIL: //配置的IP不对
case ECONNREFUSED: //服务端在我们指定的端口没有进程等待与之连接
case ENETUNREACH: //表示目标主机不可达
retry(sockfd);
break;
case EACCES: //没有权限
case EPERM: //操作不被允许
case EAFNOSUPPORT: //该系统不支持IPV6
case EALREADY: //套接字非阻塞且进程已有待处理的连接
case EBADF: //无效的文件描述符
case EFAULT: //操作套接字时的一些参数无效
case ENOTSOCK: //不是一个套接字
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
因为创建的套接字是非阻塞的,显然会有连接错误的情况,各错误代表的含义我已标注,前三种情况表示没有什么错误,会执行connecting,第二部分会进行重连,执行retry,后面的情况会执行close.
void Connector::connecting(int sockfd)
{
setState(kConnecting); //设置状态
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd)); //把channel与fd相关联
channel_->setWriteCallback(
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
channel_->enableWriting(); //IO multiplexing关注可写事件 //socket变的可写时表示连接完成
}
此时返回一个fd,我们可以创建一个channel对象,注意,因为是非阻塞的调用,此时不代表连接成功.
因为socket变的可写时表示连接完成,所以注册可写事件,同时注册了两个回调.
void Connector::handleWrite()
{
LOG_TRACE << "Connector::handleWrite " << state_;
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel(); //从poller中移除,并把channel置为空 socket出错无法恢复
int err = sockets::getSocketError(sockfd); //可写不一定连接成功 使用getsockopt确认一下 功能为获取一个套接字的选项
if (err)
{
LOG_WARN << "Connector::handleWrite - SO_ERROR = "
<< err << " " << strerror_tl(err);
retry(sockfd); //使用TimerQueue重新发起连接
}
else if (sockets::isSelfConnect(sockfd)) //出现自连接的情况
{
LOG_WARN << "Connector::handleWrite - Self connect";
retry(sockfd);
}
else
{
setState(kConnected);
if (connect_)
{
newConnectionCallback_(sockfd); //连接成功时执行回调 在TCPClient中设置
}
else
{
sockets::close(sockfd);
}
}
}
else
{
// what happened?
assert(state_ == kDisconnected);
}
}
void Connector::handleError()
{
LOG_ERROR << "Connector::handleError state=" << state_;
if (state_ == kConnecting)
{
int sockfd = removeAndResetChannel();
int err = sockets::getSocketError(sockfd);
LOG_TRACE << "SO_ERROR = " << err << " " << strerror_tl(err);
retry(sockfd); //出现错误的时候重新连接
}
}
注意在可写事件出现的时候不一定连接成功,原因是连接错误的时候仍会触发可读可写事件,成功只会触发可写事件,而我们只注册的可写,所以需要进行两个判断.我们再来看看尝试重新连接的情况.当然第一个条件判断我们也可以read,如过返回0则成功,调用失败则失败.
void Connector::retry(int sockfd)
{
sockets::close(sockfd);
setState(kDisconnected); //设置状态为未连接
if (connect_) //执行了start后执行这个
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
loop_->runAfter(retryDelayMs_/1000.0, //最大30秒
std::bind(&Connector::startInLoop, shared_from_this())); //设置一个定时事件
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs); //延迟的时间会逐渐增加
}
else
{
LOG_DEBUG << "do not connect";
}
}
失败的时候会先设置状态,最重要的是会使用到TimerQueue的功能,即设置定时事件,每次的间隔会乘2,初始为0.5秒,最大30秒
接下来我们来说说TcpClient,其实这与TcpServer很像,只不过TcpClient是一对一,而TcpServer是一对多而已,顺便把Acceptor换成了Connector,其他功能上没什么不一样的.我们来看看构造函数.
TcpClient::TcpClient(EventLoop* loop,
const InetAddress& serverAddr,
const string& nameArg)
: loop_(CHECK_NOTNULL(loop)),
connector_(new Connector(loop, serverAddr)), //创建一个connector,与Tcpclient一对一
name_(nameArg),
connectionCallback_(defaultConnectionCallback), //默认打印日志
messageCallback_(defaultMessageCallback), //默认重置缓冲区
retry_(false), //失败时是否重连
connect_(true),
nextConnId_(1) //对成功连接进行计数
{
connector_->setNewConnectionCallback(
std::bind(&TcpClient::newConnection, this, _1)); //给connector设置连接成功时的回调
// FIXME setConnectFailedCallback
LOG_INFO << "TcpClient::TcpClient[" << name_
<< "] - connector " << get_pointer(connector_);
}
我们可以看到其实和TcpServer的逻辑差不多.我们来看看一个连接成功时会发生什么.
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_; //对连接进行计数
string connName = name_ + buf;
InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));
conn->setConnectionCallback(connectionCallback_); //默认打印日志
conn->setMessageCallback(messageCallback_); //默认重置缓冲区
conn->setWriteCompleteCallback(writeCompleteCallback_); //默认不存在
conn->setCloseCallback(
std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe //默认
{
MutexLockGuard lock(mutex_);
connection_ = conn; //每个Tcpclient对应一个TcpConnection
}
conn->connectEstablished(); //加入IO multiplexing //注册可读事件
}
这样的话一个连接就建立好了,剩下的话和服务端就差不多了,等待可读事件即可.值得注意的我们可以把一个loop传给多个Tcpclient,这样就可以在一个循环上进行了.
总结
使用非阻塞connection是必要的,因为三次握手的过程的时间是我们不可控的,我们应该把时间空出来去干有用的事情,但非阻塞意味着自己管理套接字的状态,muduo中的处理是值得我们借鉴的.
参考:
非阻塞connect的各种返回值