前言
你以为我鸽了其实我没有鸽,这也算是一种鸽。
继续来填坑啦。
在上两篇中,我们都是使用的链表进行保存定时事件,当我们需要增加一个或者删除一个事件时都需要O(n)的时间复杂度,本篇我们通过时间轮(time wheel)这种数据结构来对其进行优化,而libco也是通过时间轮来进行处理的,所以就拿着它的代码来讲啦。
正文
Libco的作为一个协程库,相当于在用户态完成了逻辑流的切换,这里的调度便是一旦遇到阻塞的系统调用(如read)时,将其注册到epoll_wait中并切换逻辑流,等待其I/O事件的到达,一旦到达则进行处理,将同步阻塞I/O换成了I/O多路复用。
而这里便是将I/O事件当作定时事件来处理,将I/O事件设置超时事件,如果超时则直接处理,避免一直等待的情况。
libco管理定时事件便是使用时间轮这种数据结构,通过一种hash的思想使得添加定时事件的时间复杂度降到O(1),大大提高了效率。
我们先来看看时间轮是怎样的东西。
时间轮是个啥
在之前我们通过链表,按照超时时间进行升序或者降序的排列,这样添加事件就需要O(N)的时间复杂度。
而时间轮则将多条链表组合起来,每条链表上的事件都是同样的超时时间,而两条链表超时时间的差值t就是处理超时事件的时间间隔。时间轮内部有一个指针指向当前的链表,t时间过去,t指向下一个链表,判断是否超时。
而当我们想要添加一个定时事件,只需要知道它的超时时间,再除以t,就是它应该插入的位置。
如图,当前指向1号链表,t为50ms,当需要添加一个定时为100ms的定时事件时,直接添加到3号链表即可(O(1))。
(图转自https://www.ibm.com/developerworks/cn/linux/l-cn-timers/index.html)
libco的主循环分析
让我们看看这里的主循环,为了思路清晰,删除部分无关代码
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
co_epoll_res *result = ctx->result;
for(;;)
{
/*在之前的博客中,为了达到定时查看的效果,我们使用epoll_wait的超时参数或者定时信号,而这里则是让epoll_wait以一个非常短的间隙(1ms)返回*/
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare )//如果有预处理函数则调用预处理函数
{
item->pfnPrepare( item,result->events[i],active );
}
else
{
AddTail( active,item );//否则添加到active链,准备下一步处理
}
}
unsigned long long now = GetTickMS();//获取现在的时间,这里我们下文会叙述。
TakeAllTimeout( ctx->pTimeout,now,timeout );//将时间轮上的超时事件取出,并且让时间轮向前滚动
stTimeoutItem_t *lp = timeout->head;
while( lp )
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
while( lp )
{
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if( lp->pfnProcess )
{
lp->pfnProcess( lp );//处理active链上的事件
}
lp = active->head;
}
}
}
可以看到这个eventlopp和我之前几篇博客的思路差不多,都是:
epoll_wait监听–>等待事件–>处理I/O事件–>得到现在时间,判断是否超时–>处理超时事件。
获取现在的时间
这里比较有趣的是GetTickMS,这个用于获取现在时间的函数,
static unsigned long long GetTickMS()
{
#if defined( __LIBCO_RDTSCP__)
static uint32_t khz = getCpuKhz();//法1
return counter() / khz;
#else
struct timeval now = { 0 };
gettimeofday( &now,NULL );//法2 使用gettimeofday
unsigned long long u = now.tv_sec;
u *= 1000;
u += now.tv_usec / 1000;
return u;
#endif
}
gettimeofday自然不用多说,它的好处是跨平台,不用切换到内核态。
而上面的法1使用的函数如下
#if defined( __LIBCO_RDTSCP__)
static unsigned long long counter(void)
{
register uint32_t lo, hi;
register unsigned long long o;
//__asm__ 内嵌汇编代码 __volatile__阻止编译器优化
__asm__ __volatile__ (
"rdtscp" : "=a"(lo), "=d"(hi)
);//eax寄存器的值赋给lo,edx赋给hi
o = hi;//o为64位,将hi先放在低32位
o <<= 32;//移到高位
return (o | lo);//将lo放在低32位return
}
static unsigned long long getCpuKhz()
{
FILE *fp = fopen("/proc/cpuinfo","r");
if(!fp) return 1;
char buf[4096] = {0};
fread(buf,1,sizeof(buf),fp);
fclose(fp);
char *lp = strstr(buf,"cpu MHz");
if(!lp) return 1;
lp += strlen("cpu MHz");
while(*lp == ' ' || *lp == '\t' || *lp == ':')
{
++lp;
}
double mhz = atof(lp);
unsigned long long u = (unsigned long long)(mhz * 1000);
return u;
}
#endif
如果你看不懂getCpuKhz这个函数,可以打开/proc/cpuinfo看一眼,就可以知道这里记载的是cpu的动态信息。
而counter函数则主要是调用rdtscp这条汇编指令,将计数(来一个时钟脉冲+1)读出来。
我的理解是counter()将总共的时钟脉冲数读出再除以cpu的频率(每秒时钟脉冲)就是时间,但是cpu的频率不是个恒定值啊,对于现代cpu来说。。。所以不是很明白这里为什么要这样计时,希望能有人给出解答
参考资料
C++开源协程库libco-原理与应用 — 滴滴平台技术部·王亮
__asm__ __volatile__
的含义 —– stackoverflow
Linux 下定时器的实现方式分析(时间轮部分) — 赵军
Understanding Processor Frequency part of cat /proc/cpuinfo
再论 Time stamp counter —– 一念天堂的博客
如何精确测量一段代码的执行时间 —– 浅墨的部落格