文章目录
Linux下内存布局
协程切换的原理
协程栈
共享栈下文介绍,此处先介绍非共享栈。
在非共享栈模式下,每个非主协程有自己的栈,而该栈 是在堆上分配的,并不是系统栈
,但主协程的栈仍然是系统栈
。
struct stCoRoutine_t *co_create_env(stCoRoutineEnv_t *env,
const stCoRoutineAttr_t *attr,//设置一些参数,比如:是否是共享栈之类的
pfn_co_routine_t pfn,
void *arg)
{
stCoRoutineAttr_t at;
if (attr)
{
memcpy(&at, attr, sizeof(at));
}
if (at.stack_size <= 0)
{
at.stack_size = 128 * 1024;
}
else if (at.stack_size > 1024 * 1024 * 8)
{
at.stack_size = 1024 * 1024 * 8;
}
if (at.stack_size & 0xFFF)
{
at.stack_size &= ~0xFFF;
at.stack_size += 0x1000;
}
stCoRoutine_t *lp = (stCoRoutine_t *)malloc(sizeof(stCoRoutine_t));
memset(lp, 0, (long)(sizeof(stCoRoutine_t)));
lp->env = env;
lp->pfn = pfn;
lp->arg = arg;
stStackMem_t *stack_mem = NULL;
if (at.share_stack)
{
stack_mem = co_get_stackmem(at.share_stack);
at.stack_size = at.share_stack->stack_size;
}
else
{
stack_mem = co_alloc_stackmem(at.stack_size); //申请128KB
}
lp->stack_mem = stack_mem;
lp->ctx.ss_sp = stack_mem->stack_buffer;
lp->ctx.ss_size = at.stack_size;
}
libco 在 stCoRoutineEnv_t 定义了 pCallStack 数组,大小为128,数组里的每个元素均为协程。pCallStack用于获取当前协程pCallStack[iCallStackSize - 1]
;当前协程挂起后应该切到的协程pCallStack[iCallStackSize - 2]
。
pCallStack存的是递归调用(暂且称之为递归,并不是递 归)的协程
,pCallStack[0]一定是主协程
。例如主协程调用协程1,协程1调用协程2…协程k-1 调用协程k,这种递归关系的k最大为127,调到协程127时,此时pCallStack[0]存主协程, pCallStack[1]存协程1...pCallStack[k]存协程k..pCallStack[127]存协程127
。
但递归如此之深的协程实际中不会遇到,更多的场景应该是主协程调用协程1,协程1挂起切回主协程,主协程再 调用协程2,协程2挂起切回主协程,主协程再调用协程3…因此主协程调到协程k时, pCallStack[0]是主协程,pCallStack[1]是协程k,其他元素为空;协程k挂起切回主协程时, pCallStack[0]是主协程,其他元素为空。因此128大小的pCallStack足够上万甚至更多协程使用。
共享栈
示例代码可参看 example_copystack.cpp。共享栈对主协程没有影响,共享栈仍然是在 堆上,而主协程的栈在系统栈上。
采用共享栈时,每个协程的栈从共享栈拷出时,需要分配空间存储,但按需分配空间。因为绝 大部分协程的栈空间都远低于128K,因此拷出时只需分配很小的空间,相比私有栈能节省大 量内存。共享栈可以只开一个,但为了避免频繁换入换出,一般开多个共享栈。每个共享栈可 以申请大空间,降低栈溢出的风险。
用时间换空间
:假设开10个共享栈,每个协程模10映射到对应的共享栈。假设协程调用顺序为主协程、协程 2、协程3、协程12。协程2切到协程3时,因为协程2、3使用的共享栈分别是第2、3个共享 栈,没有冲突,所以协程2的栈内容仍然保留在第2个共享栈,并不拷出来,但协程2的寄存器 会被coctx_swap保存在regs数组。调用到协程12时,协程12和协程2都映射到第2个共享栈, 因此需要将协程2的栈内容拷出,并将协程12的栈内容拷贝到第2个共享栈中。所以共享栈多了 拷出协程2的栈、拷进协程12的栈两个操作,即用拷贝共享栈的时间换取每个协程栈的空间。
协程的优势
这里对协程的优势做一个总结:
- 1.
不用陷入内核
,OS对协程一无所知。 - 2.
占用资源相比线程,进程很少
,只有少量的自己的寄存器上下文和栈 - 3.完全由应用程序自己进行调度,
调度执行流程可控
。 - 4.
切换代价很小
。 - 5.
在接近异步效率的同时,可以使用同步的写法
(仅仅是 同步的写法,不是同步调用)。例如read函数的调用代码后,紧接着可以写处理数据的逻辑, 不用再定义回调函数。调用read后协程挂起,其他协程被调用,数据就绪后在read后面处理数 据。
对于第五条这里加个餐:
同步调用,异步调用和协程调用
同步异步见:阻塞-非阻塞-异步-同步-的理解
以read
来举例:
-
read
的两个阶段:- 1.等待数据;
- 2.将数据从kernel拷贝到用户线程
-
同步调用如何做:如果是同步阻塞,那么就两个阶段都阻塞。
-
异步调用如何做:
两个阶段都不阻塞。
效率最高。但是异步的调用逻辑和回调逻辑需要分开
,在异步调用多时,代码结构不清晰
-
协程又是如何处理的:只阻塞第二个阶段。但因为第二阶段读数据耗时很少,因此效率略低于异步调用。者也就是它最大的优点:
在接近异步效率的同时,可以使用同步的写法
。
协程的Hook层
钩子函数read
ssize_t read( int fd, void *buf, size_t nbyte )
{
HOOK_SYS_FUNC( read );
if( !co_is_enable_sys_hook() )
{
return g_sys_read_func( fd,buf,nbyte );
}
rpchook_t *lp = get_by_fd( fd );
if( !lp || ( O_NONBLOCK & lp->user_flag ) )
{
ssize_t ret = g_sys_read_func( fd,buf,nbyte );
return ret;
}
int timeout = ( lp->read_timeout.tv_sec * 1000 )
+ ( lp->read_timeout.tv_usec / 1000 );
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLIN | POLLERR | POLLHUP );
int pollret = poll( &pf,1,timeout );
ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte );
if( readret < 0 )
{
co_log_err(
"CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d",
fd,readret,errno,pollret,timeout
);
}
return readret;
}
这时read
就分为了三种情况:
- 1.未开启hook,直接调用系统 read;
- 2.未开启hook,但用户指定了O_NONBLOCK,也直接调用系统read,此时是非阻塞的
- 3.用户开启了hook,libco 悄悄的设置O_NONBLOCK,user_flag 表示用户有没有设置O_NONBLOCK
然后调用int pollret = poll( &pf,1,timeout );
将当前协程挂起,直到有数据可读或者超时,协程才会重新调度,自然,如果没有开启Hook,那么就不存在协程的切换。
钩子函数write
ssize_t write( int fd, const void *buf, size_t nbyte )
{
HOOK_SYS_FUNC( write );
if( !co_is_enable_sys_hook() )
{
return g_sys_write_func( fd,buf,nbyte );
}
rpchook_t *lp = get_by_fd( fd );
if( !lp || ( O_NONBLOCK & lp->user_flag ) )
{
ssize_t ret = g_sys_write_func( fd,buf,nbyte );
return ret;
}
size_t wrotelen = 0;
int timeout = ( lp->write_timeout.tv_sec * 1000 )
+ ( lp->write_timeout.tv_usec / 1000 );
ssize_t writeret = g_sys_write_func( fd,(const char*)buf + wrotelen,nbyte - wrotelen );
if (writeret == 0)
{
return writeret;
}
if( writeret > 0 )
{
wrotelen += writeret;
}
while( wrotelen < nbyte )
{
struct pollfd pf = { 0 };
pf.fd = fd;
pf.events = ( POLLOUT | POLLERR | POLLHUP );
poll( &pf,1,timeout );
writeret = g_sys_write_func( fd,(const char*)buf + wrotelen,nbyte - wrotelen );
if( writeret <= 0 )
{
break;
}
wrotelen += writeret ;
}
if (writeret <= 0 && wrotelen == 0)
{
return writeret;
}
return wrotelen;
}
三种情况同 read 。
poll将协程挂起,等待发送缓冲区有空余空间唤醒协程或者超时唤醒
钩子函数connect
int connect(int fd, const struct sockaddr *address, socklen_t address_len)
{
HOOK_SYS_FUNC( connect );
if( !co_is_enable_sys_hook() )
{
return g_sys_connect_func(fd,address,address_len);
}
//1.sys call
int ret = g_sys_connect_func( fd,address,address_len );
rpchook_t *lp = get_by_fd( fd );
if( !lp ) return ret;
if( sizeof(lp->dest) >= address_len )
{
memcpy( &(lp->dest),address,(int)address_len );
}
if( O_NONBLOCK & lp->user_flag )
{
return ret;
}
if (!(ret < 0 && errno == EINPROGRESS))
{
return ret;
}
//2.wait
int pollret = 0;
struct pollfd pf = { 0 };
for(int i=0;i<3;i++) //25s * 3 = 75s
{
memset( &pf,0,sizeof(pf) );
pf.fd = fd;
pf.events = ( POLLOUT | POLLERR | POLLHUP );
pollret = poll( &pf,1,25000 );
if( 1 == pollret )
{
break;
}
}
if( pf.revents & POLLOUT ) //connect succ
{
errno = 0;
return 0;
}
//3.set errno
int err = 0;
socklen_t errlen = sizeof(err);
getsockopt( fd,SOL_SOCKET,SO_ERROR,&err,&errlen);
if( err )
{
errno = err;
}
else
{
errno = ETIMEDOUT;
}
return ret;
}
如果用户启用hook,且未设置 O_NONBLOCK,libco悄悄帮用户设置了O_NONBLOCK,但调用connect后不能立即返回, 因为connect有三次握手的过程,内核中对三次握手的超时限制是75秒,超时则会失败。libco 设置O_NONBLOCK后,立即调用系统 connect 可能会失败,因此循环三次, 每次设置超时时间25秒,然后挂起协程,等待 connect 成功或超时。
协程的事件注册
/*
{ struct pollfd pf = {0};
pf.fd = g_listen_fd;
pf.events = (POLLIN | POLLERR | POLLHUP);
co_poll(co_get_epoll_ct(), &pf, 1, 1000);
}
int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout_ms)
{
return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}
所以 fds[].fd = g_listen_fd
*/
typedef int (*poll_pfn_t)(struct pollfd fds[], nfds_t nfds, int timeout);
int co_poll_inner(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
if (timeout == 0)
{
return pollfunc(fds, nfds, timeout);
}
if (timeout < 0)
{
timeout = INT_MAX;
}
int epfd = ctx->iEpollFd;
//(1). epoll_create创建的epoll描述符,后文通过 epfd 管理所有fd
stCoRoutine_t *self = co_self();
//1.struct change
stPoll_t &arg = *((stPoll_t *)malloc(sizeof(stPoll_t)));
memset(&arg, 0, sizeof(arg));
arg.iEpollFd = epfd;
arg.fds = (pollfd *)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;
//(2).nfds少于3个时直接在栈上分配空间(更快),否则在堆上 分配
stPollItem_t arr[2];
if (nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)
{
arg.pPollItems = arr;
}
else
{
arg.pPollItems = (stPollItem_t *)malloc(nfds * sizeof(stPollItem_t));
}
memset(arg.pPollItems, 0, nfds * sizeof(stPollItem_t));
//(3)记录就绪fd的回调函数OnPollProcessEvent,该回调函数会切回对应的协程。等价于co_resume
arg.pfnProcess = OnPollProcessEvent;
/*
void OnPollProcessEvent(stTimeoutItem_t *ap)
{
stCoRoutine_t *co = (stCoRoutine_t *)ap->pArg;
co_resume(co);
}
事件到来是直接调用这个函数就行了,而这个函数就会唤醒结构对应的协程开始进行工作
*/
arg.pArg = GetCurrCo(co_get_curr_thread_env());
//2. add epoll
for (nfds_t i = 0; i < nfds; i++)
{
arg.pPollItems[i].pSelf = arg.fds + i;
arg.pPollItems[i].pPoll = &arg;
//(4)记录切回协程之前的预处理函数 OnPollPreparePfn
arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
struct epoll_event &ev = arg.pPollItems[i].stEvent;
if (fds[i].fd > -1) //只有 fd >-1 才会添加进去,所以请看:
/*accept_routine():
struct pollfd pf = {0};
pf.fd = -1;
poll(&pf, 1, 1000);
*/
{
ev.data.ptr = arg.pPollItems + i;
ev.events = PollEvent2Epoll(fds[i].events);
//加入EPOLL
int ret = co_epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i].fd, &ev); if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL)
{
if (arg.pPollItems != arr)
{
free(arg.pPollItems);
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return pollfunc(fds, nfds, timeout);
}
}
//if fail,the timeout would work
}
//3.add timeout,添加对应的定时器
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
//(5) 将arg加入超时队列pTimeOut
int ret = AddTimeout(ctx->pTimeout, &arg, now); //添加操作,该函数执行成功返回值为 0
int iRaiseCnt = 0;
//不成功
if (ret != 0)
{
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret, now, timeout, arg.ullExpireTime);
errno = EINVAL;
iRaiseCnt = -1;
}
//成功添加
else
{
printf("co_poll_inner 切换出 CPU\n");
//(6)挂起协程。等到所有协程均挂起, 主协程开始运行。
co_yield_env(co_get_curr_thread_env());
//从 accept 协程,切回到主协程中,回到 main 里面
//切换回来不是到这里了吗?下面删除掉干嘛啊?
iRaiseCnt = arg.iRaiseCnt;
}
{
//clear epoll status and memory,
//这里将定时器也删除了
//这是为什么?实在是有点想不通啊!!!
RemoveFromLink<stTimeoutItem_t, stTimeoutItemLink_t>(&arg);
for (nfds_t i = 0; i < nfds; i++)
{
printf("清除相关信息\n");
int fd = fds[i].fd; //假如是 accept_routine g_listen_fd
if (fd > -1)
{
co_epoll_ctl(epfd, EPOLL_CTL_DEL, fd,
&arg.pPollItems[i].stEvent);
}
fds[i].revents = arg.fds[i].revents;
}
if (arg.pPollItems != arr)
{
free(arg.pPollItems);
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
}
return iRaiseCnt;
}
- 参数
stCoEpoll_t *ctx
:为 EPOLL的环境变量,包含:epoll描述符iEpollFd,超时队列pTimeOut,已超时队列pstTimeoutList,就绪队列pstActiveList,epoll_wait就绪事件集合result
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;
struct stTimeout_t *pTimeout;
struct stTimeoutItemLink_t *pstTimeoutList;
struct stTimeoutItemLink_t *pstActiveList;
co_epoll_res *result;
};
- 参数
struct pollfd fds[], nfds_t nfds, int timeout
:顾名思义 - 参数
poll_pfn_t pollfunc
:系统poll 函数。
注意事项:
注意到co_poll_inner传入的fd数组,而arg只是链表中的一个元素。
假设co_poll_inner传入10 个文件描述符,如果只有1个fd就绪,OnPollPreparePfn从pTimeOut删除arg,则10个文件fd都 从超时队列删除
在切回协程时将10个描述符都从红黑树删除,然 后应用层需要将9个未就绪的fd重新调用co_poll_inner再加入红黑树。
如果每次只就绪一个fd, 这样共需要加入红黑树:10 + 9+ 8 +… +1次,效率低于10次poll,每次只poll一个fd。 co_poll_inner提供传入fd数组的原因是,co_poll_inner是poll调用的,而poll是hook的系统函 数,不能改变系统的语义。系统poll支持数组的原因是,调用系统poll一次,可检查多个fd,比 调用系统poll多次,每次检查一个fd,效率更高。因此系统poll适合一次poll多个fd,但libco自定义的钩子函数poll不适合一次 poll 多个 fd ,所以libco使用poll时需避免一次poll多个fd。
协程的事件循环(epoll + 非阻塞)
协程的epoll多路复用IO模型使用的是非阻塞IO,发起read操作后,可立即挂起协程,并调度其 他协程
void co_eventloop(stCoEpoll_t *ctx, pfn_co_eventloop_t pfn, void *arg)
{
if (!ctx->result)
{
ctx->result = co_epoll_res_alloc(stCoEpoll_t::_EPOLL_SIZE);
}
co_epoll_res *result = ctx->result;
for (;;)
{
//epoll_wait() 等待 I/O 就绪事件,最大等待时长设置为 1 毫秒,
//为了配合时间轮工作,滴答时钟
int ret = co_epoll_wait(ctx->iEpollFd, result, stCoEpoll_t::_EPOLL_SIZE, 1);
//active 指针指向当前执行环境的 pstActiveList 队列,注意这里面可能已经有“活跃”的待处理事件
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset(timeout, 0, sizeof(stTimeoutItemLink_t));
//循环处理 epoll_wait() 得到的 I/O 就绪文件描述符
for (int i = 0; i < ret; i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t *)result->events[i].data.ptr;
/*
处理就绪的文件描述符。如果用户设置了预处理回调,则调用 pfnPrepare 做预处理(15行);
否则直接将就绪事件 item 加入 active 队列。实际上,pfnPrepare() 预处理函数内部也是会将就绪 item 加入 active 队列,最终都是加入到 active 队列等待统一处理。*/
//(1)执行预处理函数,将就绪 fd 从超时队列 pTimeOut 移除,并加入就绪队列 pstActiveList
if (item->pfnPrepare)
{
item->pfnPrepare(item, result->events[i], active);
}
else
{
AddTail(active, item);
}
}
unsigned long long now = GetTickMS();
TakeAllTimeout(ctx->pTimeout, now, timeout);
stTimeoutItem_t *lp = timeout->head;
// 遍历 timeout 队列,设置事件已超时标志(bTimeout 设为 true)。
while (lp)
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true; ////标记为超时
lp = lp->pNext;
}
// (2) TakeAllTimeout 拿出超时队列里所有超时元素,并加入 pstActiveList
Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);
//才开始真正 处理 active 中的事件 1.epoll就绪事件、2.时间轮超时事件。
/*
遍历 active 队列,调用工作协程设置的 pfnProcess() 回调函数 resume 挂起的工作协程,处理对应的 I/O 或超时事件。
*/
lp = active->head;
while (lp)
{
PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
if (lp->bTimeout && now < lp->ullExpireTime)
{
//(3)将807行取出的未超时事件再加回超时队列,
// 因为 TakeAllTimeout 拿出的不一定都是超时事件
/*
超时队列底层实现是60000大小的循环数组,存放每毫秒(共60000毫秒)的超时事件,
每个数组的元素均是一条链表,循环数组的目的是便 于通过下标找到所有超时链表。
例如超时时间是10毫秒的所有事件均记录在数组下标为9
(在 循环数组实际的下标可能不是9,仅举个例子)的链表里,
所有超时时间大于60000毫秒的事 件均记录在数组下标为59999的链表里。
如果取出超时时间是60000毫秒的事件,
TakeAllTimeout会把超时时间大于60000毫秒的也取出来,
因此需要再把超时时间大于60000 毫秒的重新加回超时队列
*/
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if (lp->pfnProcess)
{
//(4)协程超时或fd就绪时调用pfnProcess即 OnPollProcessEvent切回协程
lp->pfnProcess(lp); //注意这里,
/*
co_poll_inner:
arg.pfnProcess = OnPollProcessEvent( ==> resume );
*/
}
lp = active->head;
}
if (pfn)
{
if (-1 == pfn(arg))
{
break;
}
}
}
}
激活对应协程的方式
-
EPOLL回调激活:协程hook住了底层socket族函数,设置了O_NONBLOCK,调用socket族函数后,调用 poll 注 册epoll事件并挂起协程,让其他协程执行,所有协程都挂起后通过epoll,在主协程里检查注册 的IO事件,若 fd 就绪则切到对应协程。
-
超时激活:当前协程通过语句poll(NULL, 0, duration),可设置协程的超时时间间隔duration。poll是被 hook住的函数,执行poll之后,当前协程会被加到超时队列pTimeOut,并被切换到其他协程, 所有协程挂起后,主协程扫描超时队列,找到超时的协程,并切换。因此可用poll实现协程的 睡眠
注意不可用sleep,因为sleep会睡眠线程,线程睡眠了,协程无法被调度,所有的协程 也都不会执行了。
使用Libco需要注意的地方
- 共享栈下内容篡改
- poll 效率
- 协程栈的128KB溢出