muduo网络库源码解析(1):多线程异步日志库(上)
muduo网络库源码解析(2):多线程异步日志库(中)
muduo网络库源码解析(3):多线程异步日志库(下)
muduo网络库源码解析(4):TimerQueue定时机制
muduo网络库源码解析(5):EventLoop,Channel与事件分发机制
muduo网络库源码解析(6):TcpServer与TcpConnection(上)
muduo网络库源码解析(7):TcpServer与TcpConnection(下)
muduo网络库源码解析(8):EventLoopThreadPool与EventLoopThread
muduo网络库源码解析(9):Connector与TcpClient
引言
上篇文章中分析了muduo日志库的基本流程与同步日志,这篇文章重点介绍异步日志部分,毕竟这才是在多线程编程中主流的方法.
这一篇中只分析一个类,即AsyncLogging,老规矩,首先看一看类的定义
class AsyncLogging : noncopyable
{
public:
AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval = 3);
~AsyncLogging();
void append(const char* logline, int len);
void start();
void stop();
private:
void threadFunc();
typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
typedef BufferVector::value_type BufferPtr;
const int flushInterval_;
std::atomic<bool> running_;
const string basename_;
const off_t rollSize_;
muduo::Thread thread_;
muduo::CountDownLatch latch_;
muduo::MutexLock mutex_;
muduo::Condition cond_ GUARDED_BY(mutex_);
BufferPtr currentBuffer_ GUARDED_BY(mutex_);
BufferPtr nextBuffer_ GUARDED_BY(mutex_);
BufferVector buffers_ GUARDED_BY(mutex_);
};
我们一个一个来说 首先是数据成员
AsyncLogging::AsyncLogging(const string& basename,
off_t rollSize,
int flushInterval)
: flushInterval_(flushInterval), //刷新间隔 默认为三秒 注意一条消息的写入大概是1.2µs左右
running_(false), //用于线程终止
basename_(basename),//这两个解析见第一篇中logfile的分析
rollSize_(rollSize),
thread_(std::bind(&AsyncLogging::threadFunc, this), "Logging"), //线程实体
latch_(1), //这个我们一会重点说下
mutex_(),
cond_(mutex_),//用于写入这个条件的通知,多个生产者一个消费者
currentBuffer_(new Buffer),
nextBuffer_(new Buffer), //double buffers
buffers_() //buffer集合 用于写入磁盘
{
currentBuffer_->bzero();
nextBuffer_->bzero(); //memset
buffers_.reserve(16); //提前分配空间
}
void AsyncLogging::append(const char* logline, int len)
{
muduo::MutexLockGuard lock(mutex_);
if (currentBuffer_->avail() > len) //当前buffer可写长度大于写入长度
{
currentBuffer_->append(logline, len);
}
else
{
buffers_.push_back(std::move(currentBuffer_)); //将当前写入的buffer写入buffers
if (nextBuffer_) //nextbuffer存在的话补充
{
currentBuffer_ = std::move(nextBuffer_);
}
else
{
currentBuffer_.reset(new Buffer); // Rarely happens //不存在的话重新申请
}
currentBuffer_->append(logline, len); //经过上面的判断currentBuffer_保证存在
cond_.notify();//条件变量通知
}
}
本身这个函数没什么说的,但值得一提的是锁的粒度(锁保护数据量的多少),我们绝对有改变的余地,下面会讲到.我们先来看看核心函数threadFunc,这是异步线程执行的函数,
void AsyncLogging::threadFunc()
{
assert(running_ == true);
latch_.countDown();
LogFile output(basename_, rollSize_, false); //也就是说 我们只需要指定output和flush即可使用 即可通过一系列宏函数来使用了
BufferPtr newBuffer1(new Buffer);
BufferPtr newBuffer2(new Buffer);
newBuffer1->bzero();
newBuffer2->bzero();
BufferVector buffersToWrite;
buffersToWrite.reserve(16);
while (running_)
{
assert(newBuffer1 && newBuffer1->length() == 0);
assert(newBuffer2 && newBuffer2->length() == 0);
assert(buffersToWrite.empty());
{
muduo::MutexLockGuard lock(mutex_); //为了buffers不造成race condition
if (buffers_.empty()) // unusual usage!
{
cond_.waitForSeconds(flushInterval_);// 这里蕴含了两个条件 即超过刷新时间和前方写入一个buffer
}
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_ = std::move(newBuffer1);
buffersToWrite.swap(buffers_); //指针交换 常数级 也可使用一个vector指针,交换更快
if (!nextBuffer_)
{
nextBuffer_ = std::move(newBuffer2);
}
}
assert(!buffersToWrite.empty()); //显然如果执行到这里无论如何buffersToWrite中也不会为空
if (buffersToWrite.size() > 25) //日志堆积 即生产者生产速度大于消费者消费速度 进行丢弃日志
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\n",
Timestamp::now().toFormattedString().c_str(),
buffersToWrite.size()-2);
fputs(buf, stderr);
output.append(buf, static_cast<int>(strlen(buf)));
buffersToWrite.erase(buffersToWrite.begin()+2, buffersToWrite.end());
//但是仍然保留前两个 避免浪费 因为后面还是要用
}
for (const auto& buffer : buffersToWrite)
{
// FIXME: use unbuffered stdio FILE ? or use ::writev ?
//在第一篇对LogFile的分析中有提到这个问题
output.append(buffer->data(), buffer->length());
}
//有意思的是为什么不判断小于2呢 想想看 小于而下面的判断中必然有一个是失败的 也必然会触发assert
if (buffersToWrite.size() > 2)
{
// drop non-bzero-ed buffers, avoid trashing
buffersToWrite.resize(2);//resize会默认初始化
}
if (!newBuffer1)
{
assert(!buffersToWrite.empty());
newBuffer1 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer1->reset();
}
if (!newBuffer2)
{
assert(!buffersToWrite.empty());
newBuffer2 = std::move(buffersToWrite.back());
buffersToWrite.pop_back();
newBuffer2->reset();
}
buffersToWrite.clear();
output.flush(); //刷新logfile中的输出 即存储回磁盘
}
output.flush(); //线程终止后转储磁盘
}
思考 CountDownLatch是否必要?
核心部分muduo书中已经讲的通透,我想说的只是一点,即CountDownLatch的使用.可能有的朋友不明白为什么我们只有一个异步线程的情况下要使用,我们都知道CountDownLatch可以维护一个happens-before的关系,即执行wait线程之后的语句一定发生在执行countDown线程的语句之后,这代表了什么呢,即一个指定同步的顺序,这里使用的CountDownLatch的作用就是使得在start结束之前进入threadFun函数.但依我拙见,可能并不需要CountDownLatch,两个线程之间通信的唯一桥梁是条件变量与锁,唯一的竞态条件就是如果不用CountDownLatch的话,可能使得在生产者在notify以后Threadfun异步线程没有收到这个信号,但是数据此时仍旧存在生产者线程中,异步线程执行时总会收到消息而把前面的消息写入,就算最极端的情况,生产者只写入了一条消息,仅notify了一次,而这个消息还被异步线程丢失了,由于异步线程中使用的是wait_for,无论如何也会在一个flushInterval_之后被写入,就算时间不足flushInterval_就程序退出,析构函数也会保护最后一次写入,所以也没有关系.综上,我认为在这种单异步线程的代码中,CountDownLatch是不必要的.希望有不同见解的朋友能提出宝贵的意见!
其实关于buffer的情况也很有意思,但我自认为写的一定没有书上详尽,所以那部分还是参考书上的解释为好.
再抛出一个问题,即锁的粒度问题,异步线程处理看来其实也就是一个生产者消费者的模型,那么如何修改的更加高性能呢,即使用更细粒度的锁,线程安全的hashmap.见第三篇.