libco源码解析(1) 协程运行与基本结构
libco源码解析(2) 创建协程,co_create
libco源码解析(3) 协程执行,co_resume
libco源码解析(4) 协程切换,coctx_make与coctx_swap
libco源码解析(5) poll
libco源码解析(6) co_eventloop
libco源码解析(7) read,write与条件变量
libco源码解析(8) hook机制探究
libco源码解析(9) closure实现
引言
这篇文章我们来看一看如何创建一个协程的实例,即co_create
函数的具体实现。
正文
我们在上一篇文章中说过libco创建协程的接口为了程序员可以更快的接收,采取了类似于线程的创建方法,我们看看co_create
的函数定义
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
ppco
是协程的主体结构,存储着一个协程所有的信息。attr
其实和线程一样,是我们希望创建的协程的一些属性,不过libco中这个参数简单一点,只是标记了栈的大小和是否使用共享栈。pfn
是我们希望协程执行的函数,当然实际执行的是一个封装后的函数,后面我们会看到。arg
没什么说的,是传入函数的参数。
搞清楚了各个参数的意义,接下来我们就来看看co_create的函数实现吧!
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() ) // 是一个线程私有的变量
{
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
我们可以看到函数逻辑其实非常简单,只有区区七行代码,调用了三个不同的函数而已。
co_get_curr_thread_env
我们先来看看co_get_curr_thread_env
函数的实现;
static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;
...........
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return gCoEnvPerThread;
}
可以看到其实就是返回一个线程私有的变量,不懂__thread关键字的同学可以自行了解一下,这里为什么不使用C++版的thread_local呢?我对这个问题的看法是这样的,传送门
这个函数其实也就是在每个线程的第一个协程被创建的时候去初始化gCoEnvPerThread
。那么如何初始化呢,函数逻辑为co_init_curr_thread_env
co_init_curr_thread_env
void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;
env->iCallStackSize = 0; // 修改"调用栈"顶指针
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1; // 一个线程调用这个函数的肯定是主协程喽
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx ); // 能跑到这里一定是main,所以清空上下文
env->pCallStack[ env->iCallStackSize++ ] = self; // 放入线程独有环境中
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
首先为gCoEnvPerThread分配一份内存,这本没什么说的,但是这也显示出libco其实整体是一个偏C的写法。硬要说它是C++的话。可能就是用了一些STL吧,还有一点,就是为什么不使用RAII去管理内存,而是使用原始的手动管理内存?我的想法是为了提升效率,库本身并没有什么预期之外操作会出现,所以不存在运行到一半throw了(当然整个libco也没有throw),手动管理内存完全是可以的,只不过比较麻烦罢了,不过确实省去了智能指针的开销。
后面调用了co_create_env
创建了一个stCoRoutine_t
类型的结构,我们前面说过stCoRoutine_t其实是一个协程的实体,存储着协程的所有信息,这里创建的了一个协程是为什么呢?仔细一想再结合着后面的IsMain就非常明显了,这个结构是主协程,因为co_init_curr_thread_env在一个线程内只会被调用一次,那么调用这个函数的线程理所当然就是主协程喽。co_create_env
我们后面再说。
创建协程的下面四句其实都是一些内部成员的初始化,第五句其实是有些意思的,把self付给了pCallStack,并自增iCallStackSize,我们前面说过pCallStack其实是一个调用栈的结构,那么这个调用栈的第一个肯定是主协程,第0个元素是self,然后iCallStackSize增为1,等待主协程调用其他协程的时候放入调用栈。
然后就是对于env中epoll封装结构的初始化,我们来看看AllocEpoll
和epoll的封装结构:
struct stCoEpoll_t
{
int iEpollFd; // epollfd
static const int _EPOLL_SIZE = 1024 * 10; // 一次 epoll_wait 最多返回的就绪事件个数
struct stTimeout_t *pTimeout; // 单轮时间轮
struct stTimeoutItemLink_t *pstTimeoutList; // 链表用于临时存放超时事件的item
struct stTimeoutItemLink_t *pstActiveList; // 该链表用于存放epoll_wait得到的就绪事件和定时器超时事件
// 对 epoll_wait() 第二个参数的封装,即一次 epoll_wait 得到的结果集
co_epoll_res *result;
};
stCoEpoll_t *AllocEpoll()
{
stCoEpoll_t *ctx = (stCoEpoll_t*)calloc( 1,sizeof(stCoEpoll_t) );
ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE );
ctx->pTimeout = AllocTimeout( 60 * 1000 );
ctx->pstActiveList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
ctx->pstTimeoutList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
return ctx;
}
这里有一点值得一提,就是时间轮这个结构,我们先来看看它的结构:
/*
* 毫秒级的超时管理器
* 使用时间轮实现
* 但是是有限制的,最长超时时间不可以超过iItemSize毫秒
*/
struct stTimeout_t
{
/*
时间轮
超时事件数组,总长度为iItemSize,每一项代表1毫秒,为一个链表,代表这个时间所超时的事件。
这个数组在使用的过程中,会使用取模的方式,把它当做一个循环数组来使用,虽然并不是用循环链表来实现的
*/
stTimeoutItemLink_t *pItems;
int iItemSize; // 数组长度
unsigned long long ullStart; // 时间轮第一次使用的时间
long long llStartIdx; // 目前正在使用的下标
};
struct stTimeout_t
{
stTimeoutItemLink_t *pItems;
int iItemSize; // 数组长度
unsigned long long ullStart; // 时间轮第一次使用的时间
long long llStartIdx; // 目前正在使用的下标
};
极其疑惑,就一个链表,它什么就叫时间轮了?注释中已经很清楚了,在这里的时候我想到了以前对时间轮的思考,这里到底是单轮时间轮效率高,还是多轮时间轮效率高呢?我想这个问题没有什么意义,因为对于时间轮的选择取决于事件的超时时间。不给出场景讨论效率就是耍流氓。一般来说单轮时间轮复杂度降低的时候超时时间大于时间轮长度的时候需要取余放入,导致每次从时间轮取出的时候都会有一些无效的遍历,libco在超时时间大于时间轮长度的时候就直接拒绝了。而多轮时间轮因为其特性很难出现超时时间大于时间轮长度,所有就没有了无效遍历,但是需要一些拷贝。想要深入了解多轮时间轮的朋友可以继续深入学习,但最好不要拿我那篇博客。。因为当时写的时候不知道写文章的时候在想什么,文字非常的简洁,现在看来根本没有讲清楚问题,且代码没写注释,光顾自己写的嗨了。不过那个封装好的利用多轮时间轮去除不活跃连接的代码倒是可以用,我在我的一个项目上就用了,没有什么问题,就是接口有点不太好用。
co_create_env
/**
* @env 环境变量
* @attr 协程信息
* @pfn 函数指针
* @arg 函数参数
*/
struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
pfn_co_routine_t pfn,void *arg )
{
stCoRoutineAttr_t at;
// 如果指定了attr的话就执行拷贝
if( attr )
{
memcpy( &at,attr,sizeof(at) );
}
// stack_size 有效区间为[0, 1024 * 1024 * 8]
if( at.stack_size <= 0 )
{
at.stack_size = 128 * 1024;
}
else if( at.stack_size > 1024 * 1024 * 8 )
{
at.stack_size = 1024 * 1024 * 8;
}
// 4KB对齐,也就是说如果对stacksize取余不为零的时候对齐为4KB
// 例如本来5KB,经过了这里就变为8KB了
if( at.stack_size & 0xFFF )
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
// 为协程分配空间
stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
memset( lp,0,(long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t* stack_mem = NULL;
if( at.share_stack ) // 共享栈模式 栈需要自己指定
{ // 共享栈相关,下一篇文章会说
stack_mem = co_get_stackmem( at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else // 每个协程有一个私有的栈
{
stack_mem = co_alloc_stackmem(at.stack_size);
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer; // 这个协程栈的基址
lp->ctx.ss_size = at.stack_size;// 未使用大小,与前者相加为esp指针,见coctx_make解释
lp->cStart = 0;
lp->cEnd = 0;
lp->cIsMain = 0;
lp->cEnableSysHook = 0;
lp->cIsShareStack = at.share_stack != NULL;
lp->save_size = 0;
lp->save_buffer = NULL;
return lp;
}
这里有一点需要说,就是ss_size
其实是未使用的大小,为什么要记录未使用大小呢?我们思考一个问题,这个栈其实是要把基址付给寄存器的,而系统栈中指针由高地址向低地址移动,而我们分配的堆内存实际上低地址是起始地址,这里是把从线程分配的堆内存当做协程的栈,所以esp其实是指向这片堆地址的最末尾的,所以记录未使用大小,使得基址加上未使用大小就是esp。简单用简笔画描述一下:
|------------|
| esp |
|------------|
| ss_size |
|------------|
|stack_buffer|
|------------|
到了这里,一个还没有运行的协程实体就被创建好啦!