Epoll 概述
epoll
是linux
中IO多路复用的一种机制,I / O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。当然linux中IO多路复用不仅仅是epoll
,其他多路复用机制还有select、poll,但是接下来介绍epoll
的内核实现。 网上关于epoll
接口的介绍非常多,这个不是我关注的重点,但是还是有必要了解。该接口非常简单,一共就三个函数,这里我摘抄了网上关于该接口的介绍:
int epoll_create(int size);
创建一个epoll
的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select() 中的第一个参数,给出最大监听的fd + 1的值。需要注意的是,当创建好epoll
句柄后,它就是会占用一个fd值,在linux下如果查看 / proc / 进程id / fd / ,是能够看到这个fd的,所以在使用完epoll
后,必须调用close() 关闭,否则可能导致fd被耗尽。 int epoll
_ctl(int epfd, int op, int fd, struct epoll
_event *event);
epoll
的事件注册函数,它不同与select() 是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll
_create() 的返回值,第二个参数表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
源码分析
当系统启动时,epoll
使用eventpoll_init
进行初始化:
static int __init eventpoll_init(void)
{
struct sysinfo si;
si_meminfo(&si);
/*
* Allows top 4% of lomem to be allocated for epoll watches (per user).
*/
max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /
EP_ITEM_COST;
BUG_ON(max_user_watches < 0);
/*
* 初始化用于执行 epoll 文件描述符的结构
* 包含循环检查。
*/
ep_nested_calls_init(&poll_loop_ncalls);
#ifdef CONFIG_DEBUG_LOCK_ALLOC
/* Initialize the structure used to perform safe poll wait head wake ups */
ep_nested_calls_init(&poll_safewake_ncalls);
#endif
/*
* 我们可以有数千个epitems,所以要防止这种情况发生:
* 在64位(和更小)CPU上使用额外的缓存行
*/
BUILD_BUG_ON(sizeof(void *) <= 8 && sizeof(struct epitem) > 128);
/**/
/* 分配用于 分配“struct epitem”结构的slab缓存 */
epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),
0, SLAB_HWCACHE_ALIGN|SLAB_PANIC|SLAB_ACCOUNT, NULL);
/* Allocates slab cache used to allocate "struct eppoll_entry" */
pwq_cache = kmem_cache_create("eventpoll_pwq",
sizeof(struct eppoll_entry), 0, SLAB_PANIC|SLAB_ACCOUNT, NULL);
return 0;
}
fs_initcall(eventpoll_init);
从使用Slab
分配器动态分配内存开始看起.第一个结构为当系统中添加一个fd
时,就创建一epitem
结构体,也就是说每一个socket fd
都会对应一个epitem
结构体,对应于内核中构建的一个红黑树节点
struct epitem
{
union {
/* 将此结构链接到 eventpoll RB 树上 */
struct rb_node rbn;
/* 用于释放struct epitem */
struct rcu_head rcu;
};
struct list_head rdllink; /*链表节点,将此结构链接到 eventpoll 的 rdllist 中 */
struct epitem *next; //配合ovflist一起使用来保持单向链的条目
struct epoll_filefd ffd; //此条目引用的文件描述符信息
int nwait; //附加到poll轮询中的活跃等待队列数
struct eventpoll *ep; //epi所属的ep
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
struct epoll_event event; //监控的事件和文件描述符
};
可以看到,有一个struct eventpoll
结构,这个就是epollfd
对应的结构,其实就是相当于与用户打交道的第一层,也相当于红黑树结构.
struct eventpoll
{
//防止使用时被删除
struct mutex mtx;
// sys_epoll_wait() 使用的等待队列
wait_queue_head_t wq;
// file->poll() 使用的等待队列,用于 epollfd 本身被 poll 的时候
wait_queue_head_t poll_wait;
//事件满足条件的链表,这个我没有搞懂他与 ovflist 的区别?????你搞懂了吗?现在
/*有监听事件到来的套接字的该结构以双向链表组织起来,
链表头保存在eventpoll中(rdllist成员)*/
struct list_head rdllist;
/* RB树根 用于管理受监视的所有 fd 结构 */
struct rb_root_cached rbr;
//将事件到达的 fd 进行链接起来发送至用户空间,发送时又进行了包装
// 当正在向用户空间传递事件,则就绪事件会临时放到该队列,否则直接放到 rdllist
struct epitem *ovflist;
/* 保存了一些用户变量,比如fd监听数量的最大值等 */
struct user_struct *user;
struct file *file;
int visited; ////用于优化循环检测检查
struct list_head visited_list_link;
};
其中最最主要的两个结构就是struct epitem *ovflist;
和struct list_head rdllist;
,所以要理解他们哦
epoll_create函数
至于struct eventpoll
的创建自然是在调用了epoll_create
时:
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
return do_epoll_create(flags);
}
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return do_epoll_create(0);
}
/*
可以看出 size 真的没有什么暖用
原因:原来使用的是hash表,所以有size,
现在改为红黑树,故不使用. (摘自网上,不知真假)
*/
static int do_epoll_create(int flags)
{
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
// 创建内部数据结构eventpoll
error = ep_alloc(&ep);
//找到一个未使用的 fd
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
//创建file文件对象实例,以及匿名 inode节点和dentry 等数据结构
//(具体见我的深入理解文件系统的文章)
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
ep->file = file;
fd_install(fd, file); //建立fd和file的关联关系
return fd;
}
epoll_create
的过程主要是创建并初始化数据结构eventpoll
,以及创建file
实例,并将ep
放入file->private
中进行保存
epoll_create->get_unused_fd_flags
函数get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
有必要说一下,因为会涉及到前面讲的文件系统
int get_unused_fd_flags(unsigned flags)
{
return __alloc_fd(current->files, 0, rlimit(RLIMIT_NOFILE), flags);
}
linux内核中,current是个宏,返回的是一个task_struct结构(我们称之为进程描述符)的变量,表示的是当前进程,进程打开的文件资源保存在进程描述符的files
成员里面,所以current->files返回的当前进程打开的文件资源。rlimit(RLIMIT_NOFILE) 函数获取的是当前进程可以打开的最大文件描述符数,这个值可以设置,默认是1024。
__alloc_fd的工作是为进程在[start,end)之间(备注:这里start为0, end为进程可以打开的最大文件描述符数)分配一个可用的文件描述符
epoll_create->anon_inode_getfile
/*
创建一个 file 结构,并将它与一个匿名 inode 节点 anon_inode_inode 挂钩在一起
使用anon_inode_getfile()创建的所有文件对象将共享一个匿名inode,
因此节省了内存并避免了文件/inode/dentry的代码重复
*/
struct file *anon_inode_getfile(const char *name,
const struct file_operations *fops,
void *priv, int flags)
{
//priv 就是前面的 eventpoll 结构
struct file *file;
if (IS_ERR(anon_inode_inode))
return ERR_PTR(-ENODEV);
if (fops->owner && !try_module_get(fops->owner))
return ERR_PTR(-ENOENT);
/*
* We know the anon_inode inode count is always greater than zero,
* so ihold() is safe.
*/
ihold(anon_inode_inode);
file = alloc_file_pseudo(anon_inode_inode, anon_inode_mnt, name,
flags & (O_ACCMODE | O_NONBLOCK), fops);
if (IS_ERR(file))
goto err;
file->f_mapping = anon_inode_inode->i_mapping;
file->private_data = priv;
/*
file->private_data会指向这个ep变量
ep->file会指向该函数申请的file结构变量。
*/
return file;
err:
iput(anon_inode_inode);
module_put(fops->owner);
return file;
}
其中涉及到的file/dentry/inode对象
都可以 在此查看
file
结构:当进程打开一个文件时,内核就会为该进程分配一个file结构,表示打开的文件在进程的上下文,然后应用程序会通过一个int类型的文件描述符来访问这个结构,实际上内核的进程里面维护一个file结构的数组,而文件描述符就是相应的file结构在数组中的下标。
dentry
结构:(称之为“目录项”)记录着文件的位置,每个文件都只有一个dentry结构,然后一个进程可以多次打开一个文件,多个进程也可以打开同一个文件,这些情况,内核都会申请多个file结构,建立多个文件上下文。但是,对同一个文件来说,无论打开多少次,内核只会为该文件分配一个dentry。所以,file结构与dentry结构的关系是多对一的。
inode
结构:里面记录文件在存储介质上的位置和分布等信息,每个文件在内核中只分配一个inode。 dentry与inode描述的目标是不同的,一个文件可能会有好几个文件名(比如链接文件),通过不同文件名访问同一个文件的权限也可能不同。dentry文件所代表的是逻辑意义上的文件,记录的是其逻辑上的属性,而inode结构所代表的是其物理意义上的文件,记录的是其物理上的属性。dentry与inode结构的关系是多对一的关系。如下图:
epoll_create->fd_install(fd, file);
将fd与file交给关联在一起,之后,内核可以通过应用传入的fd参数访问file结构
epoll_ctl 函数
epoll_ctl接口的作用是添加/修改/删除文件的监听事件,内核代码如下:
//epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
struct eventpoll *tep = NULL;
error = -EFAULT;
//判断 op 参数,如果不是删除操作,需要将 __user *event 复制给 epds。
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
error = -EBADF;
f = fdget(epfd); // 得到 epoll fd 对应的文件对象
tf = fdget(fd); // 得到 fd 对应的文件对象
/*接下来就是对参数的一些检查,出现如下情况,就可以认为传入的参数有问题,直接返回出错:
- 目标文件不支持poll操作(!tf.file->f_op->poll);
- 监听的目标文件就是epoll文件本身(f.file == tf.file);
- 用户传入的epoll文件(epfd代表的文件)并不是一个真正的
epoll的文件(!is_file_epoll(f.file));
- 如果操作动作是修改操作,并且事件类型为EPOLLEXCLUSIVE,返回出错等等。
*/
/* The target file descriptor must support poll */
error = -EPERM;
if (!file_can_poll(tf.file))
goto error_tgt_fput;
if (ep_op_has_event(op))
ep_take_care_of_epollwakeup(&epds);
error = -EINVAL;
if (f.file == tf.file || !is_file_epoll(f.file))
goto error_tgt_fput;
if (ep_op_has_event(op) && (epds.events & EPOLLEXCLUSIVE))
{
if (op == EPOLL_CTL_MOD)
goto error_tgt_fput;
if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) ||
(epds.events & ~EPOLLEXCLUSIVE_OK_BITS)))
goto error_tgt_fput;
}
//取得 eventpoll 结构
ep = f.file->private_data;
.........;
//防止重复添加(在ep的红黑树中查找是否已经存在这个fd)
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op)
{
case EPOLL_CTL_ADD:
if (!epi)
{
//默认包含 POLLERR 和 POLLHUP 事件
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
}
else
error = -EEXIST; //重复添加(在ep的红黑树中查找已经存在这个fd)。
if (full_check)
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi)
{
if (!(epi->event.events & EPOLLEXCLUSIVE))
{
epds.events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, &epds);
}
}
else
error = -ENOENT;
break;
}
if (tep != NULL)
mutex_unlock(&tep->mtx);
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (full_check)
mutex_unlock(&epmutex);
fdput(tf);
error_fput:
fdput(f);
error_return:
return error;
}
epoll_ctl->ep_insert 以添加为例
将对目标文件的监听事件struct epitem
插入到ep维护的红黑树里面:
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check)
{
int error, pwake = 0;
__poll_t revents;
long user_watches;
struct epitem *epi;
struct ep_pqueue epq;
lockdep_assert_irqs_enabled();
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;
//从slab分配器里面分配一个epitem结构监听项,然后对该结构进行初始化
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
if (epi->event.events & EPOLLWAKEUP)
{
error = ep_create_wakeup_source(epi);
if (error)
goto error_create_wakeup_source;
}
else
{
RCU_INIT_POINTER(epi->ws, NULL);
}
/* 初始化当对应事件发生时,内核需要调用的回调函数 */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
//注册回调函数(ep_ptable_queue_proc),目标对象产生事件就会
//调用ep_ptable_queue_proc回调函数。
revents = ep_item_poll(epi, &epq.pt, 1);
/* Add the current item to the list of active epoll hook for this file */
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
//插入 ep 的红黑树
ep_rbtree_insert(ep, epi);
/* now check if we've created too many backpaths */
error = -EINVAL;
if (full_check && reverse_path_check())
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irq(&ep->wq.lock);
/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);
/* If the file is already "ready" we drop it inside the ready list */
if (revents && !ep_is_linked(epi))
{
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irq(&ep->wq.lock);
atomic_long_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
error_remove_epi:
spin_lock(&tfile->f_lock);
list_del_rcu(&epi->fllink);
spin_unlock(&tfile->f_lock);
rb_erase_cached(&epi->rbn, &ep->rbr);
error_unregister:
ep_unregister_pollwait(ep, epi);
spin_lock_irq(&ep->wq.lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink);
spin_unlock_irq(&ep->wq.lock);
wakeup_source_unregister(ep_wakeup_source(epi));
error_create_wakeup_source:
kmem_cache_free(epi_cache, epi);
return error;
}
其余的删除,修改与上面等同
ep_insert
中有一个与muduo queueInLoop
用法很相似的地方,但是我偏偏没有看得太懂!!!
epoll_wait 函数
epoll_wait等待事件的产生,内核代码如下:
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
int maxevents, int timeout)
{
int error;
struct fd f;
struct eventpoll *ep;
//参数检查
ep = f.file->private_data;
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);
}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;
/*
对等待时间的处理,timeout超时时间以ms为单位,timeout大于0,说明等待timeout时间后超时,
如果timeout等于0,函数不阻塞,直接返回,小于0的情况,是永久阻塞,直到有事件产生才返回。
*/
if (timeout > 0)
{
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
}
else if (timeout == 0)
{
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irq(&ep->wq.lock);
goto check_events;
}
fetch_events:
if (!ep_events_available(ep))
ep_busy_loop(ep, timed_out);
spin_lock_irq(&ep->wq.lock);
if (!ep_events_available(ep))
{
//没有事件产生
...;
}
check_events:
/* Is it worth to try to dig for events ? */
eavail = ep_events_available(ep);
spin_unlock_irq(&ep->wq.lock);
// 有事件产生,就调用ep_send_events函数,将events事件转移到用户空间里面。
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;
return res;
}
epoll_wait->ep_send_events->ep_scan_ready_list
ep_send_events
函数中什么都没有,只是调用了一下ep_scan_ready_list,直接来看ep_scan_ready_list
函数即可
static __poll_t ep_scan_ready_list(struct eventpoll *ep,
__poll_t (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv, int depth, bool ep_locked)
{
__poll_t res;
int pwake = 0;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
lockdep_assert_irqs_enabled();
if (!ep_locked)
mutex_lock_nested(&ep->mtx, depth);
/*
将ep->rdllist链表加入到txlist链表中去,这样的话rdllist链表就为空了
同时还将ep的ovflist链表设置为NULL,ovflist是用单链表,是一个接受就绪事件的备份链表
*/
spin_lock_irq(&ep->wq.lock);
list_splice_init(&ep->rdllist, &txlist);
ep->ovflist = NULL;
spin_unlock_irq(&ep->wq.lock);
/*
真正王用户空间拷贝数据的函数 ep_send_events_proc
*/
res = (*sproc)(ep, &txlist, priv);
spin_lock_irq(&ep->wq.lock);
/*
当内核进程将事件从内核拷贝到用户空间时,这段时间目标文件可能会产生新的事件,这个时候,
就需要将新的时间链入到ovlist里面。
*/
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR)
{
if (!ep_is_linked(epi))
{
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
return res;
}
epoll_wait->ep_send_events->ep_scan_ready_list->ep_send_events_proc
在这个函数中,我们就可以看到LT
与ET
到底有什么区别了
static __poll_t ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
__poll_t revents;
struct epitem *epi;
struct epoll_event __user *uevent;
struct wakeup_source *ws;
poll_table pt;
init_poll_funcptr(&pt, NULL);
/*
We can loop without lock because we are passed a task private list.
Items cannot vanish during the loop because ep_scan_ready_list() is
holding "mtx" during this call.
*/
for (esed->res = 0, uevent = esed->events;
!list_empty(head) && esed->res < esed->maxevents;)
{
epi = list_first_entry(head, struct epitem, rdllink);
/*
* Activate ep->ws before deactivating epi->ws to prevent
* triggering auto-suspend here (in case we reactive epi->ws
* below).
*
* This could be rearranged to delay the deactivation of epi->ws
* instead, but then epi->ws would temporarily be out of sync
* with ep_is_linked().
*/
ws = ep_wakeup_source(epi);
if (ws)
{
if (ws->active)
__pm_stay_awake(ep->ws);
__pm_relax(ws);
}
list_del_init(&epi->rdllink);
revents = ep_item_poll(epi, &pt, 1);
/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents)
{
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data))
{
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
if (!esed->res)
esed->res = -EFAULT;
return 0;
}
esed->res++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET))
{
/*
LT 模式时,将没有标记为ET模式的fd又放回到 rdllist 中,
这样下次就绪时又能将其发送至用户空间了
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}
}
}
return 0;
}
明天完善该篇博客...........
epoll
机制实现了自己特有的文件系统eventpoll filesystem
通过epoll_ctl
接口加入该epoll
描述符监听的套接字则属于socket filesystem
,这点一定要注意。
select ,poll,epoll 最强区别
epoll就是自己保存拷入的fd,它的API就已经说明了这一点——不是 epoll_wait的时候才传入fd,而是通过epoll_ctl把所有fd传入内核再一起"wait",这就省掉了不必要的重复拷贝。其次,在 epoll_wait时,也不是把current轮流的加入fd对应的设备等待队列,而是在设备等待队列醒来时调用一个回调函数(当然,这就需要“唤醒回调”机制),把产生事件的fd归入一个链表,然后返回这个链表上的fd。
epoll为什么高效(相比select)
l 仅从上面的调用方式就可以看出epoll比select/poll的一个优势:select/poll每次调用都要传递所要监控的所有fd给select/poll系统调用(这意味着每次调用都要将fd列表从用户态拷贝到内核态,当fd数目很多时,这会造成低效)。而每次调用epoll_wait时(作用相当于调用select/poll),不需要再传递fd列表给内核,因为已经在epoll_ctl中将需要监控的fd告诉了内核(epoll_ctl不需要每次都拷贝所有的fd,只需要进行增量式操作)。所以,在调用epoll_create之后,内核已经在内核态开始准备数据结构存放要监控的fd了。每次epoll_ctl只是对这个数据结构进行简单的维护。
l 此外,内核使用了slab机制,为epoll提供了快速的数据结构:
在内核里,一切皆文件。所以,epoll向内核注册了一个文件系统,用于存储上述的被监控的fd。当你调用epoll_create时,就会在这个虚拟的epoll文件系统里创建一个file结点。当然这个file不是普通文件,它只服务于epoll。epoll在被内核初始化时(操作系统启动),同时会开辟出epoll自己的内核高速cache区,用于安置每一个我们想监控的fd,这些fd会以红黑树的形式保存在内核cache里,以支持快速的查找、插入、删除。这个内核高速cache区,就是建立连续的物理内存页,然后在之上建立slab层,简单的说,就是物理上分配好你想要的size的内存对象,每次使用时都是使用空闲的已分配好的对象。
l epoll的第三个优势在于:当我们调用epoll_ctl往里塞入百万个fd时,epoll_wait仍然可以飞快的返回,并有效的将发生事件的fd给我们用户。这是由于我们在调用epoll_create时,内核除了帮我们在epoll文件系统里建了个file结点,在内核cache里建了个红黑树用于存储以后epoll_ctl传来的fd外,还会再建立一个list链表,用于存储准备就绪的事件,当epoll_wait调用时,仅仅观察这个list链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。而且,通常情况下即使我们要监控百万计的fd,大多一次也只返回很少量的准备就绪fd而已,所以,epoll_wait仅需要从内核态copy少量的fd到用户态而已。那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把fd放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个fd的中断到了,就把它放到准备就绪list链表里。所以,当一个fd(例如socket)上有数据到了,内核在把设备(例如网卡)上的数据copy到内核中后就来把fd(socket)插入到准备就绪list链表里了。
如此,一颗红黑树,一张准备就绪fd链表,少量的内核cache,就帮我们解决了大并发下的fd(socket)处理问题。
1.执行epoll_create时,创建了红黑树和就绪list链表。
2.执行epoll_ctl时,如果增加fd(socket),则检查在红黑树中是否存在,存在立即返回,不存在则添加到红黑树上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪list链表中插入数据。
3.执行epoll_wait时立刻返回准备就绪链表里的数据即可。