进程池
进程池的使用场景
当我们需要并行的处理大规模任务的时候,需要使用到多进程,多线程技术,比如说服务器处理大量客户端的任务,我在大一的时候写过一个C/S+mysql架构的聊天室,大概是这样处理的,每当有客户端发出连接请求时,服务器accept成功以后就去fork一个进程去处理关于这个客户端的所有请求,经过后来的一系列的学习,这显然是极其不科学的,狂开进程浪费资源且不说,效率也极低。
- 动态创建进程(或线程)是比较耗费时间的,这将导致较慢的客户响应。
- 动态创建的子进程(或子线程)通常只用来为一个客户服务(除非我们做特殊处理),这将导致系统上产生大量的细微进程(或者线程)。进程(或者线程)间的切换消费大量CPU时间。
- 动态创建的子进程是当前进程的完整映像。当前进程必须谨慎地管理其分配的文件描述符和堆内存等系统资源,从而使系统的可用资源急剧下降,进而影响服务器的性能。
池化的思想
为了解决上述问题呀,我们的前辈们相出了用池化的思想来解决这些资源消耗问题。简而言之,就是提前创建好一些进程,一般3-7个(具体看cpu核数以及运行环境),目的是为了充分释放多核芯的性能,实大规模并行。由于服务器的硬件资源“充裕”,那么提高服务器性能的一个很直接的方法就是以空间换时间,即“浪费”服务器的硬件资源,以换取其运行效率。这就是池的概念。池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配。当服务器进入正式运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配。很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的。当服务器处理完一个客户连接后,可以把相关的资源放回池中,无需执行系统调用来释放资源。从最终效果来看,池相当于服务器管理系统资源的应用设施,它避免了服务器对内核的频繁访问。
进程池概述
进程池中的所有子进程都运行着相同的代码(不调用exev族函数),并具有相同的属性,比如优先级、 PGID 等。
当有新的任务来到时,主进程将通过某种方式选择进程池中的某一个子进程来为之服务。相比于动态创建子进程,选择一个已经存在的子进程的代价显得小得多。至于主进程选择哪个子进程来为新任务服务,则有两种方法:
1)主进程使用某种算法来主动选择子进程。最简单、最常用的算法是随机算法和 Round Robin (轮流算法)。
2)主进程和所有子进程通过一个共享的工作队列来同步,子进程都睡眠在该工作队列上。当有新的任务到来时,主进程将任务添加到工作队列中。这将唤醒正在等待任务的子进程,不过只有一个子进程将获得新任务的“接管权”,它可以从工作队列中取出任务并执行之,而其他子进程将继续睡眠在工作队列上。
当选择好子进程后,主进程还需要使用某种通知机制来告诉目标子进程有新任务需要处理,并传递必要的数据。最简单的方式是,在父进程和子进程之间预先建立好一条管道,然后通过管道来实现所有的进程间通信。在父线程和子线程之间传递数据就要简单的多,因为我们可以把这些数据定义全局的,那么他们本身就是被所有线程共享的。
综合上面的论述,我们将进程池的一般模型描述为下图所示的形式。
处理多客户
在使用进程池处理多客户任务时,首先考虑的一个问题是:监听socket和连接socket是否都由主进程来统一管理。并发模型,其中半同步/半反应堆模式是由主进程统一管理这两种socket的。而高效的半同步/半异步和领导者/追随者模式,则是由主进程管理所有监听socket,而各个子进程分别管理属于自己的连接socket的。对于前一种情况,主进程接受新的连接以得到连接socket,然后它需要将该socket传递给子进程(对于线程池而言,父线程将socket传递给子线程是很简单的。因为他们可以很容易地共享该socket。但对于进程池而言,必须通过管道传输)。后一种情况的灵活性更大一些,因为子进程可以自己调用accept来接受新的连接,这样该父进程就无须向子进程传递socket。而只需要简单地通知一声:“我检测到新的连接,你来接受它。
长连接,即一个客户的多次请求可以复用一个TCP连接。那么,在设计进程池时还需要考虑:一个客户连接上的所有任务是否始终由一个子进程来处理。如果说客户任务是无状态的,那么我们可以考虑使用不同的进程为该客户不同请求服务。
半同步/半异步进程池实现
综合前面的讨论,我们可以实现这个进程池,为了避免在父、子进程之间传递文件描述符,我们将接受新连接的操作放到子进程中,很显然,对于这种模式而言,一个客户连接上的所有任务始终是由一个子进程来处理的。
奉上代码 进程池github地址
进程池模板 方便以后复用
#ifndef PROCESSPOOL_H
#define PROCESSPOOL_H
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <iostream>
/*子进程类*/
class process
{
public:
/*以 -1初始化*/
process() : m_pid( -1 ){}
public:
/*子进程号*/
pid_t m_pid;
/*父子进程通信管道*/
int m_pipefd[2];
};
/*进程池类
*其模板参数是处理逻辑任务的类
**/
template< typename T >
class processpool
{
/*将构造函数定义为私有的 因此我们只能通过后边的create静态*/
private:
processpool( int listenfd, int process_number = 8 );
public:
/*单例模式 在之后调用到*/
static processpool< T >* create( int listenfd, int process_number = 8 )
{
if( !m_instance )
{
m_instance = new processpool< T >( listenfd, process_number );
}
return m_instance;
}
~processpool()
{
delete [] m_sub_process;
}
/*启动进程池*/
void run();
private:
void setup_sig_pipe();
void run_parent();
void run_child();
private:
/*进程允许的最大子进程数*/
static const int MAX_PROCESS_NUMBER = 16;
/*每个子进程最多能处理的客户数量*/
static const int USER_PER_PROCESS = 65536;
/*epoll 最多能处理的事件数*/
static const int MAX_EVENT_NUMBER = 10000;
/*进程池中的进程数*/
int m_process_number;
/*进程池在池中的序号 从0开始*/
int m_idx;
/*每个进程都有一个epoll内核事件表 用epollfd标识*/
int m_epollfd;
/*监听socket*/
int m_listenfd;
/*子进程通过stop来决定是否停止*/
int m_stop;
/*保存所有的子进程的描述信息*/
process* m_sub_process;
/*进程池静态实例*/
static processpool< T >* m_instance;
};
template< typename T >
processpool< T >* processpool< T >::m_instance = NULL;
/*用于处理信号的管道 实现统一信号源
* 全局 */
static int sig_pipefd[2];
static int setnonblocking( int fd )
{
int old_option = fcntl( fd, F_GETFL );
int new_option = old_option | O_NONBLOCK;
fcntl( fd, F_SETFL, new_option );
return old_option;
}
static void addfd( int epollfd, int fd )
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
setnonblocking( fd );
}
static void removefd( int epollfd, int fd )
{
epoll_ctl( epollfd, EPOLL_CTL_DEL, fd, 0 );
close( fd );
}
static void sig_handler( int sig )
{
int save_errno = errno;
int msg = sig;
/* 这块为啥得转成char* */
send( sig_pipefd[1], ( char* )&msg, 1, 0 );
errno = save_errno;
}
static void addsig( int sig, void( handler )(int), bool restart = true )
{
struct sigaction sa;
memset( &sa, '\0', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART;
}
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}
/*进程池的构造函数 参数listenfd是监听*/
template< typename T >
processpool< T >::processpool( int listenfd, int process_number )
: m_listenfd( listenfd ), m_process_number( process_number ), m_idx( -1 ), m_stop( false )
{
assert( ( process_number > 0 ) && ( process_number <= MAX_PROCESS_NUMBER ) );
std::cout << "pool\n"<<std::endl;
m_sub_process = new process[ process_number ];
assert( m_sub_process );
for( int i = 0; i < process_number; ++i )
{
/*建立通信管道*/
int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, m_sub_process[i].m_pipefd );
assert( ret == 0 );
m_sub_process[i].m_pid = fork();
assert( m_sub_process[i].m_pid >= 0 );
/*父进程*/
if( m_sub_process[i].m_pid > 0 )
{
close( m_sub_process[i].m_pipefd[1] );
continue;
}
/*子进程*/
else
{
close( m_sub_process[i].m_pipefd[0] );
m_idx = i;
break;
}
}
}
/*统一事件源*
* 每一个子进程都会拥有一次 sig_pipefd
父子进程都做一次*/
template< typename T >
void processpool< T >::setup_sig_pipe()
{
m_epollfd = epoll_create( 5 );
assert( m_epollfd != -1 );
int ret = socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );
assert( ret != -1 );
setnonblocking( sig_pipefd[1] );
addfd( m_epollfd, sig_pipefd[0] );
/*设置信号处理函数*/
addsig( SIGCHLD, sig_handler );
addsig( SIGTERM, sig_handler );
addsig( SIGINT, sig_handler );
addsig( SIGPIPE, SIG_IGN );
}
/*父进程中的m_idx是-1 子进程中的m_idx值大于等于0 我们据此判断接下来要运行的代码是父进程代码还是子进程的 */
template< typename T >
void processpool< T >::run()
{
if( m_idx != -1 )
{
run_child();
return;
}
run_parent();
}
/*子进程运行*/
template< typename T >
void processpool< T >::run_child()
{
/*用于父子进程通信*/
setup_sig_pipe();
/*每个子进程都能通过其在进程池中的序号值m_idx找到与父进程通信的管道*/
int pipefd = m_sub_process[m_idx].m_pipefd[ 1 ];
addfd( m_epollfd, pipefd );
epoll_event events[ MAX_EVENT_NUMBER ];
/*处理cgi请求的类 一个子进程最多处理USER_PER_PROCESS个连接*/
T* users = new T [ USER_PER_PROCESS ];
assert( users );
int number = 0;
int ret = -1;
while( ! m_stop )
{
number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
/*从父子进程之间的管道读取数据 并将结果保存在变量client中 如果成功表示有新客户连接到来*/
if( ( sockfd == pipefd ) && ( events[i].events & EPOLLIN ) )
{
int client = 0;
/*仅接收 不处理字符串*/
ret = recv( sockfd, ( char* )&client, sizeof( client ), 0 );
if( ( ( ret < 0 ) && ( errno != EAGAIN ) ) || ret == 0 )
{
continue;
}
/*让子进程来完成客户端的连接*/
else
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( m_listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
if ( connfd < 0 )
{
printf( "errno is: %d\n", errno );
continue;
}
addfd( m_epollfd, connfd );
/* 模板类T必须实现init方法 以初始化一个客户端连接
* 我们直接使用connfd来索引逻辑处理对象,以提高程序的运行效率
*/
users[connfd].init( m_epollfd, connfd, client_address );
}
}
/* 处理子进程收到信号
* 关于这块大家要小心处理
* 因为父子进程会同时收到完全一样的信号
*/
else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
int sig;
char signals[1024];
ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
if( ret <= 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
/*子进程不需要处理这个child新号吧
* 做完测试之后发现并不是*/
case SIGCHLD:
case SIGTERM:
case SIGINT:
{
printf("子进程也受到啦\n");
m_stop = true;
break;
}
default:
{
break;
}
}
}
}
}
/*客户请求的到来 调用process来处理*/
else if( events[i].events & EPOLLIN )
{
users[sockfd].process();
}
else
{
continue;
}
}
}
delete [] users;
users = NULL;
close( pipefd );
//close( m_listenfd );
close( m_epollfd );
}
template< typename T >
void processpool< T >::run_parent()
{
setup_sig_pipe();
/*父进程监听listenfd*/
addfd( m_epollfd, m_listenfd );
epoll_event events[ MAX_EVENT_NUMBER ];
int sub_process_counter = 0;
int new_conn = 1;
int number = 0;
int ret = -1;
while( ! m_stop )
{
number = epoll_wait( m_epollfd, events, MAX_EVENT_NUMBER, -1 );
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure\n" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
/*有新连接到来采用Round Robin方式将其分配给一个子进程处理*/
/*这里就是前文提到的主进程发送信息通知子进程接受连接,高效的半同步/半异步和领导者/追随者模式,则是由主进程管理所有监听socket,而各个子进程分别管理属于自己的连接socket的。*/
if( sockfd == m_listenfd )
{
int i = sub_process_counter;
do
{
if( m_sub_process[i].m_pid != -1 )
{
break;
}
i = (i+1)%m_process_number;
}
while( i != sub_process_counter );
if( m_sub_process[i].m_pid == -1 )
{
m_stop = true;
break;
}
sub_process_counter = (i+1)%m_process_number;
//send( m_sub_process[sub_process_counter++].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );
send( m_sub_process[i].m_pipefd[0], ( char* )&new_conn, sizeof( new_conn ), 0 );
printf( "send request to child %d\n", i );
//sub_process_counter %= m_process_number;
}
/*处理父进程接收到的信号*/
else if( ( sockfd == sig_pipefd[0] ) && ( events[i].events & EPOLLIN ) )
{
int sig;
char signals[1024];
ret = recv( sig_pipefd[0], signals, sizeof( signals ), 0 );
if( ret <= 0 )
{
continue;
}
else
{
for( int i = 0; i < ret; ++i )
{
switch( signals[i] )
{
/*第i个子进程退出了 则主进程关闭相应的通信管道并设置m_pid为-1 已标记子进程已退出*/
/*-1代表等待任意一个 stat保存退出状态信息 WNOHANG表明非阻塞式
* 正常退出返回退出子进程的id*/
case SIGCHLD:
{
pid_t pid;
int stat;
while ( ( pid = waitpid( -1, &stat, WNOHANG ) ) > 0 )
{
for( int i = 0; i < m_process_number; ++i )
{
if( m_sub_process[i].m_pid == pid )
{
printf( "child %d join\n", i );
close( m_sub_process[i].m_pipefd[0] );
m_sub_process[i].m_pid = -1;
}
}
}
/*检查是否又有进程存活*/
m_stop = true;
for( int i = 0; i < m_process_number; ++i )
{
if( m_sub_process[i].m_pid != -1 )
{
m_stop = false;
}
}
break;
}
case SIGTERM:
/*父进程终止信号 杀死所有子进程并等待救赎
* 更好的方式是向父子进程的通信管道发送特殊数据*/
case SIGINT:
{
printf( "kill all the clild now\n" );
for( int i = 0; i < m_process_number; ++i )
{
int pid = m_sub_process[i].m_pid;
if( pid != -1 )
{
kill( pid, SIGTERM );
}
}
break;
}
default:
{
break;
}
}
}
}
}
else
{
continue;
}
}
}
/*由创建者关闭*/
//close( m_listenfd );
close( m_epollfd );
}
#endif
cgi程序
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include "./15-1processpool.h"
#include <iostream>
class cgi_conn
{
public:
cgi_conn(){}
~cgi_conn(){}
//初始化客户端连接,清空该缓冲区
void init(int epollfd, int sockfd, const sockaddr_in& client_addr)
{
m_epollfd = epollfd;
m_sockfd = sockfd;
m_address = client_addr;
memset(m_buf, '\0', BUFFER_SIZE);
m_read_idx = 0;
}
/*
* 从m_sockfd读入信息,并进行处理
*/
void process()
{
int idx = 0;
int ret = -1;
//循环读取和分析客户的数据
while(true)
{
idx = m_read_idx;
ret = recv(m_sockfd, m_buf + idx, BUFFER_SIZE-1-idx, 0);
if(ret < 0)
{
if( errno != EAGAIN)
{
removefd(m_epollfd, m_sockfd);
}
}
//如果对方关闭链接,服务器端也关闭
else if(ret == 0)
{
removefd(m_epollfd, m_sockfd);
break;
}
else
{
m_read_idx += ret;
std::cout<<"user content is : "<<m_buf<<std::endl;
for(;idx < m_read_idx; ++idx)
{
if( (idx>=1) && (m_buf[idx-1] == '\r') && (m_buf[idx] == '\n') )
{
break;
}
}
//如果没有遇到字符\r \n 就读取更多数据
if(idx == m_read_idx)
{
continue;
}
m_buf[ idx-1 ] = '\0';
char * file_name = m_buf;
if(access(file_name, F_OK) == -1)
{
removefd(m_epollfd, m_sockfd);
break;
}
ret = fork();
if(ret == -1)
{
removefd(m_epollfd,m_sockfd);
break;
}
//父进程
else if(ret > 0)
{
removefd(m_epollfd,m_sockfd);
break;
}
else
{
close(STDERR_FILENO);
dup(m_sockfd);
execl(m_buf, m_buf, 0);
exit(0);
}
}
}
}
private:
//读缓冲区大小
static const int BUFFER_SIZE = 1024;
static int m_epollfd;
int m_sockfd;
sockaddr_in m_address;
char m_buf[ BUFFER_SIZE ];
//标记缓冲区下一个要读入的位置
int m_read_idx;
};
int cgi_conn::m_epollfd = -1;
int main(int argc, char* argv[])
{
if(argc <= 2)
{
printf("usage: %s ip_address port_number\n",basename(argv[0]));
return 1;
}
const char * ip = argv[1];
int port = atoi(argv[2]);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert( listenfd >=0 );
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
ret= bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd,5);
assert(ret != -1);
/*创建进程池 会自动调用构造函数的*/
processpool<cgi_conn>* pool = processpool<cgi_conn>::create(listenfd);
if(pool)
{
pool->run();
delete pool;
}
close(listenfd);
return 0;
}
(提前说明一点 这是我在高性能服务器一书上摘取的代码,有一小部分的改进)
编译就没啥好说的了,g++就可以了 客户端的话大家就用telnet 来测试就好了
这样一个简单的进程池就实现啦 以上.
参考
<<Linux 高性能服务器编程>>