Connector class
Connector class 负责主动发起连接。它不单独使用, 而是包含在 TcpClient class 内。
Connector 只负责连接的建立 (包括每次尝试时 socket 的创建、建立过程中的错误处理和重连), 不负责创建 TcpConnection; 连接建立成功后, 它通过回调把 socket 交给 TcpClient。
需要考虑的难点:
- socket 是一次性的: 一旦 connect 出错, 必须关闭该 socket 并重新创建一个; 但 Connector 是可以反复使用的。
- 错误码和 Acceptor 不同: EAGAIN 是真的出错了, 表明本机临时端口 (ephemeral port) 暂时用完, 需要延期重试; “正在连接” 的返回码是 EINPROGRESS。
- 重试的间隔应该逐渐延长 (下面的代码从 500ms 开始, 每次翻倍, 上限 30s)。
- 需要处理自连接 (self-connection)。
Connector.h
#ifndef MUDUO_NET_CONNECTOR_H
#define MUDUO_NET_CONNECTOR_H
#include <muduo/base/noncopyable.h>
#include <muduo/net/InetAddress.h>
#include <atomic>
#include <functional>
#include <memory>
namespace muduo
{
namespace net
{
class Channel;
class EventLoop;
/// Actively initiates a TCP connection to a fixed server address and retries
/// with a growing delay when an attempt fails.
///
/// When a connection is established the connected socket descriptor is passed
/// to the NewConnectionCallback. retry() uses shared_from_this(), so instances
/// have to be owned by a std::shared_ptr.
class Connector : noncopyable,
                  public std::enable_shared_from_this<Connector>
{
 public:
  /// Called with the descriptor of the socket that has just been connected.
  typedef std::function<void (int sockfd)> NewConnectionCallback;

  /// @param loop        event loop that runs the connect logic and timers
  /// @param serverAddr  address of the server to connect to
  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  /// Installs the callback that receives the connected socket.
  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { newConnectionCallback_ = cb; }

  void start();    // can be called in any thread
  void restart();  // must be called in loop thread
  void stop();     // can be called in any thread

  const InetAddress& serverAddress() const { return serverAddr_; }

 private:
  enum States { kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;  // upper bound of the retry delay: 30000 ms
  static const int kInitRetryDelayMs = 500;     // initial retry delay: 500 ms

  void setState(States s) { state_ = s; }
  void startInLoop();
  void stopInLoop();
  void connect();
  void connecting(int sockfd);
  void handleWrite();
  void handleError();
  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();

  EventLoop* loop_;         // loop this connector belongs to
  InetAddress serverAddr_;  // address of the server to connect to
  // Written by start()/stop(), which may run in any thread, and read by the
  // loop-thread code paths; a plain bool here would be a data race, so the
  // flag is a real atomic.
  std::atomic<bool> connect_;
  States state_;  // FIXME: use atomic variable
  std::unique_ptr<Channel> channel_;  // channel watching the in-progress socket
  NewConnectionCallback newConnectionCallback_;  // invoked once the connection succeeds
  int retryDelayMs_;  // delay before the next reconnect attempt, in milliseconds
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_CONNECTOR_H
Connector.cc
#include <muduo/net/Connector.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>
#include <errno.h>
using namespace muduo;
using namespace muduo::net;
// Out-of-class definition of the static constant that is declared (with its
// initializer) inside the class. retry() passes it to std::min, which takes
// its arguments by const reference, so the constant needs a definition.
const int Connector::kMaxRetryDelayMs;
Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
: loop_(loop),
serverAddr_(serverAddr),
connect_(false),
state_(kDisconnected),
retryDelayMs_(kInitRetryDelayMs)
{
LOG_DEBUG << "ctor[" << this << "]";
}
// Logs the destruction and checks that no channel is still held.
Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(channel_ == nullptr);
}
void Connector::start()
{
connect_ = true;
//在所属IO线程Lloop中调用该函数
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}
// Loop-thread half of start(): begins a connect attempt unless the request
// has been withdrawn in the meantime.
void Connector::startInLoop()
{
  loop_->assertInLoopThread();
  assert(state_ == kDisconnected);

  // connect_ is set by start()/restart() and cleared by stop().
  if (!connect_)
  {
    LOG_DEBUG << "do not connect";
    return;
  }

  connect();
}
void Connector::stop()
{
connect_ = false;
loop_->queueInLoop(std::bind(&Connector::stopInLoop, this)); // FIXME: unsafe
// FIXME: cancel timer
}
// Loop-thread half of stop(): abandons a connect that is still in progress.
void Connector::stopInLoop()
{
  loop_->assertInLoopThread();
  if (state_ != kConnecting)
  {
    return;
  }

  setState(kDisconnected);
  const int fd = removeAndResetChannel();
  // retry() closes the socket; it only schedules another attempt when
  // connect_ is still set.
  retry(fd);
}
void Connector::connect()
{
//使用socket选项SOCK_NONBLOCK创建一个非阻塞socket
int sockfd = sockets::createNonblockingOrDie(serverAddr_.family());
//调用connect进行请求连接
int ret = sockets::connect(sockfd, serverAddr_.getSockAddr());
int savedErrno = (ret == 0) ? 0 : errno;
switch (savedErrno)
{
case 0:
case EINPROGRESS: //socket是非阻塞,返回这个错误码表示正在连接
case EINTR:
case EISCONN: //连接成功
connecting(sockfd);
break;
case EAGAIN: //connect返回EAGAIN是真的错误了
case EADDRINUSE:
case EADDRNOTAVAIL:
case ECONNREFUSED:
case ENETUNREACH:
retry(sockfd); //重连
break;
case EACCES:
case EPERM:
case EAFNOSUPPORT:
case EALREADY:
case EBADF:
case EFAULT:
case ENOTSOCK:
LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd); //关闭socket
break;
default:
LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
sockets::close(sockfd);
// connectErrorCallback_();
break;
}
}
// Resets the connector to its initial condition (disconnected, initial retry
// delay, connect requested) and starts a new attempt immediately.
void Connector::restart()
{
  loop_->assertInLoopThread();

  connect_ = true;
  retryDelayMs_ = kInitRetryDelayMs;
  setState(kDisconnected);

  startInLoop();
}
void Connector::connecting(int sockfd)
{
//设置状态为正在进行连接
setState(kConnecting);
assert(!channel_);
//将该channel和EventLoop,socket关联
//channel对应于一个文件描述符,所以在有了socket后才能创建channel
channel_.reset(new Channel(loop_, sockfd));
//设置回调函数
channel_->setWriteCallback(
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe
// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
//注册可写事件
channel_->enableWriting();
}
int Connector::removeAndResetChannel()
{
channel_->disableAll();
channel_->remove();
int sockfd = channel_->fd();
// Can't reset channel_ here, because we are inside Channel::handleEvent
loop_->queueInLoop(std::bind(&Connector::resetChannel, this)); // FIXME: unsafe
return sockfd;
}
// Destroys the channel; queued by removeAndResetChannel().
void Connector::resetChannel()
{
  channel_ = nullptr;
}
// Write-event callback of the connecting socket.
//
// A writable socket does not by itself prove that the connection succeeded,
// so the socket error is checked before the descriptor is handed over.
void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite " << state_;

  if (state_ != kConnecting)
  {
    // what happened?
    assert(state_ == kDisconnected);
    return;
  }

  // The connector's channel only covers the connection-establishment phase,
  // so it is removed here regardless of the outcome.
  const int fd = removeAndResetChannel();

  const int soError = sockets::getSocketError(fd);
  if (soError)
  {
    LOG_WARN << "Connector::handleWrite - SO_ERROR = "
             << soError << " " << strerror_tl(soError);
    retry(fd);
    return;
  }

  if (sockets::isSelfConnect(fd))
  {
    // A self-connection is dropped and the connect is retried.
    LOG_WARN << "Connector::handleWrite - Self connect";
    retry(fd);
    return;
  }

  // The connection is established.
  setState(kConnected);
  if (connect_)
  {
    newConnectionCallback_(fd);
  }
  else
  {
    // The request was withdrawn while connecting: discard the socket.
    sockets::close(fd);
  }
}
// Error-event callback of the connecting socket: logs the socket error and
// retries when a connect was in progress.
void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError state=" << state_;
  if (state_ != kConnecting)
  {
    return;
  }

