问题描述
在学习多线程编程的时候,我们经常会碰到的一个问题就是producer consumer问题。这个概念在学习操作系统的时候也会常碰到。本身这个问题的描述比较简单。假设我们有两个进程或者线程。他们俩就好比是工厂里一条流水线上的两个车间。其中线程A生产的部件要交给下一个线程B来做进一步的处理。这个时候,线程A就相当于一个生产者,而线程B就相当于一个消费者。在生产者生产了一个部件之后,它们会通过一个传送带传递这个部件。也就是说,线程A负责往传送带上面放部件,而线程B负责从传送带上面取部件。这样,两个线程在各自运行的时候需要访问同样的一个资源。那么,我们该怎么来有效的解决这个问题呢?
解决方法讨论
根据前面的假定条件,我们可以将两个线程之间要共同访问的部分定义为一个数组。那么,对于这两个线程来说,他们在一定程度上是独立的。一个线程可以往共享资源队列里放东西,另外一个可以从里面取东西。对于producer线程来说,它需要考虑的就是如果我往资源队列里放东西时,队列满了,那么我必须要等待consumer线程取走一部分元素,这样才能继续进行。另外,和其他线程访问同一个资源队列的时候,还要保证访问是线程安全的。同样,对于consumer线程来说,如果资源队列是空的,同样,该线程也必须要等待producer线程将产生的元素放入队列。
根据这部分的讨论,我们问题的核心就在于怎么样使得一个线程和另外一个线程互斥的访问同一个资源呢?另外,如果一个线程发现目前资源情况不适合自己运行,又该怎么去停止自己而让其他的线程来运行呢? 这里,我们实际上有几种办法。
wait, notify
在java里面,如果我们希望一个线程在已经占有资源的锁的情况下先中止自己的执行并释放锁,那么就需要调用wait方法。这样,我们前面的第二个问题就得到了解答。如果我们在已经完成了自己的那部分操作,需要释放锁并要其他的线程来继续使用时呢?我们可以通过调用notify和notifyAll方法来通知其他因为资源被加锁之后处于阻塞状态的线程。
通过这一部分的讨论,我们可以得出一个线程producer的大致流程如下:
1. 如果资源队列满,则调用wait方法释放所,一直等待到资源队列有空缺。
2. 在资源队列加入新的元素。
3. 调用notify/notifyAll方法来唤醒其他等待的线程。
这是Producer部分的代码:
public class Producer implements Runnable { /** * Store to work with */ private EventStorage storage; /** * Constructor of the class. Initialize the storage. * @param storage The store to work with */ public Producer(EventStorage storage){ this.storage=storage; } /** * Core method of the producer. Generates 100 events. */ @Override public void run() { for (int i=0; i<100; i++){ storage.set(); } } }
调用wait和notify部分的代码在storage.set的方法定义中:
public synchronized void set() { while(storage.size() == maxSize) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } storage.offer(new Date()); System.out.printf("Set: %d", storage.size()); notifyAll(); }
这里,我们用一个循环来一直判断资源队列的状况。
对于Consumer来说,我们也可以有一个类似的流程,只是它被阻塞的条件是资源队列为空,需要等待Producer产生新的元素。Consumer的代码如下:
public class Consumer implements Runnable { /** * Store to work with */ private EventStorage storage; /** * Constructor of the class. Initialize the storage * @param storage The store to work with */ public Consumer(EventStorage storage){ this.storage=storage; } /** * Core method for the consumer. Consume 100 events */ @Override public void run() { for (int i=0; i<100; i++){ storage.get(); } } }
storage.get方法里定义了具体怎么使用wait, notify的过程:
public synchronized void get() { while(storage.size() == 0) { try { wait(); } catch(InterruptedException e) { e.printStackTrace(); } } System.out.printf("Get: %d: %s", storage.size(), storage.poll()); notifyAll(); }
这里还有一个需要注意的地方就是,我们的get, set方法都使用了synchronized的修饰,表示它是线程互斥的。从我们对synchronized的理解就知道,对于线程访问,它每次只有一个线程能够获得这个锁来进入这个方法。而我们使用wait方法的时候,却能够在获得这个锁的情况下,将锁释放了。就是这个wait方法,挺神奇的,在这里能够实现我们一个很关键的要求。不然当我们拿到锁又不满足执行条件想要释放的时候就很麻烦。后面附件里有示例代码的完整实现。
BlockingQueue
前面我们是用的wait, notify的方式来实现的释放锁和通知线程机制。实际上在java里也有一些数据结构已经帮我们封装好了互斥的机制。一个典型的就是BlockingQueue。BlockingQueue是一个定义的接口,具体的实现里有ArrayBlockingQueue, LinkedBlockingQueue。参照前面的问题,我们也可以写一个简单的使用BlockingQueue的演示程序。
Producer:
import java.util.concurrent.BlockingQueue; public class Producer implements Runnable { private BlockingQueue<Message> queue; public Producer(BlockingQueue<Message> q) { this.queue = q; } @Override public void run() { for(int i = 0; i < 100; i++) { Message msg = new Message("" + i); try { Thread.sleep(i); queue.put(msg); System.out.println("Produced " + msg.getMsg()); } catch(InterruptedException e) { e.printStackTrace(); } } Message msg = new Message("exit"); try { queue.put(msg); } catch(InterruptedException e) { e.printStackTrace(); } } }
Consumer:
import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { private BlockingQueue<Message> queue; public Consumer(BlockingQueue<Message> q) { this.queue = q; } @Override public void run() { try { Message msg; while((msg = queue.take()).getMsg() != "exit") { Thread.sleep(10); System.out.println("Consumed " + msg.getMsg()); } } catch(InterruptedException e) { e.printStackTrace(); } } }
ProducerConsumerService
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ProducerConsumerService { public static void main(String[] args) { BlockingQueue<Message> queue = new ArrayBlockingQueue<>(10); Producer producer = new Producer(queue); Consumer consumer = new Consumer(queue); new Thread(producer).start(); new Thread(consumer).start(); System.out.println("Producer and consumer has been started"); } }
这个程序里我们通过定义一个BlockingQueue,每次Producer或Consumer都从其中来存取元素,利用它本身的机制保证对数据的访问是互斥的。
Condition
除了前面提到的两种办法,还有一种似乎更加细粒度控制的机制来实现同样的效果。那就是通过condition的await, signal机制。
和前面的方法类似,这里不再列出producer, consumer的细节,只是把对被操作数据的封装样式给列了出来:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedBuffer { private final String[] buffer; private final int capacity; private int front; private int rear; private int count; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); public BoundedBuffer(int capacity) { super(); this.capacity = capacity; buffer = new String[capacity]; } public void deposit(String data) throws InterruptedException { lock.lock(); try { while (count == capacity) { notFull.await(); } buffer[rear] = data; rear = (rear + 1) % capacity; count++; notEmpty.signal(); } finally { lock.unlock(); } } public String fetch() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } String result = buffer[front]; front = (front + 1) % capacity; count--; notFull.signal(); return result; } finally { lock.unlock(); } } }
这部分代码比较有意思的地方是它需要定义一个Lock,通过这个Lock对象再来创建condition对象。这些对象针对正好问题里缓冲区域数据是满还是空这两个条件。如果进入一个互斥的比如说缓冲区满或者空的时候,另外一个线程则要释放当前的锁并进行等待。
condition和前面提到的thread所带方法看起来很相似。你看,thread里面用的wait, notify方法一个是让当前线程进入等待状态,一个是唤醒其他所有等待的线程。而condition里的await, signal方法恰恰看起来有类似的功能。而且,他们的结构也很类似,前者是使用一个synchronized的块,而后者是通过一个lock来保证线程访问互斥。他们的主要细节差别体现在如下。当我们使用wait()/notify()方法的时候,所有等待状态的线程或对象都会被通知到,所以我们不知道具体是哪个对象/线程最终被唤醒了。而await()/signal()方法我们可以区分哪个对象或者线程得到特定的signal。这样,我们就有一个好处,在前面的wait()/notify()过程中,我们是将所有休眠的线程一股脑儿都唤醒了,很可能使得这个时候不符合运行条件的线程抢占了锁,然后又发现不行又要去释放锁,这样造成了效率的低下。而采用这个await()/signal()的方式更加精确。像在本例中,只有等待条件符合的那些线程会被唤醒,其他的就没有任何影响。
总结
Producer Consumer问题其实不是一个很新鲜的概念。它的本质是要保证多个线程对数据的同步和互斥访问。对于多个访问的线程来说无非就是这么几个步骤,访问资源的时候先获取锁,发现自己可以执行就执行,不行就把锁给放了,通知其他线程再来获取。
参考资料
http://www.journaldev.com/1034/java-blockingqueue-example-implementing-producer-consumer-problem
http://www.baptiste-wicht.com/2010/09/java-concurrency-part-5-monitors-locks-and-conditions/
http://stackoverflow.com/questions/10395571/condition-vs-wait-notify-mechanism
相关推荐
在Linux操作系统下用C或C++实现经典同步问题:生产者-消费者问题。 含源代码和文档。 内容: 1.一个大小为10的缓冲区,初始状态为空。 2.2个生产者,随机等待一段时间,往缓冲区中添加数据,若缓冲区已满,等待消费...
producer-consumer-model 生产者-消费者模型
代码在这里
本文为文章 https://blog.csdn.net/u012779110/article/details/104312468 的代码示例,供参考学习用。
single-producer, single-consumer lock-free queue
Simple application demonstrate kafka java springboot
kettle kafka 消费者插件,在plugins 下新建steps文件夹,把zip文件解压放到里面。
利用信号量实现的多线程之间的同步与互斥,详情看博客文章Linux多线程编程(二)---线程之间的同步与互斥进阶实验
Kafka-Simple-Producer-Consumer:使用Java 8的kafka的生产者和消费者的简单变化
Algorithm for Producer-Consumer PoolsElad Gidron CS DepartmentTechnion, Haifa, Israel eladgi@cs.technion.ac.ilIdit Keidar EE DepartmentTechnion, Haifa, Israel idish@ee.technion.ac.ilDmitri Perelman ∗...
* The classic producer-consumer example. * Illustrates mutexes and conditions. * by Zou jian guo <ah_zou@tom.com> * 2003-12-22 * *************************************************/ #include #...
The use of producer-consumer questions helps to better understand threads.
节点生产者/消费者示例这是 Node.js 中生产者/消费者问题的“简单”解决方案。 在这个系统中,Producer/Generator 将发送一系列随机算术表达式(为了好玩而采用反向波兰表示法)。 Consumer/Evaluator 将接受这些...
JAVA中生产者-消费者的线程JAVA中Threads+Producer-Consumer的使用演示要测试它,只需在 Java IDE 中打开它并运行 Main 类。
Spring-cloud-contract-producer-kafka-consumer Spring云合约生产商kafka消费者集成测试
生产者-消费者模式的 Java 8 变体
9号实验室工作 任务 使用生产者-消费者模板实现搜寻器。 爬网程序是一种旨在对Internet页面进行... producer的数量由network_threads命令行network_threads parser_threads , consumer的数量parser_threads参数par
“生产者---消费者”问题是最著名的进程同步问题。它描述了一组生产者向一组消费者提供产品,它们共享一个有界缓冲区,生产者向其中投放产品,消费者从中取得产品。它是许多相互合作进程的抽象,如输入进程与计算...
用PHP实现Queue-Producer-Consumer Web Crawler。通过amphp / parallel依赖项使用多个进程或本机线程来爬网域以获取响应者链接。 / _ \ \_\(_)/_/ _//o|\_ / | @作者:罗伯特·伯恩斯@电子邮件: 安装 使用'...