这两天刷算法,感觉自己凉了,不看人家的答案自己连题都看不懂。天赋吧,这没得救了。于是决定先把算法放一下,找点比较有意思的事情做做,所以就突发奇想不如看一下epoll源码吧,整天用这个东西,了解一下他实现有益无害吧!
在网络编程中,有常用的IO复用有三个,对于不同的场景,各显神通。即select,poll,epoll,当然select和poll不是该篇文章的主角,下面我主要的是epoll的详细实现原理。当然select和poll我将作对比性的描述。
下面我们思考一个问题:
- 为什么存在I/O复用,使用IO复用和不使用在单进程中有什么影响?
这个问题在网上没找到答案,可能这个“为什么”提出来并没有什么意义。其实答案比较简单,多路复用是在单个线程中,实现跟踪多socket套接字状态变化的一种机制。在不使用IO复用的情况下,没法实现单线程检测多个套接字状态变化。(以上是个人见解,仅供参考)
我们都使用select,poll,epoll这些系统调用来实现IO复用机制。对于前两者,每次只要有就绪事件,要将所有fd传递给select或poll系统调用,这意味着将所有的fd从用户态拷贝到内核态。这就多余了一些没有意义的拷贝,因为有些描述符上还没有IO事件的发生,只因为其他的描述符有IO事件就被拷贝到内核态。
再一个有必要区分一下select和poll,select系统调用通过一个位数组fd_bits收集要监听的描述符,内核中的宏定义限制了最大可监听的描述符数量为1024个。
而对于poll没有限制监听的数量,只要来一个新连接,就将这个连接加到poll维护的描述符链表中。
对于以上两种IO复用机制都没有解决避免复制非就绪连接到内核的无效操作。对于连接较少的状况,以上三种IO复用机制效率相当,但要是连接数量上升,程序中分别使用三者的性能差异就显而易见了。
epoll为什么高效快捷?
epoll相关的系统调用有:epoll_create, epoll_ctl和epoll_wait。
在Linux系统中,epoll想内核注册了一个epoll文件系统,用来存储被监控的文件描述符。
当epoll_create时,就会在epoll文件系统创建file节点,只服务于epoll。当epoll在内核中被初始化时,操作系统回为epoll开辟出一个内核高速缓冲区(连续的物理内存页
),用来保存保存红黑树结构
。
static int __init eventpoll_init(void)
{
mutex_init(&pmutex);
ep_poll_safewake_init(&psw);
//开辟slab高速缓冲区
epi_cache = kmem_cache_create(“eventpoll_epi”,sizeof(struct epitem),
0,SLAB_HWCACHE_ALIGN|EPI_SLAB_DEBUG|
SLAB_PANIC,NULL);
pwq_cache = kmem_cache_create(“eventpoll_pwq”,sizeof(struct
eppoll_entry),0,EPI_SLAB_DEBUG|SLAB_PANIC,NULL);
return 0;
}
在刚开始学习epoll的时候就听过epoll底层的实现核心数据结构就是红黑树,红黑树是一种自平衡二叉树,在查找和删除效率较高。
在调用epoll_create后,epoll底层实现会在epoll的高速缓冲区中创建一个红黑树用来接收后面到来的连接,再者就是会创建一个就绪链表,保存发生就绪事件的fd:
//当来一个新连接,基于slab高速缓存(可以理解为一种分配内存的机制,对于频繁的使用和销毁特别的高效),创建一个epitem对象,保存下面描述的信息,然后将这个对象加到红黑树中。
struct epitem {
struct rb_node rbn; //用于主结构管理的红黑树
struct list_head rdllink; //事件就绪队列
struct epitem *next; //用于主结构体中的链表
struct epoll_filefd ffd; //这个结构体对应的被监听的文件描述符信息
int nwait; //poll操作中事件的个数
struct list_head pwqlist; //双向链表,保存着被监视文件的等待队列,功能类似于select/poll中的poll_table
struct eventpoll *ep; //该项属于哪个主结构体(多个epitm从属于一个eventpoll)
struct list_head fllink; //双向链表,用来链接被监视的文件描述符对应的struct file。因为file里有f_ep_link,用来保存所有监视这个文件的epoll节点
struct epoll_event event; //注册的感兴趣的事件,也就是用户空间的epoll_event
}
//每个epoll fd(epfd)对应的主要数据结构为:
struct eventpoll {
spin_lock_t lock; //对本数据结构的访问
struct mutex mtx; //防止使用时被删除
wait_queue_head_t wq; //sys_epoll_wait() 使用的等待队列
wait_queue_head_t poll_wait; //file->poll()使用的等待队列
struct list_head rdllist; //事件满足条件的链表
struct rb_root rbr; //用于管理所有fd的红黑树(树根)
struct epitem *ovflist; //将事件到达的fd进行链接起来发送至用户空间
}
//内核通过创建一个fd与这个结构相关联,管理红黑树结构,即epoll_create1实现
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error, fd;
//////////////////////////////////////////////////////////////////////////////////////////////////////
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
////////////////////////////////////////////////////////////////////////////////////////////////////
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
////////////////////////////////////////////////////////////////////////////////////////////
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}
//创建并初始化eventpoll结构体实例,由ep_alloc实现(第52行)可知,调用kzalloc在内核空间申请内存并初始化清零。
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
if (unlikely(!ep))
goto free_uid;
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;
free_uid:
free_uid(user);
return error;
}
当红黑树中有IO事件时,会通过红黑树的删除操作将节点取下放到就绪事件链表rdlist中(也就是将就绪事件从红黑树中取出放到就绪链表中),然后通过epoll_wait返回到用户进程,在进程中进行逻辑处理或者又进行IO处理,所以对于epoll来说只需关心就绪链表中的描述符。不必像poll和select一样只要有个别的fd发生IO活动就遍历整个集合或者链表。
- 还有就是网上所说的epoll还使用了mmap,实现用户空间和内核空间的共享?
通过前面的例子发现这一说法是站不住脚的,通过源码我们可以看出epoll高速缓存是存在的,他就是用来保存红黑树结构的,来一个新连接,加到红黑树中进行检测,这些已经在逻辑上很合理了,如果存在用户空间和内核空间共享内存的话,那岂不是画蛇添足?所以不论是epoll fd本身或者epitem都是基于内核内存分配创建,不存在mmap之说。
下面代码主要是往红黑树中添加节点EPOLL_CTL_ADD,还有就是取出就绪事件的过程:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event, struct file *tfile, int fd)
{
int error ,revents,pwake = 0;
unsigned long flags ;
struct epitem *epi;
/*
struct ep_queue{
poll_table pt;
struct epitem *epi;
} */
struct ep_pqueue epq;
//分配一个epitem结构体来保存每个加入的fd
if(!(epi = kmem_cache_alloc(epi_cache,GFP_KERNEL)))
goto error_return;
//初始化该结构体
ep_rb_initnode(&epi->rbn);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd,tfile,fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
//安装poll回调函数
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc );
/* 调用poll函数来获取当前事件位,其实是利用它来调用注册函数ep_ptable_queue_proc(poll_wait中调用)。
如果fd是套接字,f_op为socket_file_ops,poll函数是
sock_poll()。如果是TCP套接字的话,进而会调用
到tcp_poll()函数。此处调用poll函数查看当前
文件描述符的状态,存储在revents中。
在poll的处理函数(tcp_poll())中,会调用sock_poll_wait(),
在sock_poll_wait()中会调用到epq.pt.qproc指向的函数,
也就是ep_ptable_queue_proc()。 */
revents = tfile->f_op->poll(tfile, &epq.pt);
spin_lock(&tfile->f_ep_lock);
list_add_tail(&epi->fllink,&tfile->f_ep_lilnks);
spin_unlock(&tfile->f_ep_lock);
ep_rbtree_insert(ep,epi); //将该epi插入到ep的红黑树中
spin_lock_irqsave(&ep->lock,flags);
// revents & event->events:刚才fop->poll的返回值中标识的事件有用户event关心的事件发生。
// !ep_is_linked(&epi->rdllink):epi的ready队列中有数据。ep_is_linked用于判断队列是否为空。
/* 如果要监视的文件状态已经就绪并且还没有加入到就绪队列中,则将当前的
epitem加入到就绪队列中.如果有进程正在等待该文件的状态就绪,则
唤醒一个等待的进程。 */
if((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink,&ep->rdllist); //将当前epi插入到ep->ready队列中。
/* 如果有进程正在等待文件的状态就绪,
也就是调用epoll_wait睡眠的进程正在等待,
则唤醒一个等待进程。
waitqueue_active(q) 等待队列q中有等待的进程返回1,否则返回0。
*/
if(waitqueue_active(&ep->wq))
__wake_up_locked(&ep->wq,TAKS_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE);
/* 如果有进程等待eventpoll文件本身(???)的事件就绪,
则增加临时变量pwake的值,pwake的值不为0时,
在释放lock后,会唤醒等待进程。 */
if(waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock,flags);
if(pwake)
ep_poll_safewake(&psw,&ep->poll_wait);//唤醒等待eventpoll文件状态就绪的进程
return 0;
}
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
revents = tfile->f_op->poll(tfile, &epq.pt);
这两个函数将ep_ptable_queue_proc注册到epq.pt中的qproc。
typedef struct poll_table_struct {
poll_queue_proc qproc;
unsigned long key;
}poll_table;
执行f_op->poll(tfile, &epq.pt)时,XXX_poll(tfile, &epq.pt)函数会执行poll_wait(),poll_wait()会调用epq.pt.qproc函数,即ep_ptable_queue_proc。
ep_ptable_queue_proc函数如下:
/* 在文件操作中的poll函数中调用,将epoll的回调函数加入到目标文件的唤醒队列中。
如果监视的文件是套接字,参数whead则是sock结构的sk_sleep成员的地址。 */
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt) {
/* struct ep_queue{
poll_table pt;
struct epitem *epi;
} */
struct epitem *epi = ep_item_from_epqueue(pt); //pt获取struct ep_queue的epi字段。
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
/* We have to signal that an error occurred */
/*
* 如果分配内存失败,则将nwait置为-1,表示
* 发生错误,即内存分配失败,或者已发生错误
*/
epi->nwait = -1;
}
}
因为epoll是基于红黑树实现的,ep_find、ep_insert、ep_modify、ep_remove等基于红黑树的查找、插入等操作的时间复杂度都是O(logn)。所以epoll快,高效。
结合下面的图应该会更加清晰epoll事件处理过程了。
对于epoll的边缘触发
ET
和水平触发LT
,在边缘触发的情况下,当套接字从一种状态变化到另一种状态,才会进行IO操作。举个例子:缓冲区本来是空的,现在突然有数据了,也就是由无到有的过程,在ET模式下,会一次性将数据取走,要是缓冲区中还有没取完的数据,这个时候就得分设置的套接字为阻塞模式或者非阻塞模式了,要是设置套接字为阻塞模式的话,在ET模式下,读事件触发,一次性把数据要是读不完,之后,就再不会触发ET时间了,这样会使套接字一直阻塞下去,读不到新数据;要是设置为非阻塞了,那我们在处理读时间处就应该设置循环,意思就是事件尽管触发一次,但读的次数由我来决定,我要读到套接字返回EAGAIN为止。这就是使用epoll的情况时,套接字设置为非阻塞的原因!
对于LT模式对于未取走的数据,这次没取走的话,下次发现还没取完再来取就行,比较简单。
既然ET模式那么麻烦,LT操的心又那么少,那为什么通常使用ET模式呢?因为在LT模式下,会不断产生epoll相关系统调用和i/o处理相关系统调用,而在ET模式,设置套接字非阻塞,只会进行io处理相关系统调用。在Linux中系统调用相比于用户态普通接口调用是比较吃性能的,所以在程序种尽量想办法减少系统调用的频数,这样程序的性能才会提高!
(ps.本段是个人见解,要是有其他见解的话,希望能分享一下)
对于IO多路复用的总结至此告一段落,源码解读将会持续,后面同样也会持续更新这篇博客。要是描述有误,欢迎交流指正!
本篇文章参考文档:
http://www.pandademo.com/2016/11/the-discrimination-of-linux-kernel-epoll-et-and-lt/