  const int fd = removeAndResetChannel();
  const int soError = sockets::getSocketError(fd);
  LOG_TRACE << "SO_ERROR = " << soError << " " << strerror_tl(soError);
  retry(fd);
}
void Connector::retry(int sockfd)
{
//socket是一次性的,失败后需要关闭重新创建
sockets::close(sockfd);
setState(kDisconnected);
if (connect_)
{
LOG_INFO << "Connector::retry - Retry connecting to " << serverAddr_.toIpPort()
<< " in " << retryDelayMs_ << " milliseconds. ";
//隔一段时间后重连,重新执行startInLoop
loop_->runAfter(retryDelayMs_/1000.0,
std::bind(&Connector::startInLoop, shared_from_this()));
//间隔时间翻倍
retryDelayMs_ = std::min(retryDelayMs_ * 2, kMaxRetryDelayMs);
}
else
{
LOG_DEBUG << "do not connect";
}
}
TcpClient
TcpClient 使用 Connector 发起连接; 连接建立成功后, 用该 socket 创建 TcpConnection 来管理连接。每个 TcpClient class 只管理一个 TcpConnection。
TcpClient.h
#ifndef MUDUO_NET_TCPCLIENT_H
#define MUDUO_NET_TCPCLIENT_H
#include <muduo/base/Mutex.h>
#include <muduo/net/TcpConnection.h>
namespace muduo
{
namespace net
{
class Connector;
typedef std::shared_ptr<Connector> ConnectorPtr;
/// TCP client that uses a Connector to establish the connection and keeps
/// the resulting connection in a single TcpConnectionPtr.
class TcpClient : noncopyable
{
 public:
  // TcpClient(EventLoop* loop);
  // TcpClient(EventLoop* loop, const string& host, uint16_t port);
  TcpClient(EventLoop* loop,
            const InetAddress& serverAddr,
            const string& nameArg);
  ~TcpClient();  // force out-line dtor, for std::unique_ptr members.

