主函数:
int main(int argc, char *argv[])
{
g_listen_fd = CreateTcpSocket(port, ip, true);
listen(g_listen_fd, 1024);
SetNonBlock(g_listen_fd);
for (int i = 0; i < cnt; i++)
{
task_t *task = (task_t *)calloc(1, sizeof(task_t));
task->fd = -1;
co_create(&(task->co), NULL, readwrite_routine, task);
/*
使用函数 readwrite_coroutine 创建多个读写协程,
调用 co_resume 启动协程运行直到其挂起
*/
co_resume(task->co);
}
stCoRoutine_t *accept_co = NULL;
co_create(&accept_co, NULL, accept_routine, 0);
co_resume(accept_co);
co_eventloop(co_get_epoll_ct(), 0, 0);
/* co_eventloop 实现事件的监听和协程的循环切换;*/
exit(0);
return 0;
}
readwrite_routine 函数
static void *readwrite_routine(void *arg)
{
co_enable_hook_sys();
task_t *co = (task_t *)arg;
char buf[1024 * 16];
for (;;)
{
if (-1 == co->fd)
{
g_readwrite.push(co);
//将新创建的读写协程都加入到队列 g_readwrite 中
co_yield_ct();
//让出 CPU,libco是对称协程,所以会回到调用它的地方(main 主线程)
//下次回来还是到这里
continue;
}
int fd = co->fd;
co->fd = -1;
for (;;)
{
struct pollfd pf = {0};
pf.fd = fd;
pf.events = (POLLIN | POLLERR | POLLHUP);
co_poll(co_get_epoll_ct(), &pf, 1, 1000);
//内部就是:注册fd -> 加入定时器 -> 切换CPU->删除fd和定时器结构
//为什么要删除?
//因为不应该让某一个协程与某个fd绑定啊...(暂时是这样理解的呐!!)
printf("get a message \n");
//之后进行读写
int ret = read(fd, buf, sizeof(buf));
if (ret > 0)
{
printf("massege == %s\n", buf);
ret = write(fd, buf, ret);
}
if (ret > 0 || (-1 == ret && EAGAIN == errno))
{
continue;
}
close(fd);
break;
}
}
return 0;
}
accept_routine 函数
主线程中的函数 co_eventloop 监听网络事件,将来自于客户端新进的连接交由协程 accept_co 处理,关于 co_eventloop 如何唤醒 accept_co 的细节我们将在后续介绍
static void *accept_routine(void *)
{
co_enable_hook_sys();
printf("accept_routine\n");
fflush(stdout);
for (;;)
{
//printf("pid %ld g_readwrite.size %ld\n",getpid(),g_readwrite.size());
/* 检查队列 g_readwrite 是否有空闲的读写 coroutine */
if (g_readwrite.empty())
{
printf("empty\n"); //sleep
struct pollfd pf = {0};
pf.fd = -1;
/*
会调用 co_poll_inner 函数
这里是只是构造了一个空事件,满足参数要求,后续的 co_poll_inner
会检测 fd > -1
并将该协程加入到 Epoll 管理的定时器队列中,时间到达,就会唤醒该协程
时间设置 1000 ms
*/
poll(&pf, 1, 1000);
continue;
}
struct sockaddr_in addr; //maybe sockaddr_un;
memset(&addr, 0, sizeof(addr));
socklen_t len = sizeof(addr);
int fd = co_accept(g_listen_fd, (struct sockaddr *)&addr, &len); // accept
if (fd < 0)
{
printf(" co_accept() fd < 0 \n");
struct pollfd pf = {0};
pf.fd = g_listen_fd;
pf.events = (POLLIN | POLLERR | POLLHUP);
/*
还是会调用 co_poll_inner 函数
如果接收连接失败,那么调用 co_poll 将服务端的 listen_fd 加入到 Epoll 中
并加入其对应的定时器,唤醒的条件有两个:(1)事件到达 (2)定时器超时
之后切换出CPU,交给主线程,主线程中通过eventloop->epoll调度协程,
如果连接套接字有事件发生,就回到co_poll_inne函数,
然后这个函数就会将该套接字的结构从epoll中删除,将其定时器也删除掉,
之后就到了这里
*/
co_poll(co_get_epoll_ct(), &pf, 1, 1000);
continue;
}
if (g_readwrite.empty())
{
close(fd);
continue;
}
SetNonBlock(fd);
printf("a new connection\n");
task_t *co = g_readwrite.top();
co->fd = fd;
g_readwrite.pop();
co_resume(co->co);
}
return 0;
}
eventloop函数
void co_eventloop(stCoEpoll_t *ctx, pfn_co_eventloop_t pfn, void *arg)
{
if (!ctx->result)
{
ctx->result = co_epoll_res_alloc(stCoEpoll_t::_EPOLL_SIZE);
}
co_epoll_res *result = ctx->result;
for (;;)
{
//epoll_wait() 等待 I/O 就绪事件,最大等待时长设置为 1 毫秒,
//为了配合时间轮工作,滴答时钟
int ret = co_epoll_wait(ctx->iEpollFd, result, stCoEpoll_t::_EPOLL_SIZE, 1);
//active 指针指向当前执行环境的 pstActiveList 队列,注意这里面可能已经有“活跃”的待处理事件
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset(timeout, 0, sizeof(stTimeoutItemLink_t));
//循环处理 epoll_wait() 得到的 I/O 就绪文件描述符
for (int i = 0; i < ret; i++)
{
stTimeoutItem_t *item = (stTimeoutItem_t *)result->events[i].data.ptr;
/*
处理就绪的文件描述符。如果用户设置了预处理回调,则调用 pfnPrepare 做预处理(15行);
否则直接将就绪事件 item 加入 active 队列。实际上,pfnPrepare() 预处理函数内部也是会将就绪
item 加入 active 队列,
最终都是加入到 active 队列等待统一处理。*/
//执行预处理函数
if (item->pfnPrepare)
{
item->pfnPrepare(item, result->events[i], active);
}
else
{
AddTail(active, item);
}
}
unsigned long long now = GetTickMS();
TakeAllTimeout(ctx->pTimeout, now, timeout); //从时间轮上取出已超时的事件,放到 timeout 队列,时间轮转动
stTimeoutItem_t *lp = timeout->head;
// 遍历 timeout 队列,设置事件已超时标志(bTimeout 设为 true)。
while (lp)
{
//printf("raise timeout %p\n",lp);
lp->bTimeout = true; ////标记为超时
lp = lp->pNext;
}
// 将 timeout 队列中事件合并到 active 队列。
Join<stTimeoutItem_t, stTimeoutItemLink_t>(active, timeout);
//才开始真正 处理 active 中的事件 1.epoll就绪事件、2.时间轮超时事件。
/*
遍历 active 队列,调用工作协程设置的 pfnProcess() 回调函数 resume 挂起的工作协程,处理对应的 I/O 或超时事件。
*/
lp = active->head;
while (lp)
{
PopHead<stTimeoutItem_t, stTimeoutItemLink_t>(active);
if (lp->bTimeout && now < lp->ullExpireTime)
{
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret)
{
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if (lp->pfnProcess)
{
lp->pfnProcess(lp); //注意这里,
/*
co_poll_inner:
arg.pfnProcess = OnPollProcessEvent( ==> resume );
*/
}
lp = active->head;
}
if (pfn)
{
if (-1 == pfn(arg))
{
break;
}
}
}
}
co_poll_inner 注册事件的函数
/*
{ struct pollfd pf = {0};
pf.fd = g_listen_fd;
pf.events = (POLLIN | POLLERR | POLLHUP);
co_poll(co_get_epoll_ct(), &pf, 1, 1000);
}
int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout_ms)
{
return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}
所以 fds[].fd = g_listen_fd
*/
typedef int (*poll_pfn_t)(struct pollfd fds[], nfds_t nfds, int timeout);
int co_poll_inner(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
if (timeout == 0)
{
return pollfunc(fds, nfds, timeout);
}
if (timeout < 0)
{
timeout = INT_MAX;
}
int epfd = ctx->iEpollFd;
stCoRoutine_t *self = co_self();
//1.struct change
stPoll_t &arg = *((stPoll_t *)malloc(sizeof(stPoll_t)));
memset(&arg, 0, sizeof(arg));
arg.iEpollFd = epfd;
arg.fds = (pollfd *)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;
stPollItem_t arr[2];
if (nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack)
{
arg.pPollItems = arr;
}
else
{
arg.pPollItems = (stPollItem_t *)malloc(nfds * sizeof(stPollItem_t));
}
memset(arg.pPollItems, 0, nfds * sizeof(stPollItem_t));
arg.pfnProcess = OnPollProcessEvent;
/*
void OnPollProcessEvent(stTimeoutItem_t *ap)
{
stCoRoutine_t *co = (stCoRoutine_t *)ap->pArg;
co_resume(co);
}
事件到来是直接调用这个函数就行了,而这个函数就会唤醒结构对应的协程开始进行工作
*/
arg.pArg = GetCurrCo(co_get_curr_thread_env());
//2. add epoll
for (nfds_t i = 0; i < nfds; i++)
{
arg.pPollItems[i].pSelf = arg.fds + i;
arg.pPollItems[i].pPoll = &arg;
arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
struct epoll_event &ev = arg.pPollItems[i].stEvent;
if (fds[i].fd > -1) //只有 fd >-1 才会添加进去,所以请看:
/*accept_routine():
struct pollfd pf = {0};
pf.fd = -1;
poll(&pf, 1, 1000);
*/
{
ev.data.ptr = arg.pPollItems + i;
ev.events = PollEvent2Epoll(fds[i].events);
int ret = co_epoll_ctl(epfd, EPOLL_CTL_ADD, fds[i].fd, &ev);
if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL)
{
if (arg.pPollItems != arr)
{
free(arg.pPollItems);
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return pollfunc(fds, nfds, timeout);
}
}
//if fail,the timeout would work
}
//3.add timeout,添加对应的定时器
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout(ctx->pTimeout, &arg, now); //添加操作,该函数执行成功返回值为 0
int iRaiseCnt = 0;
//不成功
if (ret != 0)
{
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret, now, timeout, arg.ullExpireTime);
errno = EINVAL;
iRaiseCnt = -1;
}
//成功添加
else
{
printf("co_poll_inner 切换出 CPU\n");
co_yield_env(co_get_curr_thread_env());
//从 accept 协程,切回到主协程中,回到 main 里面
//切换回来不是到这里了吗?下面删除掉干嘛啊?
iRaiseCnt = arg.iRaiseCnt;
}
{
//clear epoll status and memory,
//这里将定时器也删除了
//这是为什么?实在是有点想不通啊!!!
RemoveFromLink<stTimeoutItem_t, stTimeoutItemLink_t>(&arg);
for (nfds_t i = 0; i < nfds; i++)
{
printf("清除相关信息\n");
int fd = fds[i].fd; //假如是 accept_routine g_listen_fd
if (fd > -1)
{
co_epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &arg.pPollItems[i].stEvent);
}
fds[i].revents = arg.fds[i].revents;
}
if (arg.pPollItems != arr)
{
free(arg.pPollItems);
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
}
return iRaiseCnt;
}
OK,以上就是 Libco 的事件驱动模型了.
总结一下:主要清楚了两点就行了
1. 注册
- 构造对应的结构呢加入epoll
- 构造定时器加入(注意会有一个回调函数)
- 让出CPU
- 删除对应fd的结构与对应的定时器
- 执行read/write/accept操作
2. 主事件循环
-
调用 epoll_wait() 等待 I/O 就绪事件,为了配合时间轮工作,这里的 timeout 设置为 1 毫秒。
-
active 指针指向当前执行环境的 pstActiveList 队列,注意这里面可能已经有“活跃”的待处理事件。timeout 临时链表
-
处理就绪的文件描述符。有预处理回调就调用 pfnPrepare 做预处理,否则直接将就绪事件 item 加入 active 队列。实际上,pfnPrepare() 预处理函数内部也是会将就绪item加入 active 队列,最终都是加入到 active 队列等到后边统一处理。
-
从时间轮上取出已超时的事件,放到 timeout 队列。
-
遍历 timeout 队列,设置事件已超时标志(bTimeout 设为 true)。
-
将 timeout 队列中事件合并到 active 队列。
-
遍历 active 队列,调用工作协程设置的 pfnProcess() 回调函数 resume 挂起的工作协程,处理对应的 I/O 或超时事件(在注册时会注册一个回调函数)。
这就是主协程的事件循环工作过程,我们看到它周而复始地 epoll_wait(),唤醒挂起的工作协程去处理定时器与 I/O 事件。这里的逻辑看起来跟所有基于 epoll 实现的事件驱动网络框架并没有什么特别之处,更没有涉及到任何协程调度算法,由此也可以看到 libco 其实是一个很典型的非对称协程机制。
参考:
https://blog.didiyun.com/index.php/2018/11/23/libco/#主协程事件循环源码分析