> 文章列表 > 2.17、多生产者-多消费者进程

2.17、多生产者-多消费者进程

2.17、多生产者-多消费者进程

桌子上有一只盘子,每次只能向其中放入一个水果。爸爸专向盘子中放苹果,妈妈专向盘子中放橘子,儿子专等着吃盘子中的橘子,女儿专等着吃盘子中的苹果。只有盘子空时,爸爸或妈妈才可向盘子中放一个水果。仅当盘子中有自己需要的水果时,儿子或女儿可以从盘子中取出水果。

2.17、多生产者-多消费者进程

生产者生产不同的产品,消费者消费相应的产品


1、问题分析

① 关系分析。找出题目中描述的各个进程,分析它们之间的同步、互斥关系。

互斥关系:

  • 对缓冲区(盘子)的访问要互斥地进行同步关系(一前一后):

同步关系(一前一后)

  1. 父亲将苹果放入盘子后,女儿才能取苹果

  2. 母亲将橘子放入盘子后,儿子才能取橘子

  3. 只有盘子为空时,父亲或母亲才能放入水果

    • “盘子为空”这个事件可以由儿子或女儿触发,事件发生后才允许父亲或母亲放水果

② 整理思路。根据各进程的操作流程确定 PV 操作的大致顺序。

互斥:在临界资源前后分别 PV

同步:前 VP

  • 在前操作后面进行 V 操作
  • 在后操作前面进行 P 操作

③ 设置信号量。设置需要的信号量,并根据题目条件确定信号量初值。

  • (互斥信号量初值一般为 1,同步信号量的初始值要看对应资源的初始值是多少)

2.17、多生产者-多消费者进程

2、具体实现

2.17、多生产者-多消费者进程

2.17、多生产者-多消费者进程

2.17、多生产者-多消费者进程


问题:可不可以不用互斥信号量?

2.17、多生产者-多消费者进程

分析:刚开始,儿子、女儿进程即使上处理机运行也会被阻塞。如果刚开始是父亲进程先上处理机运行,则:
父亲 P(plate),可以访问盘子 → 母亲 P(plate),阻塞等待盘子 → 父亲放入苹果 V(apple),女儿进程被唤醒,其他进程即使运行也都会阻塞,暂时不可能访问临界资源(盘子) → 女儿 P(apple) ,访问盘子,V(plate) ,等待盘子的母亲进程被唤醒→母亲进程访问盘子(其他进程暂时都无法进入临界区)>…

结论:即使不设置专门的互斥变量 mutex,也不会出现多个进程同时访问盘子的现象

原因在于

  • 本题中的缓冲区大小为 1,在任何时刻,appleorangeplate 三个同步信号量中最多只有一个是1。

  • 因此在任何时刻,最多只有一个进程的P操作不会被阻塞,并顺利地进入临界区…


若盘子容量设置为 2

父亲 P(plate),可以访问盘子 → 母亲 P(plate),可以访问盘子 → 父亲在往盘子里放苹果,同时母亲也可以往盘子里放橘子。

  • 于是就出现了两个进程同时访问缓冲区的情况,有可能导致两个进程写入缓冲区的数据相互覆盖的情况。

因此,如果缓冲区大小大于 111,就必须专门设置一个互斥信号量 mutex 来保证互斥访问缓冲区。

3、Java 案例

