The example code is taken from kernel version 2.6.38.
epoll source code analysis (1): epoll_create
epoll source code analysis (2): epoll_ctl
epoll source code analysis (3): epoll_wait
Introduction
The previous article analyzed epoll's key data structures and the implementation of epoll_create. If that article laid the groundwork for understanding epoll, this one explains where epoll's efficiency actually comes from. Let's take a look!
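Before diving into the kernel side, here is a minimal user-space sketch (my own illustration, not part of the analyzed source) of the three operations epoll_ctl supports; the pipe is just a convenient pollable fd to register.

#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void)
{
	int epfd = epoll_create(1);            /* the size hint must be > 0 on these kernels */
	int fds[2];
	pipe(fds);                             /* any pollable fd works as a target */

	struct epoll_event ev;
	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;                   /* only registered events are reported */
	ev.data.fd = fds[0];

	epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);   /* op = ADD */
	ev.events = EPOLLIN | EPOLLET;
	epoll_ctl(epfd, EPOLL_CTL_MOD, fds[0], &ev);   /* op = MOD */
	epoll_ctl(epfd, EPOLL_CTL_DEL, fds[0], NULL);  /* op = DEL, the event argument may be NULL */

	close(fds[0]); close(fds[1]); close(epfd);
	return 0;
}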
/*
* epfd  - the fd returned by epoll_create
* op    - the kind of operation: ADD, DEL or MOD
* fd    - the fd we want to act on
* event - the event types we care about. Note that only events we registered are delivered
*         to user space after epoll_wait wakes up; the kernel may still see other events,
*         but they are never passed up.
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int did_lock_epmutex = 0;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
/*
static inline int ep_op_has_event(int op)
{
return op != EPOLL_CTL_DEL; // for DEL there is obviously no event to copy from user space
}
*/
if (ep_op_has_event(op) && // ep_op_has_event (shown above) decides whether we need to copy data from user space
copy_from_user(&epds, event, sizeof(struct epoll_event))) // copy the epoll_event from user space
goto error_return;
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
file = fget(epfd); // get the struct file pointer of the epoll instance
if (!file)
goto error_return;
/* Get the "struct file *" for the target file */
tfile = fget(fd); // get the struct file pointer of the fd we want to add
if (!tfile)
goto error_fput;
/* The target file descriptor must support poll */
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll) // if the target fd does not support poll there is nothing we can do, so report an error
goto error_tgt_fput;
/*
* We have to check that the file structure underneath the file descriptor
* the user passed to us _is_ an eventpoll file. And also we do not permit
* adding an epoll file descriptor inside itself.
*/
error = -EINVAL;
/*
static inline int is_file_epoll(struct file *f)
{
return f->f_op == &eventpoll_fops; // a quick way to check whether a file is an epoll fd
}
*/
if (file == tfile || !is_file_epoll(file)) // epfd is obviously not allowed to watch itself, and epfd must really be an epoll fd
goto error_tgt_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = file->private_data; // as explained in the previous article, private_data holds the allocated eventpoll
/*
* When we insert an epoll file descriptor, inside another epoll file
* descriptor, there is the change of creating closed loops, which are
* better be handled here, than in more critical paths.
*
* We hold epmutex across the loop check and the insert in this case, in
* order to prevent two separate inserts from racing and each doing the
* insert "at the same time" such that ep_loop_check passes on both
* before either one does the insert, thereby creating a cycle.
*/
if (unlikely(is_file_epoll(tfile) && op == EPOLL_CTL_ADD)) { // inserting one epoll fd into another could create a closed loop, so check for it
mutex_lock(&epmutex);
did_lock_epmutex = 1;
error = -ELOOP;
if (ep_loop_check(ep, tfile) != 0)
goto error_tgt_fput;
}
mutex_lock(&ep->mtx); // take the mtx lock while manipulating the fd
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tfile, fd); // O(log n) lookup to see whether the fd is already registered
/*
static inline void ep_set_ffd(struct epoll_filefd *ffd,
struct file *file, int fd) // build a key to compare against
{
ffd->file = file;
ffd->fd = fd;
}
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
struct epoll_filefd *p2)
{
return (p1->file > p2->file ? +1:
(p1->file < p2->file ? -1 : p1->fd - p2->fd));
}
// search the RB-tree to see whether the fd we want to insert already exists
static struct epitem *ep_find(struct eventpoll *ep, struct file *file, int fd)
{
int kcmp;
struct rb_node *rbp;
struct epitem *epi, *epir = NULL;
struct epoll_filefd ffd;
ep_set_ffd(&ffd, file, fd);
for (rbp = ep->rbr.rb_node; rbp; ) {
epi = rb_entry(rbp, struct epitem, rbn);
kcmp = ep_cmp_ffd(&ffd, &epi->ffd); // the red-black tree's ordering rule: by file pointer, then by fd
if (kcmp > 0)
rbp = rbp->rb_right;
else if (kcmp < 0)
rbp = rbp->rb_left;
else {
epir = epi;
break;
}
}
return epir; // if found, return the epitem corresponding to this fd
}
*/
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP; // POLLERR and POLLHUP are always reported, so the user never needs to register them explicitly
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (unlikely(did_lock_epmutex))
mutex_unlock(&epmutex);
fput(tfile); // drop the reference taken by fget
error_fput:
fput(file);
error_return:
return error;
}
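The switch above also explains the errno values user space sees: ADD on an fd that ep_find already locates returns EEXIST, while MOD or DEL on an fd that was never added returns ENOENT. A small sketch of my own to confirm this (the pipe fds are just placeholders):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void)
{
	int epfd = epoll_create(1);
	int fds[2];
	pipe(fds);

	struct epoll_event ev;
	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.fd = fds[0];

	epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);
	if (epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev) < 0)
		printf("second ADD : %s\n", strerror(errno));  /* EEXIST: ep_find() found the epitem */

	if (epoll_ctl(epfd, EPOLL_CTL_MOD, fds[1], &ev) < 0)
		printf("MOD new fd : %s\n", strerror(errno));  /* ENOENT: not in the RB tree */

	close(fds[0]); close(fds[1]); close(epfd);
	return 0;
}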
// note: this whole function is called with ep->mtx held
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
// has this user reached the maximum number of watched fds?
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) // the most interesting part: epitems come from a slab cache, which makes allocating and freeing them cheap
return -ENOMEM;
/* Item initialization follow here ... */ // as the kernel comment says, ordinary initialization
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
/* Initialize the poll table using the queue callback */
epq.epi = epi;
// set the poll callback to ep_ptable_queue_proc; this is where epoll's efficiency is hiding!
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
// after this step the epitem is wired up to the fd: the callback registered above runs inside poll() and hooks us into the fd's wait queue, so we get notified when events arrive
revents = tfile->f_op->poll(tfile, &epq.pt);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock); // tfile is the struct file of the fd being added
list_add_tail(&epi->fllink, &tfile->f_ep_links); // every file keeps a list of the epitems watching it
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
ep_rbtree_insert(ep, epi); // insert the epitem into ep's red-black tree
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
// handle the case where the watched fd already has events pending at insert time; if we skipped this, no further data might ever arrive, the callback would never fire, and the event would be lost
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq); // wake up waiters sleeping in epoll_wait
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister:
ep_unregister_pollwait(ep, epi);
/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
kmem_cache_free(epi_cache, epi);
return error;
}
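One practical consequence of the block that checks revents right after f_op->poll(): if the fd is already ready when it is added, epoll_wait will still report it even though no new data arrives afterwards. A quick user-space sketch of my own, using a pipe as the watched fd:

#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void)
{
	int fds[2];
	pipe(fds);
	write(fds[1], "x", 1);                 /* make the read end ready BEFORE EPOLL_CTL_ADD */

	int epfd = epoll_create(1);
	struct epoll_event ev, out;
	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN;
	ev.data.fd = fds[0];
	epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);

	/* ep_insert saw revents & EPOLLIN and queued the epitem on rdllist,
	   so this returns 1 immediately even with a zero timeout */
	printf("ready fds: %d\n", epoll_wait(epfd, &out, 1, 0));

	close(fds[0]); close(fds[1]); close(epfd);
	return 0;
}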
Now let's look at ep_ptable_queue_proc, the callback that runs when the target file's poll() is invoked:
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
/*
struct eppoll_entry { // this structure ties an epitem to its wait-queue callback
// List header used to link this structure to the "struct epitem"
struct list_head llink;
// The "base" pointer is set to the container "struct epitem"
struct epitem *base;
//Wait queue item that will be linked to the target file wait
//queue head.
wait_queue_t wait;
// The wait queue head that linked the "wait" wait queue item
wait_queue_head_t *whead;
};
*/
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
// initialize a wait queue entry whose callback is ep_poll_callback
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead; // the wait queue head of the watched fd
pwq->base = epi;
add_wait_queue(whead, &pwq->wait); // add our entry to the watched fd's wait queue; its callback will be invoked later when the fd is woken up
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++; // nwait counts how many wait queues this epitem has been hooked into
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
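How does this function get hold of the epitem when all it receives is a poll_table pointer? The poll_table registered in ep_insert is embedded in a struct ep_pqueue together with the epitem, so ep_item_from_epqueue can climb back up with container_of; the driver's f_op->poll() hands that same poll_table to poll_wait(), which forwards it to the registered callback. Below is a tiny user-space simulation of that trick, written by me with simplified stand-in structs rather than the real kernel definitions:

#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct poll_table { int dummy; };          /* stand-in for the kernel's poll_table */
struct epitem    { int fd; };              /* stand-in for the kernel's epitem */

struct ep_pqueue {                         /* same layout idea as the kernel wrapper */
	struct poll_table pt;
	struct epitem *epi;
};

/* plays the role of ep_ptable_queue_proc: it only receives the poll_table pointer */
static void queue_proc(struct poll_table *pt)
{
	struct epitem *epi = container_of(pt, struct ep_pqueue, pt)->epi;
	printf("recovered epitem for fd %d\n", epi->fd);
}

int main(void)
{
	struct epitem item = { .fd = 42 };
	struct ep_pqueue epq = { .epi = &item };

	/* in the kernel, tfile->f_op->poll(tfile, &epq.pt) would eventually
	   forward &epq.pt to the registered queue proc */
	queue_proc(&epq.pt);
	return 0;
}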
Clearly the heart of the whole article is ep_poll_callback, the callback hooked into the watched fd's wait queue. Let's see what it really looks like!
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key) // key carries the triggered event mask
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait); // recover the epitem from the wait queue entry
struct eventpoll *ep = epi->ep; // and from it, the eventpoll instance
spin_lock_irqsave(&ep->lock, flags); // we are about to touch rdllist, so take the lock
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
//#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
if (!(epi->event.events & ~EP_PRIVATE_BITS)) // if no real events are registered for this fd (only the EPOLLONESHOT/EPOLLET bits remain), do not add it to rdllist
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (key && !((unsigned long) key & epi->event.events)) // if the triggered event is not one we registered, do not add it to rdllist either
goto out_unlock;
/*
* If we are trasfering events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happens during that period of time are
* chained in ep->ovflist and requeued later on.
*/
/*
* This part is interesting:
* ep->ovflist starts out as EP_UNACTIVE_PTR; when epoll_wait is woken up and begins
* transferring rdllist to user space, it sets ep->ovflist to NULL.
* So ep->ovflist != EP_UNACTIVE_PTR means epoll_wait is currently walking the ready list;
* events that fire during that walk are chained onto ovflist and spliced back into
* rdllist once the walk is finished.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
// add the current epitem to rdllist
// This is why epoll is so efficient: we never scan every fd looking for triggered events;
// when an event fires, this callback automatically puts the epitem on rdllist, cutting the
// cost from O(all watched fds) to O(ready fds). And events only need to be registered once,
// in epoll_ctl(EPOLL_CTL_ADD), unless they are later modified.
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
// wake up epoll_wait
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
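The EP_PRIVATE_BITS check at the top is what makes EPOLLONESHOT work: once an event has been delivered, epoll_wait strips the event mask down to the private bits, so this callback stops queueing the epitem until user space re-arms it with EPOLL_CTL_MOD. A user-space sketch of my own showing that behavior:

#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

int main(void)
{
	int fds[2];
	pipe(fds);
	int epfd = epoll_create(1);

	struct epoll_event ev, out;
	memset(&ev, 0, sizeof(ev));
	ev.events = EPOLLIN | EPOLLONESHOT;
	ev.data.fd = fds[0];
	epoll_ctl(epfd, EPOLL_CTL_ADD, fds[0], &ev);

	write(fds[1], "x", 1);
	printf("first wait : %d\n", epoll_wait(epfd, &out, 1, 100)); /* 1: event delivered */

	write(fds[1], "y", 1);
	printf("second wait: %d\n", epoll_wait(epfd, &out, 1, 100)); /* 0: mask was stripped to EP_PRIVATE_BITS */

	ev.events = EPOLLIN | EPOLLONESHOT;                          /* re-arm with EPOLL_CTL_MOD */
	epoll_ctl(epfd, EPOLL_CTL_MOD, fds[0], &ev);
	printf("after MOD  : %d\n", epoll_wait(epfd, &out, 1, 100)); /* 1 again */

	close(fds[0]); close(fds[1]); close(epfd);
	return 0;
}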
Summary
We spent most of this discussion of epoll_ctl on ep_insert, because it largely explains why epoll is so efficient. First, epitems are allocated from a slab cache, which makes creating and destroying them cheap. Second, the red-black tree gives O(log n) add, delete and modify operations. Most important of all is the callback design: when a node is inserted, epoll registers ep_ptable_queue_proc through the poll_table; the target file's poll() invokes it, and it creates an eppoll_entry that hooks ep_poll_callback into the file's wait queue. From then on, whenever the watched fd is triggered, its epitem goes straight onto rdllist, so rdllist contains only ready fds. That is a big win when many fds are inactive, and on top of that we never have to re-register events on every call, since they are stored in the epitem.
You could say the essence of epoll lies in this one remarkable callback.