此TCP服务器客户端使用
- 使用了线程池
- 解决了粘包问题
- 使用了任务类
- 使用了方法类
服务器
server.cc
#include"server.hpp"
int main(int argc,char* argv[])
{
if(argc!=2)
{
cout<<"wrong command,please type again"<<endl;
exit(1);
}
TCPSERVER* p=new TCPSERVER(atoi(argv[1]));
p->InitServer();
p->StartTcp();
delete p;
return 0;
}
server.hpp
#pragma once
#include <iostream>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <signal.h>
#include <cstring>
#include <arpa/inet.h>
#include <string>
#include <pthread.h>
#include <cstdlib>
#include "threadpool.hpp"
using namespace ns_threadpool;
using namespace std;
#define MAX 1024
#define DEFUALTPORT 8081
#define BACKLOG 20
//因为我们要获取里面的东西,所以定义称struct类型的就可以了
struct Param
{
int _sockfd;
string _ip;
int _port;
Param(int sockfd, string ip, int port)
: _sockfd(sockfd), _ip(ip), _port(port)
{
}
~Param()
{
}
};
//我们在服务端引入了线程池,在服务类里面增加一个线程池的指针成员
class TCPSERVER
{
private:
int _sockfd; //监听套接字
int _port; //服务端的端口号
ThreadPool<Task> *tp;
public:
void err(const char *error)
{
perror(error);
exit(1);
}
TCPSERVER(int port = DEFUALTPORT)
: _port(port), _sockfd(-1)
{
}
~TCPSERVER()
{
if (_sockfd > 0)
close(_sockfd);
}
void InitServer()
{
signal(SIGCHLD, SIG_IGN); //忽略子进程退出后,给父进程发送的信号
_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (_sockfd < 0)
{
err("socket");
}
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_port = htons(_port);
local.sin_family = AF_INET;
local.sin_addr.s_addr = INADDR_ANY;
if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0)
{
err("bind");
}
if (listen(_sockfd, BACKLOG) < 0)
{
err("listen");
}
tp = new ThreadPool<Task>(); //先动态开辟一个线程池对象
}
// static void *Routine(void *args) //因为在类里面,所以不要this指针
// {
// Param sock = *(Param *)args;
// delete args;
// pthread_detach(pthread_self()); //线程分离就不需要再去join了
// cout << "create a new pthread for IO" << endl;
// char buff[MAX];
// while (true)
// {
// //读取数据
// struct sockaddr_in peer;
// socklen_t len = sizeof(peer);
// ssize_t s = recvfrom(sock._sockfd, buff, sizeof(buff) - 1, 0, (struct sockaddr *)&peer, &len); // peer里面就是远程的数据了
// if (s > 0)
// {
// buff[s] = 0;
// fflush(stdout);
// cout << "#client:" << buff << endl;
// string msg = "#client ";
// msg += buff;
// sendto(sock._sockfd, msg.c_str(), msg.size(), 0, (struct sockaddr *)&peer, len);
// }
// else
// {
// cout << "error data" << endl;
// break;
// }
// }
// }
void StartTcp()
{
tp->InitThreadPool(); //初始化线程池
while (true)
{
sockaddr_in endpoint;
memset(&endpoint, '\0', sizeof(endpoint));
//处理任务
socklen_t len = sizeof(endpoint);
int sock = accept(_sockfd, (struct sockaddr *)&endpoint, &len); //这个sock就是用来和客户端进行通信的,
if (sock < 0)
{
perror("accept");
continue;
}
string cli_ip = inet_ntoa(endpoint.sin_addr);
string ip(cli_ip);
int port = ntohs(endpoint.sin_port);
cli_ip += ":";
cli_ip += to_string(ntohs(endpoint.sin_port));
cout << "get a link->" <<sock<< cli_ip <<"->"<<port<< endl;
// pthread_t tid;
//但是调用server函数的时候我们需要的是socket ip port,创建线程的时候只能传一个参数,所以我们需要定义一个结构体存储这些信息
// Param *p = new Param(sock, ip, port); //这样就把数据都初始化传进去了,三个数据都有了
//对于主线程accept上来的文件描述符,不能由主线程来close,应该让操作这个sockfd的线程进行关闭
//同样,新线程也不能对监听sockfd进行关闭
// pthread_create(&tid, nullptr, Routine, (void *)p); //在这里执行操作,所以这个地方就要传入套接字
//构造一个任务
Task task(sock,ip,port);
//把这个任务放到线程池的任务队列里面取
tp->PushTask(task);
/*
各个线程共享一个文件描述符表,因此当主线程调用accept的时候,其他创建的新线程是可以直接访问到这个文件描述符的
由于此时新线程并不知道它要服务的客户端所要对应的是哪一个文件描述符,所以此时主线程要告诉新线程应该要去访问的文件描述符的值
就是要操作哪一个套接字
*/
/*
当前多线程版本的服务器存在的问题是,每次有新的链接来的话,就会为客户端创建为它服务的新线程,在服务结束之后就会把他进行销毁,
这样不仅有麻烦,效率还很低下
如果有大量的客户端进行连接请求,此时服务端要为每一个客户端创建对应的服务线程,计算机中的线程越多,CPU 的压力也就越大,CPU要在
这些线程里面不断的来回切换,此时CPU 在调度线程的时候,线程和线程之间的切换成本就很高,线程如果很多的话,被调度的周期也就很长,
这样客户端也会迟迟得不到应答
*/
/*
解决方案
* 可以先在服务端创建一些线程,当有客户端请求连接的时候 就让这些线程提供服务,此时客户端一来就有线程来替他们服务,而不是客户端请求了才创建线程
* 当某个线程为客户端提供完服务之后,不要让该线程退出,而是让该线程继续为下一个客户端提供服务,如果当前没有客户端连接请求,则可以让线程先休眠
当有客户端到来的时候再把该线程唤醒
* 服务端创建的这一批线程不能太多,此时CPU 的压力也就不会太大,如果此时客户端连接到来,此时一批线程都在进行工作,那么此时服务端不应该再创建线程
而是让这个新来的线程再连接队列里面排队,等服务端这些线程有空闲的就替它提供服务
此时就引入了线程池再服务端里面,因为线程池的存在就是为了避免处理短时间任务创建和销毁线程的代价,此时线程池还能够保证内核充分利用,避免过分调度
其中线程池里面有一个任务队列,当有新的任务来的时候,就把任务push到线程池里面,再线程池里面我们默认创建了5个线程,如果线程池里面没有那么多任务,就休眠
*/
}
}
};
threadpool.hpp
#pragma once
#include <iostream>
#include <string>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <string>
#include"IO.hpp"
#include <cstring>
using namespace std;
//设计一个handler类,在handler类里面对()操作符进行重载,将()操作符的执行动作重载为执行server函数的代码
class Handler
{
public:
Handler()
{
}
~Handler()
{
}
void operator()(int sock, string cliip, int cliport)
{
//执行server函数的代码
// char buff[1024];
// while (true)
// {
// ssize_t size = read(sock, buff, sizeof(buff) - 1);
// if (size > 0) //读取成功
// {
// buff[size] = '\0';
// cout << cliip << ":" << cliport << "#" << buff << endl;
// }
// else if (size == 0) //对端关闭了
// {
// cout << cliip << ":" << cliip << " close!" << endl;
// }
// else
// {
// //读取失败
// cerr << sock << " read error!" << endl;
// break; //读取失败的化就关闭
// }
// }
char *buff; //接收数据
while (true)
{
//读取数据
// struct sockaddr_in peer;
// socklen_t len = sizeof(peer);
// ssize_t s = recvfrom(sock, buff, sizeof(buff) - 1, 0, (struct sockaddr *)&peer, &len); // peer里面就是远程的数据了
//使用新的解决粘包问题的读取数据的方法
int s = RecvMsg(sock, &buff); //读取数据
if (s > 0)
{
buff[s] = 0;
fflush(stdout);
cout << "#client:" << buff << endl;
string msg = "#client ";
msg += buff;
//发送回客户端
sendmsg(sock, msg.c_str(), msg.size()); //发送数据
// sendto(sock, msg.c_str(), msg.size(), 0, (struct sockaddr *)&peer, len);
}
else
{
cout << "error data" << endl;
break;
}
}
close(sock); //这个线程用完了就要把这个文件描述符关掉
cout << cliip << ":" << cliport << " service done!" << endl;
}
};
class Task //任务
{
private:
int _sockfd;
string _cliip;
int _cliport;
Handler _handler; //处理方法
public:
Task() //无参数,就是为了线程池取任务出来执行
{
}
Task(int sock, string ip, int port) //构造函数里面放任务
: _sockfd(sock), _cliip(ip), _cliport(port)
{
}
~Task()
{
}
//处理任务的函数
void Run()
{
_handler(_sockfd, _cliip, _cliport); //调用仿函数
}
};
namespace ns_threadpool
{
const int g_num = 5;
template <class T>
class ThreadPool //线程池
{
private:
int num_; //一个线程池里面有多少个任务
std::queue<T> task_queue_; //任务队列,临界资源
pthread_mutex_t mtx_;
pthread_cond_t cond_;
public:
ThreadPool(int num = g_num) : num_(num)
{
pthread_mutex_init(&mtx_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
~ThreadPool()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&cond_);
}
//在类中,要让
static void *Rountine(void *args)
//也不能访问类里面非static成员
{
pthread_detach(pthread_self()); //实现线程分离就不要再去join等待了
ThreadPool<T> *tp = (ThreadPool<T> *)args;
while (true)
{
//从任务队列里面去拿一个任务
//执行任务,要先把这个任务队列锁主
//每个线程他跟放任务的线程一样,都是竞争式的去拿一个任务
tp->Lock();
//先检测任务队列是否有一个任务
while (tp->IsEmpty())
{
//检测到任务队列为空
//此时线程就挂起等待
tp->Wait();
}
//该任务队列里面一定有任务了
T t;
tp->PopTask(&t);
//任务就拿到了
tp->UnLock();
t.Run(); //可能有多个线程在处理任务,
sleep(1);
}
}
void InitThreadPool()
{
//初始化一批线程,
//这样就不要每次用都要去开辟线程了
pthread_t tid; //一次创建一批线程
for (int i = 0; i < num_; i++)
{
pthread_create(&tid, nullptr, Rountine, (void *)this);
//在类中不能执行线程的方法,因为他都有隐藏的this指针
//所以我们需要使用静态的函数,就没有了this指针
}
}
void PopTask(T *out)
{
*out = task_queue_.front();
task_queue_.pop();
}
void Wait()
{
pthread_cond_wait(&cond_, &mtx_);
}
bool IsEmpty()
{
return task_queue_.empty();
}
void Lock()
{
pthread_mutex_lock(&mtx_);
}
void UnLock()
{
pthread_mutex_unlock(&mtx_);
}
void Wakeup()
{
pthread_cond_signal(&cond_);
}
void PushTask(const T &in)
{
//塞任务,就相当于一个生产者,生产者之间要进行互斥访问
Lock();
task_queue_.push(in);
UnLock();
Wakeup();
}
//万一任务队列里面一个任务都没有的话,那么线程池里面的每一个线程就要处于休眠状态,挂起等待
};
}
IO.hpp
#pragma once
#include<iostream>
#include<cstdlib>
#include<unistd.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<cstring>
#include<arpa/inet.h>
int writen(int _sockfd,const char *msg, int size)
{
const char *buf = msg; //指向它的地址,发送出去
int count = size; //剩余需要发送的字节数
while (count > 0)
{
int len = send(_sockfd, buf, count, 0);
if (len == -1)
{
//发送失败
return -1;
}
else if (len == 0)
{
continue; //没发送出去,再发送一次
}
else
{
//发送成功
buf += len;
count -= len;
}
}
return size; //发送成功,发送完成
}
void sendmsg(int _sockfd,const char *msg, int len)
{
//先申请包头
char *data = (char *)malloc(sizeof(char) * (len + 4)); //多加的4是为了数据的长度
int biglen = htonl(len);
memcpy(data, &biglen, 4); //拷贝4个字节过去
memcpy(data + 4, msg, len); //拷贝len个长度过去
int ret = writen(_sockfd,data, len + 4); //真正的传输数据
if (ret == -1)
{
//发送失败
free(data); //把data的内存销毁掉
close(_sockfd);
}
else
free(data);
}
int readn(int _sockfd,char *buf, int size)
{
char *pt = buf;
int count = size;
while (count > 0)
{
int len = recv(_sockfd, pt, count, 0);
if (len == -1)
{
//读取失败
return -1;
}
else if (len == 0)
{
return size - count;
}
else
{
pt += len;
count -= len;
}
}
return size;
}
int RecvMsg(int _sockfd,char **buf)
{
//解包
int len = 0;
readn(_sockfd,(char *)&len, 4);
len = htonl(len);
char *msg = (char *)malloc(sizeof(char) * (len + 1));
int size = readn(_sockfd,msg, len);
if (size != len)
{
close(_sockfd);
free(msg);
return -1;
}
msg[size] = '\0';
*buf = msg;
return size;
}
客户端
client.cc
#include"client.hpp"
int main(int argc,char* argv[])
{
if(argc!=3)
{
cout<<"please type again"<<endl;
exit(1);
}
TcpClient* cl=new TcpClient(argv[1],atoi(argv[2]));
cl->InitClient();
cl->StartClient();
return 0;
}
client.hpp
#pragma once
#include<iostream>
#include<string>
#include<unistd.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<cstdlib>
#include<arpa/inet.h>
#include<cstring>
#include"IO.hpp"
using namespace std;
class TcpClient
{
private:
string _ip;//服务器的ip地址
int _port;//这里的端口就是服务器的端口
int _sockfd;//创建socket连接到服务器上面
public:
TcpClient(string ip="127.0.0.1",int port=8081)
:_ip(ip),_port(port)
{
}
~TcpClient()
{
if(_sockfd>0)
{
close(_sockfd);
}
}
void InitClient()
{
_sockfd=socket(AF_INET,SOCK_STREAM,0);
if(_sockfd<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in svr;//连接到远程服务器上
svr.sin_port=htons(_port);
svr.sin_family=AF_INET;
svr.sin_addr.s_addr=inet_addr(_ip.c_str());//将string转化成为网络字节序
socklen_t len=sizeof(svr);
if(connect(_sockfd,(struct sockaddr*)&svr,len)!=0)
{
perror("connect");
exit(1);
}
}
void StartClient()
{
while(true)
{
string msg;
getline(cin,msg);
//这样发送会出现丢包问题,所以我们要进行修改
sendmsg(_sockfd,msg.c_str(),msg.size());//将msg的数据和大小都发送过去
//发送完之后就要接收数据
char* buf;//用来接收数据
int ss=RecvMsg(_sockfd,&buf);
if(ss>0)
{
buf[ss]=0;
cout<<"server echo "<<buf<<endl;
}
//发送数据
}
}
};