  void connect();
  void disconnect();
  void stop();

  /// Returns the current connection (possibly null), read under the mutex.
  TcpConnectionPtr connection() const
  {
    MutexLockGuard lock(mutex_);
    return connection_;
  }

  EventLoop* getLoop() const
  { return loop_; }

  /// Whether the client reconnects after an established connection is removed.
  bool retry() const
  { return retry_; }

  void enableRetry()
  { retry_ = true; }

  const string& name() const
  { return name_; }

  /// Set connection callback.
  /// Not thread safe.
  void setConnectionCallback(ConnectionCallback cb)
  { connectionCallback_ = std::move(cb); }

  /// Set message callback.
  /// Not thread safe.
  void setMessageCallback(MessageCallback cb)
  { messageCallback_ = std::move(cb); }

  /// Set write complete callback.
  /// Not thread safe.
  void setWriteCompleteCallback(WriteCompleteCallback cb)
  { writeCompleteCallback_ = std::move(cb); }

 private:
  /// Not thread safe, but in loop
  void newConnection(int sockfd);
  /// Not thread safe, but in loop
  void removeConnection(const TcpConnectionPtr& conn);

  EventLoop* loop_;         // event loop supplied by the user
  ConnectorPtr connector_;  // avoid revealing Connector
  const string name_;       // name chosen by the user

  // Callbacks copied into the connection created by newConnection().
  ConnectionCallback connectionCallback_;
  MessageCallback messageCallback_;
  WriteCompleteCallback writeCompleteCallback_;

