【关于Linux中----信号量及其使用场景】
文章目录
一、解释信号量
1.1 概念的引入
我们知道,一个线程在访问临界资源时,临界资源必须要是满足条件的。但是,在线程访问资源前,无法得知这块资源是否满足生产或消费的条件。所以,线程只能先对这块资源加锁,然后检测其是否满足条件,再进行操作,最后再释放锁。可是,检测的过程本质上也是在访问临界资源。
只要一个线程对一块资源加了锁,就默认该线程对这个资源的整体使用。
但实际情况中可能存在,一份公共资源是允许多个线程同时访问其中的不同区域的。所以,在这种情况下,一个线程要访问资源,就必须先申请信号量。
信号量的本质是一把衡量临界资源中资源数量多少的计数器,拥有信号量就意味着,在未来一定能够拥有临界资源的一部分。申请信号量的本质是对临界资源中特定某一部分资源的预定机制。
所以,有了信号量,就意味着在访问临界资源之前,就可以知道临界资源的使用情况。换言之,如果申请信号量成功,就说明临界资源中一定有可以访问的资源;失败说明不满足条件,必须进行等待。所以,申请信号量成功与否,就能说明是否可以访问临界资源。这样也就不需要先进行判断了。
1.2 信号量操作和使用接口
首先,线程要访问临界资源中的某一部分,就必须先申请信号量。也就是说,信号量要能够被所有线程看到,即信号量本身是公共资源。
而因为信号量是衡量资源中资源数量多少的计数器,所以当线程访问资源的时候,它必须进行–操作;当线程归还资源的时候,它必须进行++操作。而为了保证++、–的过程不会被其他线程打断,就必须保证操作的原子性。其中,信号量–的操作叫做P操作,++的操作叫做V操作。而信号量的核心操作就是PV操作。
信号量基本使用接口如下:
①初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值
②销毁信号量
int sem_destroy(sem_t *sem);
③等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
④发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
二、信号量使用场景
2.1 引入环形队列&&生产消费问题
经过前面的铺垫,想必大家已经对信号量和互斥锁适合使用的场景有了大致的轮廓。
互斥锁更适用于一整块的临界资源,而信号量更适用于看似是一块临界资源,但其实是可以分成一个个小部分的资源块的资源。
所以,这里引入一个符合条件的适用于信号量的存储资源的结构----环形队列。
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。(这里采用计数器的方式,也就是使用信号量)
具体的细节实现就不解释了,相信大家学到这个程度已经都熟稔于心了。
而这里,我们让生产者和消费者都访问这个环形队列,生产者向队列中写入数据,而消费者从队列中读取数据(相当于把数据弹出),该过程中二者应该是并发的。
写代码之前,需要知道环形队列为空和为满的时候,生产者和消费者是在同一个位置的,其他情况下都不在同一位置。
更重要的“游戏规则”是,消费者在队列中的位置一定不能超过生产者(未生产不能消费),生产者不能将消费者“套圈”(队列满了就不能再放入)。而队列为空时,生产者先访问队列,为满时,消费者先访问队列。
所以,只有队列为空和为满的时候,生产者消费者才存在同步和互斥的问题!
对于生产者来说,看中的时队列中的剩余空间;对于消费者而言,看中的是放入队列中的数据。所以,在实现代码时,我们应该定义两个信号量,分别用来维护空间资源和数据资源。
2.2 代码实现
首先还是老规矩,定义一个环形队列的类,文件为RingQueue.hpp,内容如下:
#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>static const int gcap=5;template<class T>
class RingQueue
{
private:void P(sem_t& sem){int n=sem_wait(&sem);assert(n==0);(void)n;}void V(sem_t& sem){int n=sem_post(&sem);assert(n==0);(void)n;}
public:RingQueue(const int& cap=gcap):_queue(cap),_cap(cap){int n=sem_init(&_spaceSem,0,_cap);assert(n==0);n=sem_init(&_dataSem,0,0);assert(n==0);_productorStep=_consumerStep=0;}void Push(const T& in){P(_spaceSem);//申请空间信号量成功就一定能进行生产_queue[_productorStep++]=in;_productorStep%=_cap;V(_dataSem);}void Pop(T* out){P(_dataSem);*out=_queue[_consumerStep++];_consumerStep%=_cap;V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}
private:std::vector<T> _queue;int _cap;//队列容量sem_t _spaceSem;//生产者看重的空间资源信号量sem_t _dataSem;//消费者看重的数据资源信号量int _productorStep;int _consumerStep;
};
然后,在Main.cc中就可以用这个类来完成生产者和消费者各自的任务了,内容如下:
#include "RingQueue.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>void* ProductorRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){sleep(2);int data=rand()%10+1;ringqueue->Push(data);std::cout<<"生产完成,生产数据: "<<data<<std::endl;}
}void* ConsumerRoutine(void* rq)
{RingQueue<int>* ringqueue=static_cast<RingQueue<int>*>(rq);while(true){int data;ringqueue->Pop(&data);std::cout<<"消费完成,消费数据: "<<data<<std::endl;}
}int main()
{srand((unsigned int)time(nullptr)^getpid()^pthread_self());RingQueue<int>* rq=new RingQueue<int>();pthread_t c,p;pthread_create(&p,nullptr,ProductorRoutine,rq);pthread_create(&c,nullptr,ConsumerRoutine,rq);pthread_join(p,nullptr);pthread_join(c,nullptr);delete rq;return 0;
}
Makefile内容如下:
ringqueue:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringqueue
需要注意的是,上面代码中设置的生产者每一次生产之前都要休眠两秒,而对消费者不做处理。所以代码执行结果一定是生产者每生产一次,消费者就能立刻消费。
运行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
生产完成,生产数据: 7
消费完成,消费数据: 7
生产完成,生产数据: 7
消费完成,消费数据: 7
生产完成,生产数据: 8
消费完成,消费数据: 8
^C
可见,结果和预测相同。
除此之外,我们也可以用在之前的文章中封装过的任务派发类,来给生产者派发任务,而让消费者处理任务。
新建Task.hpp文件内容如下:
#pragma once
#include <iostream>
#include <functional>
#include <cstdio>class Task
{using func_t =std::function<int(int,int,char)>;
public:Task(){}Task(int x,int y,char op,func_t func):_x(x),_y(y),_op(op),_callback(func){}std::string operator()(){int result=_callback(_x,_y,_op);char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = %d",_x,_op,_y,result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer,sizeof buffer,"%d %c %d = ?",_x,_op,_y);return buffer;}
private:int _x;int _y;func_t _callback;char _op;
};const std::string oper="+-*/%";int mymath(int x,int y,char op)
{int result=0;switch(op){case '+':result= x+y;break;case '-':result= x-y;break;case '*':result= x*y;break;case '/':if(y==0){std::cerr<<"div zero error!"<<std::endl;result=-1;} elseresult=x/y;break;case '%':if(y==0){std::cerr<<"mod zero error!"<<std::endl;result=-1;} elseresult=x%y;break;default:break;}return result;
}
对Main.cc内容稍作修改,如下:
#include "RingQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <ctime>
#include <cstdlib>
#include <unistd.h>
#include <sys/types.h>void* ProductorRoutine(void* rq)
{RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);while(true){sleep(2);//获取任务int x=rand()%1000;int y=rand()%1500;char op=oper[rand()%oper.size()];Task t(x,y,op,mymath);//生产任务ringqueue->Push(t);std::cout<<"生产者派发任务: "<<t.toTaskString()<<std::endl;}
}void* ConsumerRoutine(void* rq)
{RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);while(true){Task t;//消费任务ringqueue->Pop(&t);std::string result=t();std::cout<<"消费者消费任务: "<<result<<std::endl;}
}int main()
{srand((unsigned int)time(nullptr)^getpid()^pthread_self());RingQueue<Task>* rq=new RingQueue<Task>();pthread_t c,p;pthread_create(&p,nullptr,ProductorRoutine,rq);pthread_create(&c,nullptr,ConsumerRoutine,rq);pthread_join(p,nullptr);pthread_join(c,nullptr);delete rq;return 0;
}
运行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
生产者派发任务: 912 % 178 = ?
消费者消费任务: 912 % 178 = 22
生产者派发任务: 282 * 951 = ?
消费者消费任务: 282 * 951 = 268182
生产者派发任务: 658 % 173 = ?
消费者消费任务: 658 % 173 = 139
^C
2.3 对于多生产多消费的情况
上面的代码中实现的很明显是单生产单消费的情况,那么如果有多个生产者和多个消费者又该如何实现呢?
要知道的是,不管有多少个生产者和消费者,一次只能有一个生产者和一个消费者访问环形队列。所以,应该让生产者和消费者之间决出一个竞争能力较强的线程,进而又去执行单生产单消费的任务。由于生产者和生产者之间、消费者和消费者之间是互斥的关系,所以一定要有两把锁分别控制生产者和消费者。
所以,再对代码做出修改。
Main.cc内容如下:
std::string ThreadName()
{char name[128];snprintf(name,sizeof(name),"thread[0x%x]",pthread_self());return name;
}void* ProductorRoutine(void* rq)
{RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);while(true){sleep(2);//获取任务int x=rand()%1000;int y=rand()%1500;char op=oper[rand()%oper.size()];Task t(x,y,op,mymath);//生产任务ringqueue->Push(t);std::cout<<ThreadName()<<",生产者派发任务: "<<t.toTaskString()<<std::endl;}
}void* ConsumerRoutine(void* rq)
{RingQueue<Task>* ringqueue=static_cast<RingQueue<Task>*>(rq);while(true){Task t;//消费任务ringqueue->Pop(&t);std::string result=t();std::cout<<ThreadName()<<",消费者消费任务: "<<result<<std::endl;}
}int main()
{srand((unsigned int)time(nullptr)^getpid()^pthread_self());RingQueue<Task>* rq=new RingQueue<Task>();pthread_t c[8],p[4];for(int i=0;i<4;i++){pthread_create(p+i,nullptr,ProductorRoutine,rq);}for(int i=0;i<8;i++){pthread_create(c+i,nullptr,ConsumerRoutine,rq);}for(int i=0;i<4;i++){pthread_join(p[i],nullptr);}for(int i=0;i<8;i++){pthread_join(c[i],nullptr);}delete rq;return 0;
}
RingQueue.hpp内容如下:
template<class T>
class RingQueue
{
private:void P(sem_t& sem){int n=sem_wait(&sem);assert(n==0);(void)n;}void V(sem_t& sem){int n=sem_post(&sem);assert(n==0);(void)n;}
public:RingQueue(const int& cap=gcap):_queue(cap),_cap(cap){int n=sem_init(&_spaceSem,0,_cap);assert(n==0);n=sem_init(&_dataSem,0,0);assert(n==0);_productorStep=_consumerStep=0;pthread_mutex_init(&_pmutex,nullptr);pthread_mutex_init(&_cmutex,nullptr);}void Push(const T& in){pthread_mutex_lock(&_pmutex);P(_spaceSem);//申请空间信号量成功就一定能进行生产_queue[_productorStep++]=in;_productorStep%=_cap;V(_dataSem);pthread_mutex_unlock(&_pmutex);}void Pop(T* out){pthread_mutex_lock(&_cmutex);P(_dataSem);*out=_queue[_consumerStep++];_consumerStep%=_cap;V(_spaceSem);pthread_mutex_unlock(&_cmutex);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}
private:std::vector<T> _queue;int _cap;//队列容量sem_t _spaceSem;//生产者看重的空间资源信号量sem_t _dataSem;//消费者看重的数据资源信号量int _productorStep;int _consumerStep;pthread_mutex_t _pmutex;pthread_mutex_t _cmutex;
};
执行结果如下:
[sny@VM-8-12-centos circlequeue]$ ./ringqueue
thread[0xbaaaa700],生产者派发任务: 730 * 1478 = ?
thread[0xb8aa6700],消费者消费任务: 881 / 481 = 1
thread[0xb82a5700],消费者消费任务: 2 + 874 = 876
thread[0xba2a9700],生产者派发任务: 334 / 1437 = ?
thread[0xb92a7700],消费者消费任务: 334 / 1437 = 0
thread[0xbb2ab700],生产者派发任务: 881 / 481 = ?
thread[0xbaaaa700],生产者派发任务: 990 * 373 = ?
thread[0xb72a3700],消费者消费任务: 990 * 373 = 369270
thread[0xba2a9700],生产者派发任务: 590 + 693 = ?
thread[0xb9aa8700],生产者派发任务: 985 - 912 = ?
thread[0xb72a3700],消费者消费任务: 590 + 693 = 1283
^C
2.4 申请信号量和加锁的顺序问题
现在来谈一下,是先加锁好,还是先申请信号量好?
答案是先申请信号量更好。
因为首先申请信号量的过程本来就是原子的,不需要将其放在申请锁之后。
其次,如果先申请锁,那么没有申请到锁的线程什么也干不了,整个过程只有申请到锁的那一个线程在“忙前忙后”。而如果先申请信号量,则申请到信号量的线程可以去申请锁,而其他线程也可以同时在申请信号量,明显提高了效率。
当然,两种方式运行时间的长短,感兴趣的读者可以将上面的代码复制粘贴,然后修改信号量和锁的先后位置,运行观察一下,这里就不演示了。
2.5 多生产多消费的意义
这个话题跟上一篇文章中----阻塞队列中多线程的意义是一样的。
即一个线程在访问队列的时候,其他的线程也可以获取和执行任务,提升了效率。
本篇完,青山不改,绿水长流!