昨天被一个问题困扰到,最后发现这个问题就不是个问题。
怎么说呢?就是写服务器和客户端程序,两端想互相发送文件,对于小文件来说的话,是没啥,感觉不到发送过程的缓慢,但是换成一个几百兆的文件,就有点累了,得等上个几分钟才能发完,我就想为什么不能实现秒传。所以就在网上学习了多线程发送技术。实现了近百兆文件秒发的目标。具体功能就是服务器传文件到客户端客户端再反传给服务器,运行截图如下:
客户端接收到的文件:
服务器端发送的文件(framework.7z)和接收到客户端反传回的文件(_ramwork.7z):
200MB的文件不到两秒传完了。结果显示内容也没啥差别。
下面是基本思想,其实在上一篇文章都说了在这里用流程图来表示吧!
这方法快是快,但致命的缺点就是不能用到IO多路复用的服务器中。只是用一个客户端和一个服务器之间的情况。要是多个客户端的话,服务器就懵了,不知道那个连接发来的内容该写到同一文件了,另外发一个文件创建太多连接对性能影响也比较大,所以这种传文件的方法用在服务器和客户端之间发大文件不现实!
不过话说回来,现在我们在网上下载个大文件的话也得等上一段时间才行。限制就在于一个TCP连接数据缓冲区本来就有限制,最多也就一次发65535bytes的长度。但跨越这个限制建立多连接用起来却不灵活。所以还是老老实实用单连接发吧,服务器那边也容易处理。
下面是我的代码实现:
- writen和readn
#pragma once
#include<unistd.h>
#include<stdlib.h>
#include<fcntl.h>
#include<stdio.h>
#include<errno.h>
int readn(int fd, void *buf, int n) ;
//写数据
int writen(int fd, void *buf, int n) ;
int readn(int fd, void *buf, int n) {
int nleft = n; //还需要读取的字节数
char *bufptr =(char*)buf; //指向read函数当前存放数据的位置
int nread = 0 ;
while(nleft > 0){
if((nread = read(fd, bufptr, nleft)) < 0){
if(errno == EINTR){ //遇到中断
continue;
}
else // 其他错误
return -1;
}
else if(nread == 0){ // 遇到EOF
break;
}
nleft -= nread;
bufptr += nread;
}
return (n - nleft);
}
int writen(int fd, void *buf, int n){
int nleft = n;
char *bufptr = (char*)buf;
int nwrite;
while(nleft > 0){
if((nwrite = write(fd, bufptr, nleft)) < 0){
if(errno == EINTR)
continue;
else
return -1;
}
if(nwrite == 0)
continue;
nleft -= nwrite;
bufptr += nwrite;
}
return (n-nleft); // 注意这里必须是 n 因为这里保证了 n 字节都被写入
}
- 两端协议
struct proctol{
int flag = 12;
int fd ;
int sockfd ;
int threadNum ;
long cur ;
long moved ;
//文件长度
long size ;
//缓冲区大小
char fileName[1024] ;
char buf[4096] ;
} ;
- 服务器
#include"testServer.h"
#include"ReadWrite.h"
int main(int argc, char** argv)
{
if(argc< 0){
cout<< "use:./a.out filename" <<endl ;
exit(1);
}
int fd = socket(AF_INET, SOCK_STREAM, 0) ;
if(fd < 0) {
return 0 ;
}
struct sockaddr_in sock_addr ;
sock_addr.sin_family = AF_INET ;
sock_addr.sin_port = htons(PORT) ;
sock_addr.sin_addr.s_addr = INADDR_ANY ;
int use = 1 ;
if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &use, sizeof(use))< 0){
cout << "socketoopt" <<endl ;
return 1 ;
}
if(bind(fd, (struct sockaddr*)&sock_addr, sizeof(sock_addr))< 0) {
cout << "bind" <<endl ;
return 1 ;
}
if(listen(fd, BACKLOG) < 0){
cout<< "listen" <<endl ;
return 1 ;
}
SendFile(argv[1], fd) ;
RecvData(fd) ;
return 0;
}
void RecvData(int sockfd) {
int connfd ;
//接受连接
if((connfd = accept(sockfd, NULL, NULL)) < 0) {
return ;
}
proctol data ;
bzero(&data, sizeof(data)) ;
//获取客户端向服务发送的第一个包
int ret = readn(connfd, &data, sizeof(data)) ;
if(ret < 0) {
printf("readn失败!\n") ;
exit(1) ;
}
int threadNum = data.threadNum ;
//第一个包里面含有文件名称
close(connfd) ;
int fd = open(data.fileName, O_CREAT|O_WRONLY, 0666) ;
printf("线程数量%d\n",threadNum) ;
for(int i = 0 ; i< threadNum; i++) {
proctol data ;
bzero(&data, sizeof(data)) ;
printf("第%d个连接\n", i) ;
if((connfd = accept(sockfd, NULL, NULL)) < 0) {
printf("接收文件accept出错!\n") ;
return ;
}
data.fd = fd ;
data.sockfd = connfd ;
std::thread t(threadRecv, std::ref(data)) ;
t.join() ;
}
close(fd) ;
}
//接收每个客户端线程发送来的数据
void threadRecv(proctol& data) {
int connfd = data.sockfd ;
int fileFd = data.fd ;
proctol tmp ;
bzero(&tmp, sizeof(tmp)) ;
int ret = readn(connfd, &tmp, sizeof(tmp)) ;
if(ret < 0) {
printf("接收功能readn读取数据错误!\n") ;
return ;
}
long size = tmp.size ;
long cur = tmp.cur ;
ret = pwrite(fileFd, tmp.buf, tmp.moved, cur) ;
if(ret < 0) {
printf("pwrite写文件失败!\n") ;
return ;
}
size -= tmp.moved ;
cur += tmp.moved ;
while(size) {
bzero(&tmp, sizeof(tmp)) ;
ret = readn(connfd, &tmp, sizeof(tmp)) ;
if(ret < 0&& errno ==EINTR) {
printf("读取文件失败!\n");
continue ;
}
else if(ret == 0) {
break ;
}
if(ret < 0) {
printf("readn接收文件失败!\n") ;
return ;
}
ret = pwrite(fileFd, tmp.buf, tmp.moved, cur) ;
if(ret < 0) {
printf("pwrite写文件失败!\n") ;
return ;
}
size -= tmp.moved ;
cur += tmp.moved ;
}
close(connfd) ;
}
int SendFile(const char*fileName, int sockFd) {
struct stat st_file ;
if(lstat(fileName, &st_file)< 0){
cout << "发送的文件不存在!"<<endl ;
return 1;
}
//为线程分配任务
long file_block ;
proctol pp ;
pp.size = st_file.st_size ;
//获取文件的大小,20mb以上为大文件
if(pp.size>=1024*1024*50) {
file_block = 1024*1024*10 ;
}
else {
file_block = 1024*1024*3 ;
}
printf("每个线程分的块大小:%ld\n", file_block) ;
int threadNum = pp.size/file_block ;
//获取最后一个包的长度
int lastpacksize = (int)pp.size-threadNum*file_block ;
//如果最后一个包的长度不问0的话,则多创建一个线程
if(lastpacksize) {
threadNum++ ;
}
if(threadNum > 100) {
printf("线程数量太多!\n") ;
return 0;
}
int connfd = accept(sockFd, NULL, NULL) ;
if(connfd < 0) {
printf("接收连接accept出错!") ;
exit(1) ;
}
proctol data ;
data.size = file_block ;
//第一个包是文件名称和线程的数量
strcpy(data.fileName, fileName) ;
data.sockfd = sockFd ;
data.threadNum = threadNum ;
int ret = writen(connfd, &data, sizeof(data)) ;
if(ret < 0) {
printf("发送消息失败!") ;
exit(1) ;
}
close(connfd) ;
//打开文件
printf("文件名:%s\n", fileName) ;
int fd = open(fileName, O_RDONLY) ;
if(fd < 0) {
cout << "open error" <<endl ;
exit(1) ;
}
for(int i= 0; i< threadNum; i++) {
connfd = accept(sockFd, NULL, NULL) ;
if(connfd < 0) {
exit(1) ;
}
//发送文件
proctol data ;
memset(data.buf, 0, sizeof(data.buf)) ;
strcpy(data.fileName, fileName) ;
//当线程数量不为0且最后一个包的长度不为0
if(i == threadNum-1&&lastpacksize) {
data.cur = i*file_block ;
data.size = lastpacksize ;
}
else {
//计算偏移
data.cur = i*file_block ;
data.size = file_block ;
}
data.fd = fd ;
data.sockfd = connfd ;
thread t(sender, std::ref(data)) ;
t.join() ;
}
close(fd) ;
return 1 ;
}
//发送文
void sender(proctol& data) {
long cur = data.cur ;
int connfd = data.sockfd ;
int read_count ;
long size = data.size ;
long read_size = 0 ;
int filefd = data.fd ;
printf("传送文件的长度%ld,名称:%s\n", data.size, data.fileName) ;
while(size){
//原子操作
data.moved = sizeof(data.buf) ;
if(sizeof(data.buf) > (size_t)size) {
data.moved = size ;
}
read_count = pread(filefd, data.buf, data.moved, cur+read_size) ;
if(read_count < 0&& errno == EINTR) {
printf("被信号打断") ;
continue ;
}
else if(read_count == 0) {
break ;
}
else if(read_count < 0) {
printf("pread 错误!") ;
exit(1) ;
}
writen(connfd, &data, sizeof(data)) ;
size-=read_count ;
read_size += read_count ;
}
close(connfd) ;
}
- 客户端
#include"testClient.h"
#include"ReadWrite.h"
struct sockaddr_in addr ;
int main(int argc, char** argv)
{
if(argc != 3) {
printf("usage:./a.out ip port\n") ;
exit(1) ;
}
int sockfd = socket(AF_INET, SOCK_STREAM, 0) ;
if(sockfd < 0) {
printf("创建套接字失败!");
exit(1) ;
}
const char* ip = argv[1] ;
int port = atoi(argv[2]) ;
addr.sin_family = AF_INET ;
addr.sin_port = htons(port) ;
inet_pton(AF_INET, ip, &addr.sin_addr) ;
if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
printf("connect连接失败!") ;
exit(1) ;
}
int threadNum = 0 ;
proctol data ;
int ret = readn(sockfd, &data, sizeof(data)) ;
if(ret < 0) {
printf("recv 接收消息失败!") ;
exit(1) ;
}
threadNum = data.threadNum ;
if(threadNum > 100) {
printf("线程数量太多!\n") ;
exit(1) ;
}
int fd = open(data.fileName, O_CREAT|O_WRONLY, 0777) ;
if(fd < 0) {
printf("open创建文件出错!") ;
exit(1) ;
}
//创建线程
for(int i = 0; i< threadNum; i++) {
proctol data ;
memset(&data, 0, sizeof(data)) ;
data.fd = fd ;
std::thread t(receive ,std::ref(data)) ;
t.join() ;
}
close(fd) ;
printf("回车开始服务器回传数据....\n");
getchar() ;
//客户端给服务器发送大文件
//发给服务器后进行标识为客户端回射的
const char* filename= data.fileName ;
sendToServer(filename) ;
close(sockfd) ;
printf("接收完成!") ;
return 0;
}
//发送文件给服务器
void sendToServer(const char*fileName) {
//创建套接字
int sockfd = socket(AF_INET, SOCK_STREAM, 0) ;
if(sockfd < 0) {
printf("创建套接字失败!") ;
exit(1) ;
}
if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
printf("连接服务器失败!") ;
exit(1) ;
}
struct stat st ;
lstat(fileName, &st) ;
int threadNum = 0 ;
long block_size = 0 ;
int lastpack = 0 ;
long size = st.st_size ;
//最大文件为20M
if(size > 1024*1024*50) {
block_size = 1024*1024*10 ;
}
//不超过20M的每个线程传送2M
else {
block_size = 1024*1024*3 ;
}
threadNum = size/block_size ;
lastpack = size -threadNum*block_size ;
//最后一个包长度不为0,多分一个线程处理
if(lastpack) {
threadNum++ ;
}
if(threadNum > 100) {
printf("不适合发所指定的文件!文件过大!\n");
exit(1) ;
}
proctol data ;
data.threadNum = threadNum ;
strcpy(data.fileName, fileName) ;
data.fileName[0] = '_' ;
int ret = writen(sockfd, &data, sizeof(data)) ;
if(ret < 0) {
printf("文件信息发送失败!\n") ;
return ;
}
close(sockfd) ;
int fd = open(fileName, O_RDONLY) ;
for(int i= 0; i< threadNum; i++) {
sockfd = socket(AF_INET, SOCK_STREAM, 0) ;
//和服务器建立连接
if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
printf("线程创建连接失败!\n") ;
return ;
}
if(i == threadNum-1&&lastpack){
data.cur = i*block_size ;
data.size = lastpack ;
}
//判断是否到达文件长度
else {
data.cur = i*block_size ;
data.size = block_size ;
}
data.fd = fd ;
data.sockfd = sockfd ;
std::thread t1(threadSend, std::ref(data)) ;
t1.join() ;
}
printf("发送完成,size=%ld", size) ;
}
//开线程向服务器端发送数据
void threadSend(proctol& data) {
//线程负责的范围和负责的长度
long cur = data.cur ;
long size = data.size ;
int sockfd = data.sockfd ;
int filefd = data.fd ;
proctol tmp ;
bzero(&tmp, sizeof(tmp)) ;
tmp.size = size ;
tmp.cur = cur ;
int ret = 0;
int read_count = 0 ;
while(size) {
int len = sizeof(data.buf) ;
if(size<len) {
len = size ;
}
///通知对方这次在接受到数据的基础上,
//在文件位置为cur的基础上移动moved长度
tmp.moved = len ;
ret = pread(filefd, tmp.buf, len, cur+read_count) ;
if(ret < 0) {
printf("发送数据出错!") ;
return ;
}
writen(sockfd, &tmp, sizeof(tmp)) ;
size -= ret ;
read_count += ret ;
}
close(sockfd) ;
}
void receive(proctol& data) {
int sockfd = socket(AF_INET, SOCK_STREAM, 0) ;
if(sockfd < 0) {
printf("创建套接字失败!") ;
exit(1) ;
}
if(connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
printf("连接服务器失败!") ;
exit(1) ;
}
//接收到数据
proctol datas ;
int ret ;
if((ret = readn(sockfd, &datas, sizeof(datas))) < 0) {
printf("接收数据错误!");
exit(1) ;
}
long size = datas.size ;
long cur = datas.cur ;
if(pwrite(data.fd, datas.buf, datas.moved, cur)<0) {
printf("写文件出错!") ;
exit(1) ;
}
size -= datas.moved;
cur += datas.moved ;
//接收消息
while(size) {
ret = readn(sockfd, &datas, sizeof(datas)) ;
if(ret < 0 && errno == EINTR) {
printf("读取文件失败!\n") ;
continue ;
}
else if (ret == 0) {
break ;
}
else if(ret < 0) {
printf("读取文件出错!") ;
exit(1) ;
}
//将数据写入到文件中
if(pwrite(data.fd, datas.buf, datas.moved, cur) < 0) {
printf("写文件出错!") ;
exit(1) ;
}
cur += datas.moved ;
size -= datas.moved ;
}
}