muduo网络库源码解析(1):多线程异步日志库(上)
muduo网络库源码解析(2):多线程异步日志库(中)
muduo网络库源码解析(3):多线程异步日志库(下)
muduo网络库源码解析(4):TimerQueue定时机制
muduo网络库源码解析(5):EventLoop,Channel与事件分发机制
muduo网络库源码解析(6):TcpServer与TcpConnection(上)
muduo网络库源码解析(7):TcpServer与TcpConnection(下)
muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread
muduo网络库源码解析(9):Connector与TcpClient
引言
这个题目有一点点不符合文章内容.因为TcpServer其实已经在上一篇文章中说完了,同时上一篇文章中解析了TcpConnection的连接与断开,这一篇解析收发消息.至此TcpConnection也就解析完了.我们直入主题吧.先从收消息开始.
收消息的重点当然是handleread
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); //从套接字向inputbuffer中读数据
if (n > 0) //证明有数据 并不是断开连接
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); //执行数据到来时的回调
}
else if (n == 0)
{//断开连接 从map中删除
handleClose();//TcpConnection::handleClose
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
这里面基本我们在上一篇中都已经说过,剩下一个readfd没有说,但这却是核心函数.
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
// saved an ioctl()/FIONREAD call to tell how much to read
char extrabuf[65536]; //额外的内存
struct iovec vec[2]; //辅助readv
const size_t writable = writableBytes(); //line_30
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
// when there is enough space in this buffer, don't read into extrabuf.
// when extrabuf is used, we read 128k-1 bytes at most.
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1; //在我看来这是为了处理最下面注释掉的那个功能
const ssize_t n = sockets::readv(fd, vec, iovcnt);
if (n < 0)
{
*savedErrno = errno;
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n; //写入大小小于可写大小 即栈上空间没有被写入数据.
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable); //如果栈上空间有数据的话就写入缓冲区中
}
// if (n == writable + sizeof extrabuf) 这种情况是为了防止缓冲区和栈上空间都写满的情况
// {
// goto line_30;
// }
return n;
}
这个函数很有意思,首先我们可以看到在栈上有一个64KB的空间,同时使用scatter式的read,即readv进行读取,这样的好处是减少系统调用,为什么要这样写呢,原因是muduo中的buffer并不是固定大小的,而是vector< char>类型的,原因是为了防止多个连接分别维护两个buffer(inputbuffer,outputbuffer)****,buffer中有大量没有被用到的内存,所以选择了vector< char>,初始大小为1024+8,后面可随流量大小自动扩容,这是一个非常亮眼的点.我们可以看到最后还有一个被注释掉的功能,就是处理栈上的buffer也满了,逻辑也是比较好理解的.再看看iovcnt的作用,我猜测使用的原因是当出现buffer满了以后会触发(writable < sizeof extrabuf)条件,iovcnt会被设置为1,这个时候重写一个函数应该是个更好的选择,但作者并没有写,不确定为什么.不过有一说一,64KB已经很大了,一般情况下并不会超过这个值,正如muduo书上P315页下面注释所言,一次收到的数据确实在一定意义下可视为带宽延迟积,带宽延迟积(BandWidth-Dealy Product BDP) 即理想吞吐量
理想吞吐量(bit) =路径带宽(bit/秒为单位) *往返时间( RTT ) (以秒为单位),这也就是TCP的理想窗口大小(TCP拥塞控制)
我们再来看看发送数据,
void TcpConnection::send(const StringPiece& message)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(message);
}
else
{
void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
loop_->runInLoop(
std::bind(fp,
this, // FIXME
message.as_string()));
//std::forward<string>(message)));
}
}
}
void TcpConnection::sendInLoop(const StringPiece& message)
{
sendInLoop(message.data(), message.size());
}
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0; //已写入大小
size_t remaining = len; //剩余大小
bool faultError = false; //是否出现错误
if (state_ == kDisconnected) //正常的状态记录 显然如果已经当前连接是kDisconnected 不应该继续写
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0) //写入正常
{
remaining = len - nwrote;
//如果已写完且writeCompleteCallback_回调被注册就进行调用
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));//延长对象生命周期
}
}
else // nwrote < 0 //写入失败
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{//服务端收到RST报文后 write出现EPIPE recv出现ECONNRESET
faultError = true; //出现错误
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0) //出现窗口大小不够而写入失败的原因 且剩余未写入大小大于零
{
size_t oldLen = outputBuffer_.readableBytes(); //已写大小
if (oldLen + remaining >= highWaterMark_ //如果数据再写的话会超过高水位线,防止本地数据太多
&& oldLen < highWaterMark_
&& highWaterMarkCallback_) //高水位回调存在的话
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting()) //很大可能开始为可读事件,写入失败后要注册可写事件
{
channel_->enableWriting();
}
}
}
这里我们可以看到对于一种特殊情况的处理,即因为TCP窗口不够导致write写入失败的情况,这是TCP流量控制的一种手法,处理的方法也很简单,即注册可读事件就好,我们可以在最后看到判断当前事件是否为可写事件,不是的话设置为可写事件,为什么不在开始时都设置为可写呢?原因是muduo默认为LT,开始注册可写会导致只要缓冲区还有空余就会触发可写事件,从效率上来说是不可取的.还有两个有意思的回调,即writeCompleteCallback_,highWaterMarkCallback_,即写完成回调与高水位线回调,为什么要设置这两个回调呢,第一个可以在写入操作完成是打上日志,在出现问题时日志方便定位.至于第二个,即高水位线回调,我们可以在构造函数中看到highWaterMark_的初始值为64x1024x1024,即64MB,如果写入失败且要写64MB的数据的话,也许有理由怀疑是这个连接导致窗口减小的问题,就会执行提前设置highWaterMarkCallback_,但这两个回调实际均未初始化.
总结
这一篇东西并不多,但却是拳拳到肉,十分的重要,在网络编程中数据的收发一定是一个及其重要的点,值得我们拿出足够的精力去处理.C++程序员是可怜的,到了2020年标准库中才加入了网络库,还是转正的Asio,但是聊胜于无,其实这些如果只专注与业务代码的处理的话不必考虑这些,只需要写一个回调就可以了.但是我认为这是重要的,学这些其实是培养一种分析的能力,并可以把网络相关的知识联系进来,把系统编程的知识运用进来,这当然是极好的.最重要的是网络库的结构决定了其适用的时机,一招鲜是无法吃遍天的,这也更给了我们去学习这些的理由.
参考:
https://www.cnblogs.com/kzd666/p/9437764.html scatter/gather接口
https://www.jianshu.com/p/932e3dd7dea3 带宽延迟积
EPIPE和ECONNRESET