libco源码解析(1) 协程运行与基本结构
libco源码解析(2) 创建协程,co_create
libco源码解析(3) 协程执行,co_resume
libco源码解析(4) 协程切换,coctx_make与coctx_swap
libco源码解析(5) poll
libco源码解析(6) co_eventloop
libco源码解析(7) read,write与条件变量
libco源码解析(8) hook机制探究
libco源码解析(9) closure实现
引言
我们总能在运行libco协程代码的最后看到对于函数co_eventloop
的调用,它可以理解为主协程执行的函数。我们举一个简单的例子来说明它的作用:
void* routinefun(void* args){
co_enable_hook_sys();
while(true){
poll(NULL, 0, 1000);
}
return 0;
}
int main(int argc,char *argv[])
{
vector<task_t> v;
for(int i=1;i<argc;i+=2)
{
task_t task = { 0 };
SetAddr( argv[i],atoi(argv[i+1]),task.addr );
v.push_back( task );
}
for(int i=0;i<2;i++)
{
stCoRoutine_t *co = 0;
co_create( &co,NULL,routinefun,v2 );
printf("routine i %d\n",i);
co_resume( co );
}
co_eventloop( co_get_epoll_ct(),0,0 );
return 0;
}
这段代码非常简单,主协程运行两个协程,协程函数所做的事情就是使用poll切换执行权,并在一秒后切换回来(超时)。这里线程的执行过程是这样的,我们把主协程看做A,其他两个协程看做BC。执行过程为:
- B协程执行,使用poll把一个
stPoll_t
结构插入时间轮,切换执行权,回到A协程。 - C协程执行,使用poll把一个
stPoll_t
结构插入时间轮,切换执行权,回到A协程。 - 此时A协程执行Eventloop中,不停的循环,直到B协程注册的事件超时,调用回调回到B协程。
- B协程继续执行,再次使用poll,重复第一步,回到A协程。
- A协程继续执行Eventloop,不停的循环,直到C协程注册的事件超时,调用回调回到C协程。
- C协程继续执行,再次使用poll,重复第二步,回到A协程。
- …
这样我们就可以看清楚co_eventloop
到底做了什么,其实就是不停的轮询等待其他协程注册的事件成立,仅此而已。
co_eventloop
/*
* libco的核心调度
* 在此处调度三种事件:
* 1. 被hook的io事件,该io事件是通过co_poll_inner注册进来的
* 2. 超时事件
* 3. 用户主动使用poll的事件
* 所以,如果用户用到了三种事件,必须得配合使用co_eventloop
*
* @param ctx epoll管理器
* @param pfn 每轮事件循环的最后会调用该函数
* @param arg pfn的参数
*/
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result ) // 给结果集分配空间
{ // epoll结果集大小
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;)
{
// 最大超时时间设置为 1 ms
// 所以最长1ms,epoll_wait就会被唤醒
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
// 不使用局部变量的原因是epoll循环并不是元素的唯一来源.例如条件变量相关(co_routine.cpp stCoCondItem_t)
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
// 获取在co_poll_inner放入epoll_event中的stTimeoutItem_t结构体
for(int i=0;i<ret;i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare ) // 如果用户设置预处理回调的话就执行
{
// 若是hook后的poll的话,会把此事件加入到active队列中,并更新一些状态
item->pfnPrepare( item,result->events[i],active );
}
else
{
AddTail( active,item );
}
}
// 从时间轮上取出超时事件
unsigned long long now = GetTickMS();
// 以当前时间为超时截止点
// 从时间轮中取出超时的时间放入到timeout中
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
while( lp ) // 遍历超时链表,设置超时标志,并加入active链表
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true;
lp = lp->pNext;
}
// 把timeout合并到active中
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
// 开始遍历active链表
while( lp )
{
// 在链表不为空的时候删除active的第一个元素 如果删除成功,那个元素就是lp
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime)
{ // 一种排错机制,在超时和所等待的时间内已经完成只有一个条件满足才是正确的
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret) //插入成功
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
// TODO 有问题,如果同一个协程有两个事件在一次epoll循环中触发,
// 那么第一个事件切回去执行协程,第二个呢,已提交issue
if( lp->pfnProcess )
{ // 默认为OnPollProcessEvent 会切换协程
lp->pfnProcess( lp );
}
lp = active->head;
}
// 每次事件循环结束以后执行该函数, 用于终止协程
if( pfn )
{
if( -1 == pfn( arg ) )
{
break;
}
}
}
}
首先我们可以看到active
和timeout
链表都在stCoEpoll_t中存储,而这个结构是线程私有的。那么为什么不把这个值设置成局部变量呢?答案不在co_eventloop中,而藏在其他函数,比如libco实现的条件变量中,条件变量会在signal后把值放入到active链表或者timeout链表,而这些只能放在stCoEpoll_t中。
还有这里的timeout链表其实最终会合并到active中,先分开纯粹是为了处理方便一点。
然后就是把事件从epoll结果集中拿出来,去执行预处理回调。我们来看看预处理回调,我们曾在poll中提到过:
void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active )
{
stPollItem_t *lp = (stPollItem_t *)ap;
// 把epoll此次触发的事件转换成poll中的事件
lp->pSelf->revents = EpollEvent2Poll( e.events );
stPoll_t *pPoll = lp->pPoll;
// 已经触发的事件数加一
pPoll->iRaiseCnt++;
// 若此事件还未被触发过
if( !pPoll->iAllEventDetach )
{
// 设置已经被触发的标志
pPoll->iAllEventDetach = 1;
// 将该事件从时间轮中移除
// 因为事件已经触发了,肯定不能再超时了
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );
// 将该事件添加到active列表中
AddTail( active,pPoll );
}
}
我们可以看到其实所做的事情就是把epoll事件对应的stPoll_t
结构中的值执行一些修改,并把此项插入到active链表中。
然后就是从时间轮中取出根据目前时间来说已经超时的事件,并插入到timeout链表中:
inline void TakeAllTimeout( stTimeout_t *apTimeout,unsigned long long allNow,stTimeoutItemLink_t *apResult )
{
// 第一次调用是设置初始时间
if( apTimeout->ullStart == 0 )
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0;
}
// 当前时间小于初始时间显然是有问题的
if( allNow < apTimeout->ullStart )
{
return ;
}
// 求一个取出事件的有效区间
int cnt = allNow - apTimeout->ullStart + 1;
if( cnt > apTimeout->iItemSize )
{
cnt = apTimeout->iItemSize;
}
if( cnt < 0 )
{
return;
}
for( int i = 0;i<cnt;i++)
{ // 把上面求的有效区间过一遍,某一项存在数据的话插入到超时链表中
int idx = ( apTimeout->llStartIdx + i) % apTimeout->iItemSize;
// 链表操作,没什么可说的
Join<stTimeoutItem_t,stTimeoutItemLink_t>( apResult,apTimeout->pItems + idx );
}
// 更新时间轮属性
apTimeout->ullStart = allNow;
apTimeout->llStartIdx += cnt - 1;
}
然后就是把超时链表处理以后加入到active链表啦,不得不说这些封装的链表操作还是非常实用的。
然后就是遍历active链表,一一执行每一个事件的回调啦,当然没执行一次回调就意味着一次协程的切换,因为我们在poll中注册的回调执行co_resume。
这里其实我个人认为是有一些问题的。如果我们在poll中注册了两个fd的事件,这两个时间在一次epoll_wait中被触发,那么第一个被执行了,第二个呢?如果再执行的话就会core dump,因为这个上下文已经被用过了。这里我们应该做一个哈希表,给每一个协程一个特定编号,在遍历active时如果某一个协程已经被使用,我们在后面的遍历过程不再调用回调,这样就可以避免这个问题,已经提交issume。
我们注意到循环的最后调用了pfn,这是一个我们在调用co_eventloop时传入的函数指针,它的作用是什么呢?我的想法是跳出Eventloop循环,因为不是所有的协程使用都想例子一样把 co_eventloop放在函数最后,协程更多的是嵌到代码中,我们需要在有些时候终止eventloop,传入一个终止回调就是一个不错的方法。