import java.util.Deque;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;class ProducerConsumerByLock {//互斥资源private static Deque<String> queueApple = new LinkedList<>();private static Deque<String> queueOrange = new LinkedList<>();private static int maxSize = 1;private static Lock lock = new ReentrantLock(false);private static Condition apple = lock.newCondition();private static Condition orange = lock.newCondition();private static Condition plate = lock.newCondition();public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture.runAsync(new Dad());CompletableFuture.runAsync(new Mom());CompletableFuture.runAsync(new Daughter());CompletableFuture.runAsync(new Son());//CompletableFuture.runAsync(new Consumer()); 产生的进程默认是守护进程try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}public static class Dad implements Runnable {//放入苹果public void put() {while (true) {lock.lock();try {while (queueApple.size() == maxSize || queueOrange.size() == maxSize) {System.out.println("盘子不为空, 父亲阻塞");try {plate.await();} catch (InterruptedException e) {e.printStackTrace();}}//放入苹果queueApple.offerLast("apple");System.out.println("父亲放入了 1 个苹果");//通知苹果的所有消费者apple.signalAll();} finally {lock.unlock();}try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void run() {put();}}public static class Mom implements Runnable {//放入橘子public void put() {while (true) {lock.lock();try {while (queueOrange.size() == maxSize || queueApple.size() == maxSize) {System.out.println("盘子不为空, 母亲阻塞");try {plate.await();} catch (InterruptedException e) {e.printStackTrace();}}//放入橘子queueOrange.offerLast("orange");System.out.println("母亲放入了 1 个橘子");//通知橘子的所有消费者orange.signalAll();} finally {lock.unlock();}try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void run() {put();}}public static class Son implements Runnable {//获取橘子public void get() {while (true) {String v = "";lock.lock();try {while (queueOrange.isEmpty()) {System.out.println("盘子为空或者没有橘子,儿子阻塞");try {orange.await();} catch (InterruptedException e) {e.printStackTrace();}}//获取橘子v = queueOrange.poll();System.out.println("儿子吃了一个橘子: " + v);//通知所有生产者, 父亲和母亲plate.signalAll();} finally {lock.unlock();}//防止一瞬间消费完成try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void run() {get();}}public static class Daughter implements Runnable {//获取苹果public void get() {String v = "";while (true) {lock.lock();try {while (queueApple.isEmpty()) {System.out.println("盘子为空或者没有苹果,女儿阻塞");try {apple.await();} catch (InterruptedException e) {e.printStackTrace();}}//获取橘子v = queueApple.poll();System.out.println("女儿吃了一个苹果: " + v);//通知所有生产者, 父亲和母亲plate.signalAll();} finally {lock.unlock();}//防止一瞬间消费完成try {Thread.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}@Overridepublic void run() {get();}}
}

stdout:

父亲放入了 1 个苹果
盘子不为空, 母亲阻塞
女儿吃了一个苹果: apple
母亲放入了 1 个橘子
儿子吃了一个橘子: orange
父亲放入了 1 个苹果
盘子为空或者没有橘子,儿子阻塞
女儿吃了一个苹果: apple
母亲放入了 1 个橘子
儿子吃了一个橘子: orange
盘子为空或者没有橘子,儿子阻塞
盘子为空或者没有苹果,女儿阻塞
父亲放入了 1 个苹果
女儿吃了一个苹果: apple
母亲放入了 1 个橘子
儿子吃了一个橘子: orange
母亲放入了 1 个橘子
盘子不为空, 父亲阻塞
儿子吃了一个橘子: orange
父亲放入了 1 个苹果
女儿吃了一个苹果: apple
盘子为空或者没有橘子,儿子阻塞
母亲放入了 1 个橘子
儿子吃了一个橘子: orange
盘子为空或者没有苹果,女儿阻塞

4、总结

总结:在生产者-消费者问题中,如果缓冲区大小为 1 ,那么有可能不需要设置互斥信号量就可以实现互斥访问缓冲区的功能。

  • 当然,这不是绝对的,要具体问题具体分析。

建议:在考试中如果来不及仔细分析,可以加上互斥信号量,保证各进程一定会互斥地访问缓冲区。

  • 但需要注意的是,实现互斥的P操作一定要在实现同步的 P 操作之后,否则可能引起 “死锁” 。

解决 “多生产者-多消费者问题” 的关键在于理清复杂的同步关系。

在分析同步问题(一前一后问题)的时候不能从单个进程行为的角度来分析,要把“一前一后”发生的事看做是两种“事件”的前后关系。

比如,如果从单个进程行为的角度来考虑的话,我们会有以下结论:

  • 如果盘子里装有苹果,那么一定要女儿取走苹果后父亲或母亲才能再放入水果\\color{red}如果盘子里装有苹果,那么一定要女儿取走苹果后父亲或母亲才能再放入水果如果盘子里装有苹果,那么一定要女儿取走苹果后父亲或母亲才能再放入水果
  • 如果盘子里装有橘子,那么一定要儿子取走橘子后父亲或母亲才能再放入水果\\color{red}如果盘子里装有橘子,那么一定要儿子取走橘子后父亲或母亲才能再放入水果如果盘子里装有橘子,那么一定要儿子取走橘子后父亲或母亲才能再放入水果
  • 这么看是否就意味着要设置四个同步信号量分别实现这四个“一前一后”的关系了?

2.17、多生产者-多消费者进程

正确的分析方法应该从“事件\\textcolor{red}{事件}事件”的角度来考虑

  • 我们可以把上述四对 “进程行为的前后关系” 抽象为一对 “事件的前后关系
  • 盘子变空事件 → 放入水果事件。“盘子变空事件” 既可由儿子引发,也可由女儿引发;“放水果事件”"既可能是父亲执行,也可能是母亲执行。这样的话,就可以用一个同步信号量解决问题了

2.17、多生产者-多消费者进程