前面说到了我已经测试好了任务队列中的fd是可以正常加入和pop出来的,现在我想我需要做的是将pop出来的fd交给线程池去处理,c++的std::vector
和std::thread
配合使用应该是非常爽的,于是我就开始先写了一个简单的模型:
#include<iostream>
#include<vector>
#include<thread>
#include<unistd.h>
#include<sys/types.h>
#include<mutex>
#include<signal.h>
std::vector<std::thread> g_list;
std::mutex g_lock;
int type = 0;
void func() {
while(1) {
g_lock.lock();
if(type == 1) {
std::cout << "我是 : " << std::this_thread::get_id() << "我现在要工作2s" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(2));
type = 0;
}
g_lock.unlock();
}
}
void handle(int sigino) //如果按下ctrl + c ,则将type置为一
{
type = 1;
}
void CreateThread(int n) {
for(int i = 0;i < n;++i) {
std::thread t(func);
t.detach(); //让主线程和子线程分离,主线程不会组阻塞
g_list.push_back(std::move(t));
}
}
int main(int argc,char *argv[])
{
signal(SIGINT,handle); //如果按ctrl+C 会执行handle
CreateThread(5);
while(1){}; //保证主线程没有退出
return 0;
}
由于没有加条件变量,所以五个线程实际上是在不断的轮询抢锁判断。结果如下:
注意:由于需要在不同线程间快速切换,所以这样是非常耗费资源的,很容易就吧cpu跑成了100。
然后我觉得这样可行啊,继续工作,写出了thpoll.h
#ifndef _THPOLL_H
#define _THPOLL_H
#include<vector>
#include<thread>
#include<string>
class thpoll {
public:
thpoll() = default;
thpoll(int size);
void func();
private:
std::vector<std::thread> poll; //第一版
//std::vector<std::shared_ptr<std::thread>> poll; //第二版
int size;
int popfd;
char buf[1024];
};
#endif
下面是thpoll.cpp的代码:
extern std::mutex g_lock;
extern std::queue<epoll_event> MessageQueue;
struct epoll_event ev;
thpoll::thpoll(int size)
{
for(int i = 0;i < size;++i ) {
std::thread t(&thpoll::func,this); //第一版
//std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&thpoll::func,this); //第二版
poll.push_back(std::move(t)); //第一版
//poll.push_back(t); //第二版
t.detach();
}
}
/* 省略了thpoll::func() */
}
第一版的代码编译不会有错误,但是运行的时候就坑了:
于是我查询得到下面结果:
http://stackoverflow.com/questions/21084462/threads-fine-with-10-crash-with-10000
改成了第二版,但是问题又来了,现在我的线程池能跑。但是它已经跑起来了,并且现在跑的是线程函数
,而线程函数是我在初始化线程的时候就指定的。所以现在的问题变成了我怎样把这个fd传入线程中
,于是我在server.cpp
中:
void server::thread_func(thpoll& mythpoll)
{
while(1) {
if(MessageQueue.empty() == false) { // 如果队列中有东西
mythpoll.func(); //直接调用定义的thpoll.func();
}
sleep(1);
}
}
但是还是传不了?我的最终问题成了c++怎样给运行中的线程传递参数
提问以及解答过程点这里:c++ 如何给 “运行中” 的线程传递数据
之后我的结论是:一般的线程池中都会又一个任务队列,就和我的模型一样,但是投递的应该是一个操作,而不是一个fd,比如一个fd的read 操作,或者fd的write操作,要是只传fd,那只能有一类操作了。
解决了这个问题,我开始重新考虑我的模型,我突然觉得开一个线程监控任务队列,然后拿出fd竟然是一个多余操作,我完全可以直接在线程池中监控fd,而且就把这个代码放在线程函数中,让线程轮询检查fd的操作,有的话处理就行了。
最终的server.cpp
代码就这样:
#include<iostream>
#include<thread>
#include<mutex>
#include<sys/epoll.h>
#include<queue>
#include<sys/types.h>
#include<sys/socket.h>
#include<chrono>
#include<string.h>
#include"thpoll.h"
#include"server.h"
extern std::mutex g_lock;
extern std::queue<epoll_event> MessageQueue;
struct epoll_event ev;
thpoll::thpoll(int size)
{
for(int i = 0;i < size;++i ) {
std::shared_ptr<std::thread> t = std::make_shared<std::thread>(&thpoll::func,this);
poll.push_back(t);
t.detach();
}
}
void thpoll::func()
{
while(1) {
g_lock.lock();
if(MessageQueue.empty() == false) {
std::cout << "从队列中取出来了fd : ";
ev = MessageQueue.front();
std::cout << ev.data.fd << std::endl;
MessageQueue.pop();
std::cout << "当前队列的长度: " << MessageQueue.size() << std::endl;
if(ev.events & EPOLLIN) {
bzero(buf,1024);
recv(ev.data.fd,buf,1024,0);
std::cout << "我是线程 : " << std::this_thread::get_id() << " 我读出了: " << buf << "我将要工作5s" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "我工作完了"<< std::endl;
}else if(ev.events & EPOLLOUT) {
bzero(buf,1024);
strcpy(buf,"来自服务器的消息:收到\n");
std::cout << "我是线程 : " << std::this_thread::get_id() << " 我将要工作5s" << std::endl;
send(ev.data.fd,buf,sizeof(buf),0);
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "我工作完了"<< std::endl;
}
}
g_lock.unlock();
}
}
全部的源代码点这里:半同步/半异步线程池
运行部分截图如下:
可以发现时有三个线程在处理。
还需要总结的有:
1:互斥锁加条件变量的使用
2:epoll的边缘触发和水平触发