I/O复用技术是重要的提高服务器工作效率和性能的手段,Linux下实现I/O复用的系统调用主要有select、poll和epoll。
首先我们来看一下select的函数原型和常用的宏:
#include<sys/select.h>
int select(int nfds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
FD_ZERO(fd_set *fdset); //清除fdset所有位
FD_SET(int fd, fd_set* fdset); //设置fdset的位
fd FD_CLR(int fd, fd_set* fdset); //清除fdset的位
fd int FD_ISSET(int fd, fd_set* fdset); //测试fdset的位fd是否被设置
首先来看select函数原型,nfds指定了被监听的文件描述符的总数,其值通常被设定为所有文件描述符的最大值加一,接下来的三个fd_set*类型的参数分别指向可读可写和异常事件对应的文件描述符集合,最后一个参数是一个微秒级的定时器,表示select阻塞这个时间后继续执行,如果为0则立即返回,如果为NULL将一直阻塞。
通过观察fd_set结构体的原型,我们发现其仅包含一个整形数组,该数组的每一位都标记了一个文件描述符,所以select有最大可监控的文件描述符的限制。后面的宏是为了简化对于fd_set的位操作。select函数成功时返回就绪文件描述符的总数,如果在超时时间内没有任何文件描述符就绪,则select返回0,如果在select阻塞期间程序收到信号,则select立即返回-1并置errno为EINTR。
select在何种情况下会认为文件描述符产生了可读、可写或异常情况呢?首先,当socket处于以下状态会认为其可读:1)socket内核接收缓冲区中的字节数大于或等于其低水位标记,此时我们可以无阻塞地读该socket,且读操作返回值大于0;2)socket的对端关闭连接,此时读操作返回0;3)监听socket上有新的请求;4)socket上有未处理的错误。而以下状态会认为socket可写:1)socket内核发送缓冲区中的可用字节数大于或等于其低水位标记,此时我们可以无阻塞地写该socket,且写操作返回值大于0;2)socket的写操作被关闭,对写操作关闭的socket执行写操作会触发SIGPIPE信号;3)socket使用非阻塞connect连接成功或者失败(超时)之后;4)socket上有未处理的错误。而异常情况只有一种,就是产生了带外数据。
一个例子看一下select程序如何来写以及select如何同时处理普通数据和带外数据的:
#include<iostream>
#include<unistd.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<stdio.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<assert.h>
#include<sys/epoll.h>
using namespace std;
int main(int argc, char** argv) {
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
inet_pton(AF_INET, ip, &address.sin_addr);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
if(connfd < 0) {
printf("errno is: %d\n", errno);
}
char buf[1024];
fd_set read_fds;
fd_set exception_fds;
FD_ZERO(&read_fds);
FD_ZERO(&exception_fds);
while(1) {
memset(buf, 0, sizeof(buf));
FD_SET(connfd, &read_fds);
FD_SET(connfd, &exception_fds);
ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
if(ret < 0) {
printf("selection failure\n");
break;
}
if(FD_ISSET(connfd, &read_fds)) {
ret = recv(connfd, buf, sizeof(buf), 0);
if(ret <= 0) break;
printf("get %d bytes of normal data: %s\n", ret, buf);
}
memset(buf, 0, sizeof(buf));
if(FD_ISSET(connfd, &exception_fds)) {
ret = recv(connfd, buf, sizeof(buf), MSG_OOB);
if(ret <= 0) break;
printf("get %d bytes of oob data: %s\n", ret, buf);
}
}
close(connfd);
close(listenfd);
return 0;
}
epoll是Linux特有的I/O复用函数,其实现和select、poll有很大区别。epoll将用户关心的文件描述符上的事件放在内核里的一个事件表中,从而无须像select和poll那样每次调用都要重复传入文件描述符集或事件集,但是epoll需要一个额外的文件描述符来标识内核中的这个事件表。与poll不同的是,epoll如果检测到事件,就将所有就绪时间从内核时间表中复制到events指向的数组中,这样就极大提高了应用程序检索就绪文件描述符的效率,从O(n)的时间复杂度降为了O(1)。我们来看一下epoll的几个函数:
#include<sys/epoll.h>
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
创建epoll的函数size参数现在是没有用处的,只是给内核一个提示,告诉它事件表需要多大。操作epoll的函数中op参数指定了操作类型,一共有注册、修改和删除三种,而event参数则描述了事件。
epoll对于文件描述符的操作有两种模式:
LT和ET模式
LT(Level Triggered,电平触发):LT模式是epoll默认的工作模式,也是select和poll的工作模式,在LT模式下,epoll相当于一个效率较高的poll。
采用LT模式的文件描述符,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理此事件,当下一次调用epoll_wait是,epoll_wait还会将此事件通告应用程序。
ET(Edge Triggered,边沿触发):当调用epoll_ctl,向参数event注册EPOLLET事件时,epoll将以ET模式来操作该文件描述符,ET模式是epoll的高效工作模式.
对于采用ET模式的文件描述符,当epoll_wait检测到其上有事件发生并将此通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不在向应用程序通知这一事件。ET模式降低了同意epoll事件被触发的次数,效率比LT模式高。
using namespace std;
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
//设置文件描述符为非阻塞模式
int setnonblocking(int fd) {
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
//以两种不同模式将事件注册到epoll中
void addfd(int epollfd, int fd, bool enable_et) {
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN;
if(enable_et) event.events |= EPOLLET;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void lt(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i ++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, false);
}
else if(events[i].events & EPOLLIN) {
printf("event trigger once\n");
memset(buf, 0, sizeof(buf));
int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
if(ret <= 0) {
close(sockfd);
continue;
}
printf("get %d bytes of content: %s\n", ret, buf);
}
else printf("something else happened\n");
}
}
void et(epoll_event* events, int number, int epollfd, int listenfd) {
char buf[BUFFER_SIZE];
for(int i = 0; i < number; i ++) {
int sockfd = events[i].data.fd;
if(sockfd == listenfd) {
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
addfd(epollfd, connfd, true);
}
else if(events[i].events & EPOLLIN) {
//这段代码不会被重复触发,所以我们循环读取
printf("event trigger once\n");
while(1) {
memset(buf, 0, sizeof(buf));
int ret = recv(sockfd, buf, BUFFER_SIZE, 0);
if(ret < 0) {
//非阻塞模式的I/O,当下面的条件成立表示数据已经全部取走
if((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
printf("read later\n");
break;
}
close(sockfd);
break;
}
else if(ret == 0) close(sockfd);
else printf("get %d bytes of content: %s\n", ret, buf);
}
}
else printf("something else happened\n");
}
}
int main(int argc, char** argv) {
if(argc <= 2) {
printf("usage: %s ip_address port_number\n", basename(argv[0]));
return 1;
}
const char* ip = argv[1];
int port = atoi(argv[2]);
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
address.sin_port = htons(port);
inet_pton(AF_INET, ip, &address.sin_addr);
int listenfd = socket(AF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
epoll_event events[MAX_EVENT_NUMBER];
int epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd, true);
while(1) {
int ret = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(ret < 0) {
printf("epoll failure\n");
break;
}
et(events, ret, epollfd, listenfd);
}
close(listenfd);
return 0;
}