要想实现并发编程,最简单的模式就是1个进程/线程处理1个连接的全部生命周期,当我们使用默认的套接字时,它会将一个新的连接与一个进程或线程绑定,这样等待处理消息的进程或线程就会阻塞而等待消息准备好,在高并发下这会导致进程/线程频繁的睡眠、唤醒,从而影响了CPU的效率。而多路复用则是解决这一问题的“利器”,它可以同时监控所有的连接,它虽然也会进入睡眠等待阶段,但是这一个进程/线程是为所有连接而等待,只要有一个连接准备好了,我们的进程/线程就会被唤醒,这样就大大提高了效率。简单来说,多路复用首先需要构造一张描述符表,然后调用一个函数,直到这些描述符中的一个已准备好时,该函数才返回。
目前支持I/O复用的系统调用有:select、pselect、poll、epoll
1. select
select 函数原型如下:
#include<sys/select.h>
int select( int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict exceptfds, struct timval *restrict tvptr);
//select返回准备就绪的描述符数目;若超时,返回0;若出错,返回-1。
//restrict: 表明指针是访问这个数据对象的唯一方式,即它告诉编译器,所有修改该指针所指向内存内容的操作都必须通过该指针来修改
①第一个参数的nfds的意思是“最大文件描述符编号值+1”.考虑readfds、writefds和exceptfds这三个描述符集,在这三个描述符集中找出最大描述符编号值,然后加1.因为描述符编号从0开始,所以要在最大描述符编号值上加1。也可以将第一个参数设置为 FD_SETSIZE,这是<sys/select.h>中的一个常量__FD_SETSIZE,它指定最大描述符数,通常是1024,但是对于大多数程序而言,这个值就太大了。通过指定我们所关注的最大描述符数,内核就只需要在此范围内寻找打开的位,而不必在3个描述符集中的数百个没有使用的位内寻找。
eg:select (4, &readfds, &writefds, NULL, NULL);
执行效果如下图:
②中间三个参数是指向描述符的指针,这三个描述符集说明了我们关心的可读、可写或处于异常条件的描述符集合。描述符不受限于套接字,任何描述符都行。每个描述符集都存储在一个fd_set数据类型中。这个数据类型由实现选择,它可以为每一个可能的描述符保持一位,我们可以认为它是一个很大的字节数组,即该数组的每一个元素即每一位存储一个文件描述符,位数组的每一位代表其对应的描述符是否需要被检查。这三个参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件。所以每次调用select前都需要重新初始化fdset。我们可以通过下列4个函数堆fd_set数据类型的数据进行处理。
#include <sys/select.h>
int FD_ISSET(int fd, fd_set *fd_set) //测试fd_set的位fd是否已经打开
void FD_CLR(int fd, fd_set *fd_set) //清除fd_set中的一位
void FD_SET(int fd, fd_set *fd_set) //开启fd_set中的一位
void FD_ZERO(fd_set *fdset) //将一个fd_set变量的所有位置0
③最后一个参数指定了愿意等待的时间长度,单位为秒和微秒。有以下三种情况:
a.tvptr=NULL。永远等待,直到指定的描述符中的一个以已经准备好或者捕捉到一个信号才返回。
b.tvptr->tv_sec == 0 && tvptr->tv_usec == 0。根本不等待,测试所有指定的描述符并立即返回。(轮询)
c.tvptr->tv_sec != 0 || tvptr->tv_usec != 0。等待指定的秒数和微秒数,当指定的描述符中的一个以已经准备好指定的时间值已经超过时立即返回。
select()函数实现I/O多路复用的步骤
(1)清空描述符集合
(2)建立需要监视的描述符与描述符集合的关系
(3)调用select函数
(4)检查监视的描述符判断是否已经准备好
(5)对已经准备好的描述符进程IO操作
下面是检测一个socket是否有数据可读的例子:
fd_set rdfds; // 先申明一个 fd_set 集合来保存我们要检测的 socket句柄
struct timeval tv; // 申明一个时间变量来保存时间
int ret; //保存返回值
FD_ZERO(&rdfds); // 用select函数之前先把集合清零
FD_SET(socket, &rdfds); //把要检测的句柄socket加入到集合里
tv.tv_sec = 1;
tv.tv_usec = 500; //设置select等待的最大时间为1秒加500毫秒
ret = select(socket + 1, &rdfds, NULL, NULL, &tv); //检测我们上面设置到集合rdfd里的句柄是否有可读信息
if(ret < 0)
perror("select");// 这说明select函数出错
else if(ret == 0)
printf("超时\n"); //说明在我们设定的时间值1秒加500毫秒的时间内socket的状态没有发生变化
else { /* 说明等待时间还未到1秒加500毫秒,socket的状态发生了变化 */
printf("ret=%d\n", ret); //ret这个返回值记录了发生状态变化的句柄的数目,由于我们只监视了socket这一个句柄,所以这里一定ret=1,如果同时有多个句柄发生变化返回的就是句柄的总和了
if(FD_ISSET(socket, &rdfds)) { //从select返回时,用FD_ISSET测试一下这个socket是否真的变成可读的了
recv(...);//读取socket句柄里的数据
}
}
2.pselect
#include<sys/select.h>
int pselect( int nfds, fd_set *restrict readfds, fd_set *restrict writefds, fd_set *restrict exceptfds, const struct timespec *restrict tsptr, const struct sigset_t *restrict sigmask);
pselect与select的有以下不同点:
①select的超时值用timeval结构指定,pselect用timespec结构,以以秒和纳秒表示超时值,提供了更精确的超时时间。
struct timeval {
time_t tv_sec;
long tv_usec;
}
struct timespec {
time_t tv_sec;
long tv_nsec;
}
②pselect 的超时值被声明为const,因为在select中,超时值会被内核修改,其值为超时剩余的时间,而const保证了调用pselect不会改变此值。
③ pselect函数增加了第六个参数:pselect可使用可选信号屏蔽字,一个指向信号掩码的指针。当sigmask为NULL时,在信号有关的方面,pselect与select运行状况相同。否则,sigmask指向一信号屏蔽字,在调用pselect时,以原子操作的方式安装该信号屏蔽字。在返回时,恢复以前的信号屏蔽字。即pselect()函数等待的这段时间内不会被别的信号打断。
3.poll
poll函数提供的功能与select函数类似,但是接口不同(实质就是描述fd集合的方式不同)
#include<poll.h>
int poll (struct pollfd *fdarray[], nfds_t nfds, int timeout);
//返回就绪的描述符数目;若超时,返回0;若出错,返回-1
可以看到,poll不是为每个条件(可读、可写、异常)构造一个一个描述符集,而是构造一个pollfd结构的数组(fdarray数组中的元素由第二个参数nfds指定),每个数组元素指定一个描述符编号以及我们对该描述符感兴趣的条件。
struct pollfd {
int fd;
short events;
short revents;
}
每个元素中events成员可以设置为以下所示值中的一个或几个,通过这些值来告诉内核我们关心的是这个描述符的那些事件。下表中前4行是测试可读性,中间3行测试可写性。注意最后3行不由我们指定,而是在返回时由内核设置的。即要测试的条件由events成员指定,而返回的结果则在revents中存储。
因为poll是通过pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
这里是网上找到的一个例子:
/* 检测两个文件描述符,分别为一般数据和高优先数据。如果事件发生
则用相关描述符和优先度调用函数handler(),无时间限制等待,直到
错误发生或描述符挂起。*/
#define NORMAL_DATA 1
#define HIPRI_DATA 2
int poll_two_normal(int fd1,int fd2)
{
struct pollfd poll_list[2];
int retval;
poll_list[0].fd = fd1;
poll_list[1].fd = fd2;
poll_list[0].events = POLLIN|POLLPRI;
poll_list[1].events = POLLIN|POLLPRI;
while(1)
{
retval = poll(poll_list,(unsigned long)2,-1);
/* retval 总是大于0或为-1,因为我们在阻塞中工作 */
if(retval < 0)
{
fprintf(stderr,"poll错误: %s\n",strerror(errno));
return -1;
}
if(((poll_list[0].revents&POLLHUP) == POLLHUP) ||
((poll_list[0].revents&POLLERR) == POLLERR) ||
((poll_list[0].revents&POLLNVAL) == POLLNVAL) ||
((poll_list[1].revents&POLLHUP) == POLLHUP) ||
((poll_list[1].revents&POLLERR) == POLLERR) ||
((poll_list[1].revents&POLLNVAL) == POLLNVAL))
return 0;
if((poll_list[0].revents&POLLIN) == POLLIN)
handle(poll_list[0].fd,NORMAL_DATA);
if((poll_list[0].revents&POLLPRI) == POLLPRI)
handle(poll_list[0].fd,HIPRI_DATA);
if((poll_list[1].revents&POLLIN) == POLLIN)
handle(poll_list[1].fd,NORMAL_DATA);
if((poll_list[1].revents&POLLPRI) == POLLPRI)
handle(poll_list[1].fd,HIPRI_DATA);
}
}
4.epoll
epoll与select方式的最大区别首先在于select所用到的FD_SET是有限的,而epoll的FD上限则是最大可以打开文件的数目,具体数目可以cat /proc/sys/fs/file-max察看。poll虽然不存在FD个数的限制,但是任一时间只有部分fd是“活跃”的,而select/poll每次调用都需要在内核遍历传递进来的所有fd,当fd个数过大时,会导致效率呈线性下降。epoll的出现就解决了这个问题。epoll提供了3个函数:
#include <sys/epoll.h>
int epoll_create(int maxfds);
创建一个
epoll
句柄,其中
maxfds
是
epoll
所支持的最大句柄数,它来告诉内核这个监听的数目一共有多大。要注意的是,当创建好
epoll
句柄后,它就是会占用一个
fd
值,在
linux
下如果查看
/proc/
进程
id/fd/
,是能够看到这个
fd
的,所以在使用完
epoll
后,必须调用
close()
关闭,否则可能导致
fd
被耗尽。
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
注册要监听的事件类型。epoll_ctl可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。
①epfd即为epoll_create函数创建的fd
②op参数指定了操作类型,表示动作,用三个宏来表示:
EPOLL_CTL_ADD:注册新的fd到epfd中;
EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
EPOLL_CTL_DEL:从epfd中删除一个fd;
③ 第三个参数是需要监听的fd。
④第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
//感兴趣的事件和被触发的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
//保存触发事件的某个文件描述符相关的数据
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
events可以是以下几个宏的集合:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
EPOLLOUT:表示对应的文件描述符可以写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event * events, int max_events, int timeout);
//返回需要处理的事件数目,若超时,返回0;若出错,返回-1
用来等待在epoll监控的事件中已经产生的事件,类似于select()调用。当该函数操作成功后,参数events将储存从内核得到所有就绪读写事件的集合,max_events告之内核这个events有多大,即当前需要监听的所有socket句柄数。在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。但其实这里返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里是通过内核与用户空间映射(mmap)同一块内存实现的,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
几乎所有的epoll程序都使用下面的框架:
for( ; ; )
{
nfds = epoll_wait(epfd, events, 20, 500);
for(i = 0; i < nfds; i++) {
if(events[i].data.fd == listenfd) { //有新的连接
connfd = accept(listenfd, (sockaddr *)%clientaddr, &clilen);//accept这个连接
ev.data.fd = connfd;
ev.events = EPOLLIN|EPOLLET;
epoll_clt(epfd, EPOLL_CTL_ADD, connfd, &ev);//将新fd添加到epoll的监听队列中
}
else if(events[i].events & EPOLLIN) {//接收到数据,读socket
n = read(sockfd, line, MAXLINE);
ev.data.ptr = md;
ev.events = EPOLLOUT|EPOLLET;
epoll_ctl(epfd, EPOLL_CLT_MOD, sockfd, &ev);//修改标识符,等待下一个循环时发送数据
}
else if(events[i].events & EPOLLOUT){//有数据待发送
struct myepoll_data *md = (myepoll_data *)events[i].data.ptr;//取数据
sockfd = md->fd;
send(sockfd, md->ptr, strlen((char *)md->ptr), 0); //发送数据
ev.data.fd = sockfd;
ev.events = EPOLLIN|EPOLLET;
epoll_ctl(epfd, EPOLL_CLT_MOD, sockfd, &ev);//修改标识符,等待下一个循环时接收数据
}
else{
// 其他处理
}
}
}
EPOLL事件有两种模型:
Edge Triggered (ET)边缘触发
Level Triggered (LT)水平触发
ET与LT的区别在于,当一个新的事件到来时,ET模式下当然可以从epoll_wait调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。而LT模式正好相反,只要一个事件对应的套接字缓冲区还有数据,就总能从epoll_wait中获取这个事件。也就是只要句柄满足某种状态,水平触发就会发出通知;而只有当句柄状态改变时,边缘触发才会发出通知。例如一个socket经过长时间等待后接收到一段100k的数据,两种触发方式都会向程序发出就绪通知。假设程序从这个socket中读取了50k数据,并再次调用监听函数,水平触发依然会发出就绪通知,而边缘触发会因为socket“有数据可读”这个状态没有发生变化而不发出通知且陷入长时间的等待。因此在使用边缘触发的 api 时,要注意每次都要读到 socket返回 EWOULDBLOCK为止。
ET/LT 模式的处理逻辑几乎完全相同,差别仅在于 LT 模式在 event 发生时不会将其从 ready list 中移除,这2种使用方式针对的仍然是效率问题,只不过变成了epoll_wait返回的连接如何能够更准确些。
三者比较
select对应于内核中的sys_select调用,sys_select首先将第二三四个参数指向的fd_set拷贝到内核,然后对每个被SET的描述符调用进行poll,并记录在临时结果中(fdset),如果有事件发生,select会将临时结果写到用户空间并返回;频繁的内存拷贝导致select需要复制大量的句柄数据结构,产生巨大的开销。当数十万并发连接存在时,可能每一毫秒只有数百个活跃的连接,同时其余数十万连接在这一毫秒是非活跃的,而select则返回了全部待监控的连接,显而易见,这是一种无效率的表现。当处理并发上万个连接时,select就完全力不从心了。况且应用程序需要遍历整个列表才能发现哪些句柄发生了事件,这就使得查找配对速度变慢,而当FD很大时,就会产生超时现象。
poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得知事件是否发生。与select相比,因为poll是通过pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制。但是除了这一点,select的缺点poll也同样拥有。
epoll比select和poll聪明的地方在于,epoll_wait不需要将所有待监控的连接传入,也就是说它不用每次调用都向内核拷贝事件描述信息,在第一次调用后,事件信息就会与对应的epoll描述符关联起来。而且epoll不是通过轮询,而是通过在等待的描述符上注册回调函数,当事件发生时,回调函数负责把发生的事件存储在就绪事件链表中,最后写到用户空间。这样就很好的分清了频繁调用和不频繁调用的操作,而且它也不会随着并发连接的增加使得入参越发多起来,导致内核执行效率下降。与此同时,epoll在FD的传送过程中,避免了内存与用户空间之间的频繁的拷贝问题,使用了内存映射(mmap),这意味着内核可以直接看到epoll监听的句柄,彻底省掉了这些文件描述符在系统调用时复制的开销。而它在内核态保存所有待监控的连接的数据结构,就是一棵红黑树,每次调用epoll_ctl就是在往红黑树里添加或删除节点(socket句柄)。