1.客户端:
客户端程序使用poll同事监听用户输入和网络连接,并利用splice函数将用户输入内容直接定向到网络连接上以发送之,从而实现数据零拷贝,提高了程序执行效率。
splice函数简介:
#include <fcntl.h>
ssize_t splice(int fd_in, loff_t *off_in, int fd_out, loff_t *off_out, size_t len, unsigned int flags);
splice用于在两个文件描述符之间移动数据, 也是零拷贝。
①fd_in参数是待输入描述符。如果它是一个管道文件描述符,则②off_in必须设置为NULL;否则off_in表示从输入数据流的何处开始读取,此时若为NULL,则从输入数据流的当前偏移位置读入。
③/④fd_out/off_out与上述相同,不过是用于输出。
⑤len参数指定移动数据的长度。
⑥flags参数则控制数据如何移动:
- SPLICE_F_NONBLOCK:splice 操作不会被阻塞。然而,如果文件描述符没有被设置为不可被阻塞方式的 I/O ,那么调用 splice 有可能仍然被阻塞。
- SPLICE_F_MORE:告知操作系统内核下一个 splice 系统调用将会有更多的数据传来。
- SPLICE_F_MOVE:如果输出是文件,这个值则会使得操作系统内核尝试从输入管道缓冲区直接将数据读入到输出地址空间,这个数据传输过程没有任何数据拷贝操作发生。
使用splice时, fd_in和fd_out中必须至少有一个是管道文件描述符。
调用成功时返回移动的字节数量;它可能返回0,表示没有数据需要移动,这通常发生在从管道中读数据时而该管道没有被写入的时候。
失败时返回-1,并设置errno。
客户端程序如下:poll监听标准输入,直接重定向到sockfd。
/*************************************************************************
> File Name: client.c
> Author: dulun
> Mail: dulun@xiyoulinux.org
> Created Time: 2016年07月19日 星期二 11时01分09秒
************************************************************************/
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<poll.h>
#include<fcntl.h>
#include<assert.h>
#define BUFFER_SIZE 64
int main()
{
const char * ip = "127.0.0.1";
int port = 10086;
struct sockaddr_in server_address;
bzero( &server_address, sizeof(server_address) );
server_address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &server_address.sin_addr );
server_address.sin_port = htons(port);
int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
assert(sockfd >= 0);
if( connect(sockfd, (struct sockaddr*)&server_address, sizeof(server_address)) < 0 )
{
printf("connetcion failed\n");
close(sockfd);
return 1;
}
printf("connetc over!\n");
pollfd fds[2]; //0:标准输入
fds[0].fd = 0;
fds[0].events = POLLIN; //可读事件
fds[0].revents = 0; //内核处理
//1:套接字描述符
fds[1].fd = sockfd;
fds[1].events = POLLIN | POLLRDHUP; //可读或对方中断
fds[1].revents = 0;
char read_buf[BUFFER_SIZE]; //读缓存, 用于接收
int pipefd[2];
int ret = pipe(pipefd);
assert(ret != -1);
while(1)
{
ret = poll(fds, 2, -1); //阻塞,监听两个感兴趣的fd
if(ret < 0)
{
printf("poll failed\n");
break;
}
if(fds[1].revents & POLLRDHUP)
{
printf("server close the connetcion\n");
break;
}
else if( fds[1].revents & POLLIN )
{
memset( read_buf, 0, BUFFER_SIZE );
recv(fds[1].fd, read_buf, BUFFER_SIZE - 1, 0);
printf("%s\n", read_buf);
}
if(fds[0].revents & POLLIN)
{
//标准输入输出 通过管道 。去sockfd
//从标准输入读 ,进管道, 大小32768 更多或文件直接走内核缓冲区
ret = splice( 0, NULL, pipefd[1], NULL, 32768 , SPLICE_F_MORE | SPLICE_F_MOVE);
//从管道读, 进套接字描述符, 大小32768 更多或文件直接走内核缓冲区
ret = splice( pipefd[0], NULL, sockfd, NULL, 32768 , SPLICE_F_MORE | SPLICE_F_MOVE);
}
}
close(sockfd);
return 0;
}
2.服务器:
服务器程序使用poll同时 监听 和 连接 socket,并使用牺牲空间换取时间策略提高服务器性能:
/*************************************************************************
> File Name: server.cpp
> Author: dulun
> Mail: dulun@xiyoulinux.org
> Created Time: 2016年07月19日 星期二 11时26分40秒
************************************************************************/
#include<iostream>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<algorithm>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<fcntl.h>
#include<poll.h>
#include<errno.h>
#include<unistd.h>
using namespace std;
#define USER_LIMIT 100
#define BUFFER_SIZE 64
#define FD_LIMIT 65535
struct client_data
{
sockaddr_in address;
char * write_buff;
char buf[BUFFER_SIZE];
};
int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
int main()
{
const char *ip = "127.0.0.1";
int port = 10086;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
int ret;
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address)); //命名套接字。
assert(ret != -1);
ret = listen(listenfd, USER_LIMIT); //监听
assert(ret != -1);
client_data * users = new client_data[FD_LIMIT];
pollfd fds[USER_LIMIT+1];
int user_counter = 0;
for(int i = 1; i <= USER_LIMIT; i++)
{
fds[i].fd = -1;
fds[i].events = 0;
}
//初始化第一个
fds[0].fd = listenfd;
fds[0].events = POLLIN | POLLERR;
fds[0].revents = 0;
while(1)
{
ret = poll( fds, user_counter+1, -1 );
if(ret < 0)
{
printf("POLL failed\n");
break;
}
for(int i = 0; i < user_counter+1; i++)
{
if((fds[i].fd == listenfd) && (fds[i].revents & POLLIN) )
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof(client_data);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
printf("accept\n");
if(connfd < 0)
{
printf("errno is : %d \n", errno);
continue;
}
if(user_counter >= USER_LIMIT)
{
const char * info = "too many users\n";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
user_counter++;
users[connfd].address = client_address;
setnonblocking(connfd);
fds[user_counter].fd = connfd;
fds[user_counter].events = POLLIN | POLLERR;
fds[user_counter].revents = 0;
printf("comes a new user, now have %d users\n", user_counter);
}
else if(fds[i].revents & POLLERR) //ERROR , fds[i].revent & POLLERR 为真,表示,发生错误,下同
{
printf("get an error form %d\n", fds[i].fd);
char errors[100];
memset(errors, 0, sizeof(errors));
socklen_t length = sizeof(errors);
if( getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0)
{
printf("get socket option failed\n");
}
continue;
}
else if(fds[i].revents & POLLIN)
{
int connfd = fds[i].fd;
memset(users[connfd].buf, 0, BUFFER_SIZE);
ret = recv(connfd, users[connfd].buf, BUFFER_SIZE-1, 0);
printf("get %d bytes of client data %s from %d \n", ret, users[connfd].buf, connfd);
//sleep(10000); //test!
if(ret < 0)
{
if(errno != EAGAIN)
{
close(connfd);
users[fds[i].fd] = users[fds[user_counter].fd];
fds[i] = fds[user_counter];
i--;
user_counter--;
}
}
else if( ret == 0 )
{
}
else{
for(int j = 1; j <= user_counter; j++)
{
if(fds[j].fd == connfd) //不等于自己,不给自己设置读写事件
{
continue;
}
fds[j].events |= ~POLLIN; //取消读事件
fds[j].events |= POLLOUT; //设置写事件,下次while循环,直接进下一个else if
users[fds[j].fd].write_buff = users[connfd].buf;
}
}
}
else if(fds[i].revents & POLLOUT) //监听到写事件(次数:user_counter-1)
{
int connfd = fds[i].fd;
if( !users[connfd].write_buff )
{
continue;
}
ret = send(connfd, users[connfd].write_buff, strlen(users[connfd].write_buff), 0); //往过发
users[connfd].write_buff = NULL; //清缓存
fds[i].events |= ~POLLOUT; //取消写时间
fds[i].events |= POLLIN; //设置读事件
}
}
}
delete []users;
close(listenfd);
return 0;
}