线程经典问题
本月任务
一. 解决生产者消费者问题
二. 解决哲学家吃饭问题
三. 实现进程池/线程池
在编译运行程序之后需要加上 -lpthread
生产者消费者问题
问题:生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。
生产者消费者问题,也称有限缓冲问题,是一个多线程同步问题的经典案例。
解决这个问题之前,我们应该了解一下PV操作(这篇文章很好理解)
# include <stdio.h>
# include <pthread.h>
# include <unistd.h>
# include <stdlib.h>
#define N 100
#define true 1
#define producerNum 10
#define consumerNum 5
typedef int semaphore;
typedef int item;
item buffer[N] = {
0};
int in = 0;
int out = 0;
int proCount = 0;
semaphore mutex = 1, empty = N, full = 0, proCmutex = 1;
void * producer(void * a)
{
while(true) {
while( proCmutex <= 0 );
proCmutex--;
proCount++;
printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in);
proCmutex++;
while( empty <= 0 ) {
printf("缓冲区已满!\n");
}
empty--;
while( mutex <= 0 );
mutex--;
buffer[in] = proCount;
in = (in + 1) % N;
mutex++;
full++;
sleep(1);
}
}
void * consumer(void *b)
{
while(true) {
while( full <= 0 ) {
printf("缓冲区为空!\n");
}
full--;
while( mutex <= 0 );
mutex--;
int nextc = buffer[out];
buffer[out] = 0;//消费完将缓冲区设置为0
out = (out + 1) % N;
mutex++;
empty++;
printf("\t\t\t\t消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
sleep(1);
}
}
int main()
{
pthread_t threadPool[producerNum+consumerNum];
int i;
for(i = 0; i < producerNum; i++) {
pthread_t temp;
if ( pthread_create(&temp, NULL, producer, NULL) == -1 ) {
printf("ERROR, fail to create producer%d\n", i);
exit(1);
}
threadPool[i] = temp;
}//创建生产者进程放入线程池
for(i = 0; i < consumerNum; i++) {
pthread_t temp;
if ( pthread_create(&temp, NULL, consumer, NULL) == -1 ) {
printf("ERROR, fail to create consumer%d\n", i);
exit(1);
}
threadPool[i+producerNum] = temp;
}//创建消费者进程放入线程池
void * result;
for(i = 0; i < producerNum+consumerNum; i++) {
if ( pthread_join(threadPool[i], &result) == -1 ) {
printf("fail to recollect\n");
exit(1);
}
}//运行线程池
return 0;
}
哲学家吃饭问题
问题:有五个哲学家绕着圆桌坐,每个哲学家面前有一盘面,两人之间有一支筷子,这样每个哲学家左右各有一支筷子。哲学家有2个状态,思考或者拿起筷子吃饭。如果哲学家拿到一只筷子,不能吃饭,直到拿到2只才能吃饭,并且一次只能拿起身边的一支筷子。一旦拿起便不会放下筷子直到把饭吃完,此时才把这双筷子放回原处。如果,很不幸地,每个哲学家拿起他或她左边的筷子,那么就没有人可以吃到饭了。
哲学家进餐问题是一个多线程运用的经典例子,涉及到线程同步/互斥,临界区访问问题以及死锁问题。
吃饭时使用筷子:
- 拿一双筷子才能吃
- 每次只允许拿一支筷子
- 只能拿身边的筷子
- 吃完才放下筷子
5个哲学家可能每个人都拿起自己的左筷子,但是却无法拿到自己的右筷子。既无法释放自己的筷子,也等不到别人的筷子完成自己的活动,最终形成死锁。
死锁:两个或多个进程无限期地等待永远不会发生的条件的一种系统状态(结果:每个进程都永远阻塞)
在此问题中死锁为:每个哲学家都无限期的等待邻座哲学家放下筷子!
而邻座哲学家没有吃完饭前不会放下筷子!
邻座哲学家缺一支筷子永远无法吃完饭!
方法一:
每个哲学家对应一个线程,程序中定义一个互斥量,对于每个线程进行访问其他哲学家状态时用互斥量进行加锁,这样也就避免了死锁的产生,访问到该哲学家处于饥饿时,同时旁边两位科学家并未处于进餐状态时,他就拿起左右两边的叉子进行吃饭,吃饭一段时间后,就放下叉子进行思考,思考一段时间后处于饥饿状态,重新开始试图拿起叉子吃饭,代码如下
semaphore chopstick[5]={
1,1,1,1,1};
semaphore room=4;
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<semaphore.h>
#include<time.h>
#define N 5 //哲学家数量
#define LEFT(i) (i+N-1)%N //左手边哲学家编号
#define RIGHT(i) (i+1)%N //右手边哲家编号
#define HUNGRY 0 //饥饿
#define THINKING 1 //思考
#define EATING 2 //吃饭
#define U_SECOND 1000000 //1秒对应的微秒数
pthread_mutex_t mutex; //互斥量
int state[N]; //记录每个哲学家状态
//每个哲学家的思考时间,吃饭时间,思考开始时间,吃饭开始时间
clock_t thinking_time[N], eating_time[N], start_eating_time[N], start_thinking_time[N];
//线程函数
void *thread_function(void *arg);
int main()
{
pthread_mutex_init(&mutex, NULL);
pthread_t a,b,c,d,e;
//为每一个哲学家开启一个线程,传递哲学家编号
pthread_create(&a,NULL,thread_function,"0");
pthread_create(&b,NULL,thread_function,"1");
pthread_create(&c,NULL,thread_function,"2");
pthread_create(&d,NULL,thread_function,"3");
pthread_create(&e,NULL,thread_function,"4");
//初始化随机数种子
srand((unsigned int)(time(NULL)));
while(1)
{
;
}
}
void *thread_function(void *arg)
{
char *a = (char *)arg;
int num = a[0] - '0'; //根据传递参数获取哲学家编号
int rand_time;
while(1) {
//关键代码加锁
pthread_mutex_lock(&mutex);
//如果该哲学家处于饥饿 并且 左右两位哲学家都没有在吃饭 就拿起叉子吃饭
if (state[num] == HUNGRY && state[LEFT(num)] != EATING && state[RIGHT(num)] != EATING) {
state[num] = EATING;
start_eating_time[num] = clock(); //记录开始吃饭时间
eating_time[num] = (rand() % 5 + 5) * U_SECOND; //随机生成吃饭时间
//输出状态
printf("state: %d %d %d %d %d\n",state[0],state[1],state[2],state[3],state[4]);
//printf("%d is eating\n",num);
} else if (state[num] == EATING) {
//吃饭时间已到 ,开始思考
if (clock() - start_eating_time[num] >= eating_time[num]) {
state[num] = THINKING;
//printf("%d is thinking\n",num);
printf("state: %d %d %d %d %d\n",state[0],state[1],state[2],state[3],state[4]);
start_thinking_time[num] = clock(); //记录开始思考时间
thinking_time[num] = (rand() % 10 + 10) * U_SECOND; //随机生成思考时间
}
} else if (state[num] == THINKING) {
//思考一定时间后,哲学家饿了,需要吃饭
if (clock() - start_thinking_time[num] >= thinking_time[num]) {
state[num] = HUNGRY;
printf("state: %d %d %d %d %d\n",state[0],state[1],state[2],state[3],state[4]);
// printf("%d is hungry\n",num);
}
}
pthread_mutex_unlock(&mutex);
}
}
方法二:
通过互斥信号量 mutex 对哲学家进餐之前取左侧和右侧筷子的操作进行保护,可以防止死锁的出现。
# include <stdio.h>
# include <stdlib.h>
# include <malloc.h>
# include <time.h>
# include <unistd.h>
# include <pthread.h>
# include <semaphore.h>
# define N 5
sem_t chopsticks[N]; //设置5种信号量,有5种不同类型的资源,每一种有1个,这样便于理解,因为每个哲学家需要的资源不同
pthread_mutex_t mutex; //定义互斥锁
int philosophers[N] = {
0, 1, 2, 3, 4}; //代表5个哲学家的编号
void delay (int len) {
int i = rand() % len;
int x;
while (i > 0) {
x = rand() % len;
while (x > 0) {
x--;
}
i--;
}
}
void *philosopher (void* arg) {
int i = *(int *)arg;
int left = i;//左筷子的编号和哲学家的编号相同
int right = (i + 1) % N;//右筷子的编号为哲学家编号+1
while (1) {
printf("哲学家%d正在思考问题\n", i);
delay(60000);
printf("哲学家%d饿了\n", i);
pthread_mutex_lock(&mutex);//加锁
sem_wait(&chopsticks[left]);//此时这个哲学家左筷子的信号量-1之后>=0时,表示能继续执行。
printf("哲学家%d拿起了%d号筷子,现在只有一支筷子,不能进餐\n", i, left);
sem_wait(&chopsticks[right]);
printf("哲学家%d拿起了%d号筷子\n", i, right);
pthread_mutex_unlock(&mutex);//解锁
printf("哲学家%d现在有两支筷子,开始进餐\n", i);
delay(60000);
sem_post(&chopsticks[left]);
printf("哲学家%d放下了%d号筷子\n", i, left);
sem_post(&chopsticks[right]);
printf("哲学家%d放下了%d号筷子\n", i, right);
}
}
int main (int argc, char **argv) {
srand(time(NULL));
pthread_t philo[N];
//信号量初始化
for (int i=0; i<N; i++) {
sem_init(&chopsticks[i], 0, 1);
}
pthread_mutex_init(&mutex,NULL);//初始化互斥锁
//创建线程
for (int i=0; i<N; i++) {
pthread_create(&philo[i], NULL, philosopher, &philosophers[i]);
}
//挂起线程
for (int i=0; i<N; i++) {
pthread_join(philo[i], NULL);
}
//销毁信号量
for (int i=0; i<N; i++) {
sem_destroy(&chopsticks[i]);
}
pthread_mutex_destroy(&mutex);//销毁互斥锁
return 0;
}# include <stdio.h>
# include <stdlib.h>
# include <malloc.h>
# include <time.h>
# include <unistd.h>
# include <pthread.h>
# include <semaphore.h>
# define N 5
sem_t chopsticks[N]; //设置5种信号量,有5种不同类型的资源,每一种有1个,这样便于理解,因为每个哲学家需要的资源不同
pthread_mutex_t mutex; //定义互斥锁
int philosophers[N] = {
0, 1, 2, 3, 4}; //代表5个哲学家的编号
void delay (int len) {
int i = rand() % len;
int x;
while (i > 0) {
x = rand() % len;
while (x > 0) {
x--;
}
i--;
}
}
void *philosopher (void* arg) {
int i = *(int *)arg;
int left = i;//左筷子的编号和哲学家的编号相同
int right = (i + 1) % N;//右筷子的编号为哲学家编号+1
while (1) {
printf("哲学家%d正在思考问题\n", i);
delay(60000);
printf("哲学家%d饿了\n", i);
pthread_mutex_lock(&mutex);//加锁
sem_wait(&chopsticks[left]);//此时这个哲学家左筷子的信号量-1之后>=0时,表示能继续执行。
printf("哲学家%d拿起了%d号筷子,现在只有一支筷子,不能进餐\n", i, left);
sem_wait(&chopsticks[right]);
printf("哲学家%d拿起了%d号筷子\n", i, right);
pthread_mutex_unlock(&mutex);//解锁
printf("哲学家%d现在有两支筷子,开始进餐\n", i);
delay(60000);
sem_post(&chopsticks[left]);
printf("哲学家%d放下了%d号筷子\n", i, left);
sem_post(&chopsticks[right]);
printf("哲学家%d放下了%d号筷子\n", i, right);
}
}
int main (int argc, char **argv) {
srand(time(NULL));
pthread_t philo[N];
//信号量初始化
for (int i=0; i<N; i++) {
sem_init(&chopsticks[i], 0, 1);
}
pthread_mutex_init(&mutex,NULL);//初始化互斥锁
//创建线程
for (int i=0; i<N; i++) {
pthread_create(&philo[i], NULL, philosopher, &philosophers[i]);
}
//挂起线程
for (int i=0; i<N; i++) {
pthread_join(philo[i], NULL);
}
//销毁信号量
for (int i=0; i<N; i++) {
sem_destroy(&chopsticks[i]);
}
pthread_mutex_destroy(&mutex);//销毁互斥锁
return 0;
}
线程池实现
# include <stdio.h>
# include <stdlib.h>
# include <sys/types.h>
# include <sys/stat.h>
# include <fcntl.h>
# include <string.h>
# include <sys/mman.h>
# include <pthread.h>
# include <semaphore.h>
# include <netinet/in.h>
# include <sys/socket.h>
# include <arpa/inet.h>
# include <sys/select.h>
# include <sys/epoll.h>
# include <unistd.h>
# include <dirent.h>
# include <pwd.h>
# include <grp.h>
# include <time.h>
# include <errno.h>
# include <signal.h>
# include <fcntl.h>
# include <sys/wait.h>
# include <stdbool.h>
typedef void Fun(void*);
typedef struct work_i{
Fun* fun;
void *arg;
struct work_i *next;
}work_l;
typedef struct my_pthread_t
{
pthread_t *_pthread;
bool state;
}my_pthread_t;
typedef struct trdpool_t
{
my_pthread_t *phead;
work_l *whead;
pthread_cond_t cond;
pthread_mutex_t mutex;
bool status; //0 线程池不销毁 1 销毁
int num;
}pool_t;
pool_t *pool = NULL;
void print_pthread_self(void *arg)
{
printf("pthread id = %ld\n",pthread_self());
sleep(2);
return ;
}
void *start_routine(void *arg)
{
bool *state = arg;
work_l *work;
while (1) {
pthread_mutex_lock(&pool->mutex);
while ( pool->whead == NULL && pool->status == 0) {
//一个小朋友把蛋糕抢走了,那么其他小朋友醒来再继续睡。
fprintf(stdout,"thread %ld is ready \n",pthread_self());
*state = 1;
pthread_cond_wait(& pool->cond,& pool->mutex);
fprintf(stdout,"thread %ld is weak up \n",pthread_self());
}
if ( pool->status == 0) {
work = pool->whead;
pool->whead = pool->whead->next;
pthread_mutex_unlock(& pool->mutex);
(work->fun)(work->arg);
free(work);
} else {
pthread_mutex_unlock(& pool->mutex);
pthread_exit(NULL);
}
}
}
void add_work()
{
work_l* new_work = malloc(sizeof(work_l));
work_l* old_work;
new_work->arg = NULL;
new_work->fun = print_pthread_self;
new_work->next = NULL;
pthread_mutex_lock(& pool->mutex);
if ( pool->whead != NULL) {
old_work = pool->whead;
pool->whead = new_work;
new_work->next = old_work;
} else {
pool->whead = new_work;
}
pthread_cond_signal(& pool->cond);
pthread_mutex_unlock(& pool->mutex);
}
void pool_init(int num)
{
pool=malloc(sizeof(pool_t));
pthread_mutex_init(& pool->mutex,NULL);
pthread_cond_init(& pool->cond,NULL);
pool->status = 0;
pool->phead = malloc(sizeof(my_pthread_t)*num);
pool->num = num;
for (int i=0; i<num; i++) {
pool->phead[i]._pthread = malloc(sizeof(pthread_t)*num);
pool->phead[i].state = 0;
bool *pthread_arg = &(pool->phead[i].state);
pthread_create(pool->phead[i]._pthread,NULL,start_routine,(void*)pthread_arg);
lblready:{
if(pool->phead[i].state == 0){
usleep(400);
goto lblready;
}
}
}
}
void pool_destory()
{
puts("the pool begin to destory !");
work_l *work;
pool->status = 1;
pthread_mutex_lock(& pool->mutex);
pthread_cond_broadcast(& pool->cond);
pthread_mutex_unlock(& pool->mutex);
for (int i=0; i< pool->num; i++) {
pthread_join(*(pool->phead[i]._pthread),NULL);
free(pool->phead[i]._pthread);
}
free( pool->phead);
work = pool->whead;
while (pool->whead) {
pool->whead = pool->whead->next;
free(work);
work = pool->whead;
}
pthread_cond_destroy(& pool->cond);
pthread_mutex_lock(& pool->mutex);
pthread_mutex_destroy(& pool->mutex);
free(pool);
puts("the pool destory !");
}
int main(int argc,char**argv)
{
pool_init(5);
for(int i=0; i<10; i++)
add_work();
sleep(10);
pool_destory();
return 0;
}