  bool retry_;    // atomic; reconnect after the connection is removed?
  bool connect_;  // atomic
  // always in loop thread
  int nextConnId_;
  mutable MutexLock mutex_;
  TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};
} // namespace net
} // namespace muduo
#endif // MUDUO_NET_TCPCLIENT_H
TcpClient.cc
#include <muduo/net/TcpClient.h>
#include <muduo/base/Logging.h>
#include <muduo/net/Connector.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/SocketsOps.h>
#include <stdio.h> // snprintf
using namespace muduo;
using namespace muduo::net;
// TcpClient::TcpClient(EventLoop* loop)
// : loop_(loop)
// {
// }
// TcpClient::TcpClient(EventLoop* loop, const string& host, uint16_t port)
// : loop_(CHECK_NOTNULL(loop)),
// serverAddr_(host, port)
// {
// }
namespace muduo
{
namespace net
{
namespace detail
{
// Close callback used once the TcpClient is gone: queues connectDestroyed()
// for the connection on `loop`. The queued task holds its own copy of the
// shared pointer.
void removeConnection(EventLoop* loop, const TcpConnectionPtr& conn)
{
  loop->queueInLoop([conn] { conn->connectDestroyed(); });
}
// Intentionally does nothing with the connector. ~TcpClient binds its
// ConnectorPtr to this function in a delayed task, so the bound copy is what
// keeps the Connector alive until that task has run.
void removeConnector(const ConnectorPtr& connector)
{
//connector->
}
} // namespace detail
} // namespace net
} // namespace muduo
// Creates the client together with its Connector and wires the connector's
// new-connection callback to TcpClient::newConnection.
TcpClient::TcpClient(EventLoop* loop,
                     const InetAddress& serverAddr,
                     const string& nameArg)
  : loop_(CHECK_NOTNULL(loop)),
    connector_(new Connector(loop, serverAddr)),
    name_(nameArg),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    retry_(false),
    connect_(true),
    nextConnId_(1)
{
  // Connector::handleWrite invokes this callback with the connected socket.
  connector_->setNewConnectionCallback(
      [this](int sockfd) { newConnection(sockfd); });
  // FIXME setConnectFailedCallback
  LOG_INFO << "TcpClient::TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
}
// Tears the client down.
//
// With a live connection: its close callback is redirected to
// detail::removeConnection (so it no longer refers to this object) and the
// connection is force-closed when this client was its only owner.
// Without one: the connector is stopped and kept alive for one more second
// by a delayed task.
TcpClient::~TcpClient()
{
  LOG_INFO << "TcpClient::~TcpClient[" << name_
           << "] - connector " << get_pointer(connector_);
  TcpConnectionPtr conn;
  bool soleOwner = false;
  {
    MutexLockGuard lock(mutex_);
    // Sampled before the copy below adds another owner. use_count() == 1
    // replaces shared_ptr::unique(), which is deprecated in C++17 and
    // removed in C++20; both are false for an empty pointer.
    soleOwner = (connection_.use_count() == 1);
    conn = connection_;
  }
  if (conn)
  {
    assert(loop_ == conn->getLoop());
    // FIXME: not 100% safe, if we are in different thread
    CloseCallback cb = std::bind(&detail::removeConnection, loop_, _1);
    loop_->runInLoop(
        std::bind(&TcpConnection::setCloseCallback, conn, cb));
    if (soleOwner)
    {
      conn->forceClose();
    }
  }
  else
  {
    connector_->stop();
    // FIXME: HACK
    loop_->runAfter(1, std::bind(&detail::removeConnector, connector_));
  }
}
// Logs the target address, marks the client as wanting a connection and
// asks the Connector to start connecting.
void TcpClient::connect()
{
// FIXME: check state
LOG_INFO << "TcpClient::connect[" << name_ << "] - connecting to "
<< connector_->serverAddress().toIpPort();
connect_ = true;
// Connector::start() initiates the connection.
connector_->start();
}
// Clears the connect flag and, if a connection exists, shuts it down while
// holding the mutex that guards connection_.
void TcpClient::disconnect()
{
  connect_ = false;

  MutexLockGuard lock(mutex_);
  if (!connection_)
  {
    return;
  }
  connection_->shutdown();
}
// Clears the connect flag and stops the Connector.
void TcpClient::stop()
{
connect_ = false;
connector_->stop();
}
// New-connection callback, invoked by the Connector with the connected
// socket. Wraps the socket in a TcpConnection, installs the user callbacks,
// stores it in connection_ and signals that the connection is established.
void TcpClient::newConnection(int sockfd)
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(sockets::getPeerAddr(sockfd));

  // Only the numeric id goes through a fixed buffer ('#' plus at most 11
  // characters for an int). Formatting ":ip:port#id" into a 32-byte buffer
  // truncated the name for long (IPv6) peer addresses and lost the id.
  char idBuf[16];
  snprintf(idBuf, sizeof idBuf, "#%d", nextConnId_);
  ++nextConnId_;
  string connName = name_ + ":" + peerAddr.toIpPort() + idBuf;

  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(loop_,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
  {
    MutexLockGuard lock(mutex_);
    // Publish the new connection as this client's connection_.
    connection_ = conn;
  }
  conn->connectEstablished();
}
// Close callback of the client's connection: forgets the connection, queues
// its connectDestroyed() and, when both retry_ and connect_ are set,
// restarts the Connector.
void TcpClient::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  assert(loop_ == conn->getLoop());

  {
    MutexLockGuard lock(mutex_);
    assert(connection_ == conn);
    connection_.reset();
  }

  // The queued task keeps its own copy of the shared pointer.
  loop_->queueInLoop([conn] { conn->connectDestroyed(); });

  if (!retry_ || !connect_)
  {
    return;
  }

  LOG_INFO << "TcpClient::connect[" << name_ << "] - Reconnecting to "
           << connector_->serverAddress().toIpPort();
  connector_->restart();
}
TcpClient 在构造函数中把 Connector 的“连接建立成功”回调设置为 TcpClient::newConnection。该函数接收 connect 成功后的 TCP 连接 socket, 用该 socket new 一个 TcpConnection, 并让 TcpClient 的 connection_ 成员指向它。
判断连接建立成功
Connector 调用 connect 之后 (返回 0 或 EINPROGRESS 等), 会调用 Connector::connecting 注册该 socket 的可写事件。连接建立完成时 socket 变为可写, 于是触发可写回调 Connector::handleWrite; 确认连接成功后, 该函数会调用 TcpClient 在构造函数中设置的回调 TcpClient::newConnection, 后者用该 socket new 一个 TcpConnection, 并让 TcpClient 的 connection_ 成员指向它。
但可写并不一定代表连接成功: 连接发生错误时, socket 同时可读可写, 所以需要用 getsockopt (SO_ERROR) 检查是否出错, 这也是 Connector::handleWrite 中做的事。