> 文章列表 > 【Hello Linux】生产者消费者模式

【Hello Linux】生产者消费者模式

【Hello Linux】生产者消费者模式

作者:@小萌新
专栏:@Linux
作者简介:大二学生 希望能和大家一起进步!
本篇博客简介:简单介绍linux中线程同步

生产者消费者模型

  • 生活中的生产消费模型
  • 计算机中的生产者消费者模型
    • 三种关系
  • 基于BlockingQueue的生产者消费者模型
    • 基于阻塞队列的生产者消费者模型
    • 模拟实现基于阻塞队列的生产消费模型

生活中的生产消费模型

在我们的生活中就有一个典型的生产者消费者模型–超市

【Hello Linux】生产者消费者模式

对于一家超市来说我们是消费者 而各种供应商就是生产者

这里引申出来一个问题

为什么要有超市的存在呢?

在我们的现实生活中一般的供货商就是开在偏远郊区的一些工厂

如果说我们为了买一根火腿肠或买一瓶水而大费周章的跑到几十公里外去效率未免有些太低了

所以说超市是为了提高效率而存在的 它的本质是收集消费者的需求

同时它会将生产者和消费者的行为进行解耦

我们在吃火腿肠的时候供货商也可以不停的造火腿肠

计算机中的生产者消费者模型

在我们的计算机中实际上生产者消费者都是线程

三种关系

我们首先来看计算机中生产者和消费者的三种关系

生产者和生产者

比如说现在有两家供货商 都是制造同一种矿泉水的 现在这两家供货商都想要给同一家超市供货

那么很显然这两家供货商之间是竞争关系 并且他们还是互斥的

消费者和消费者

消费者和消费者的关系也是竞争和互斥的 大家可能觉得奇怪 自己去超市买东西也没有和其他人产生冲突为什么是一个竞争关系 实际上这是资源充足的结果

如果说现在特别干旱 并且超市里面只剩下最后一瓶水了 那么消费者之间就会开始竞争这瓶水了

生产者和消费者

现在大家想象一下这个场景 超市里面现在有一个小货架是专门放火腿肠的

消费者现在跑过来准备买一根火腿肠 而生产者准备过来供货

很不巧的是 现在货架上一根火腿肠也没有了 但是这个消费者比较轴 一直在货架上找 看看有没有剩下的火腿肠 此时生产者就供不了货了 所以说生产者和消费者此时是一个互斥的关系 但是这种关系实际上是很不健康的 所以说最好还要加上一个同步来让生产消费正常的进行

总结下: 其实我们的生产者和消费者模型可以总结成一个321模型

  • 三种关系: 生产者和生产者(互斥关系) 消费者和消费者(互斥关系) 生产者和消费者(互斥关系、同步关系)
  • 两种角色: 生产者和消费者(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区 (可以自己通过某种方式组织起来)

我们编写代码的时候本质就是对于这一个交易场所中两种角色的三种关系进行维护

基于BlockingQueue的生产者消费者模型

基于阻塞队列的生产者消费者模型

在多线程编程中 阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构

【Hello Linux】生产者消费者模式

阻塞队列和普通队列的区别在于

  • 当队列为空时 从队列获取元素的操作将会被阻塞 直到队列中放入了元素
  • 当队列满时 往队列里存放元素的操作会被阻塞 直到有元素从队列中取出

从上面的描述我们很容易就能联想到管道 而实际上管道的底层就是使用阻塞队列来实现的

模拟实现基于阻塞队列的生产消费模型

为了便于大家学习 我们下面使用单生产者和消费者来进行实现

【Hello Linux】生产者消费者模式

我们首先要实现一个阻塞队列 它的代码框架如下

#include <istream>
using namespace std;
#include <queue>
#include <pthread.h>namespace b_q
{template<class T>class block_queue{public:   block_queue() // 构造{}~block_queue() // 析构{}public:void push(){// 生产数据}void pop(){// 取走数据}void LockQueue(){// 上锁保证原子性}void UnLockQueue(){// 解锁}private:queue<T> bq_; //阻塞队列int cap_; //队列大小pthread_mutex_t mtx_; //保护线程安全pthread_cond_t full_; //线程同步 pthread_cond_t empty_; //线程同步 };
}

我们封装C++ STL库中的队列来完成我们的阻塞队列

其中我们使用cap来获取这个队列的大小来得知队列满和队列空的情况

此外它还需要一些条件变量和互斥量来实现互斥和同步

像这个队列中push就是生产数据 pop就是拿走数据

接下来就是接口函数的实现了 代码表示如下

#include <iostream>
#include <pthread.h>
#include <queue>
#include <unistd.h>#define NUM 5template<class T>
class BlockQueue
{
private:bool IsFull(){return _q.size() == _cap;}bool IsEmpty(){return _q.empty();}
public:BlockQueue(int cap = NUM): _cap(cap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_full, nullptr);pthread_cond_init(&_empty, nullptr);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_full);pthread_cond_destroy(&_empty);}//向阻塞队列插入数据(生产者调用)void Push(const T& data){pthread_mutex_lock(&_mutex);while (IsFull()){//不能进行生产,直到阻塞队列可以容纳新的数据pthread_cond_wait(&_full, &_mutex);}_q.push(data);pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_empty); //唤醒在empty条件变量下等待的消费者线程}//从阻塞队列获取数据(消费者调用)void Pop(T& data){pthread_mutex_lock(&_mutex);while (IsEmpty()){//不能进行消费,直到阻塞队列有新的数据pthread_cond_wait(&_empty, &_mutex);}data = _q.front();_q.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_full); //唤醒在full条件变量下等待的生产者线程}
private:std::queue<T> _q; //阻塞队列int _cap; //阻塞队列最大容器数据个数pthread_mutex_t _mutex;pthread_cond_t _full;pthread_cond_t _empty;
};

接下来我们只需要写出一个主函数 生产者不断的生产数据 消费者不断的消费数据就可以了

#include "BlockQueue.hpp"void* Producer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//生产者不断进行生产while (true){sleep(1);int data = rand() % 100 + 1;bq->Push(data); //生产数据std::cout << "Producer: " << data << std::endl;}
}
void* Consumer(void* arg)
{BlockQueue<int>* bq = (BlockQueue<int>*)arg;//消费者不断进行消费while (true){sleep(1);int data = 0;bq->Pop(data); //消费数据std::cout << "Consumer: " << data << std::endl;}
}
int main()
{srand((unsigned int)time(nullptr));pthread_t producer, consumer;BlockQueue<int>* bq = new BlockQueue<int>;//创建生产者线程和消费者线程pthread_create(&producer, nullptr, Producer, bq);pthread_create(&consumer, nullptr, Consumer, bq);//join生产者线程和消费者线程pthread_join(producer, nullptr);pthread_join(consumer, nullptr);delete bqreturn 0;
}