⽣产者消费者问题Java三种实现
⽣产者-消费者Java实现
2017-07-27
1 概述
⽣产者消费者问题是多线程的⼀个经典问题,它描述是有⼀块缓冲区作为仓库,⽣产者可以将产品放⼊仓库,消费者则可以从仓库中取⾛产品。
解决⽣产者/消费者问题的⽅法可分为两类:
采⽤某种机制保护⽣产者和消费者之间的同步;
在⽣产者和消费者之间建⽴⼀个管道。
第⼀种⽅式有较⾼的效率,并且易于实现,代码的可控制性较好,属于常⽤的模式。第⼆种管道缓冲区不易控制,被传输数据对象不易于封装等,实⽤性不强。
在Java中有四种⽅法⽀持同步,其中前三个是同步⽅法,⼀个是管道⽅法。
wait() / notify()⽅法
await() / signal()⽅法
BlockingQueue阻塞队列⽅法
PipedInputStream / PipedOutputStream
本⽂只介绍前三种。
2 实现
2.1 wait() / notify()⽅法
wait() / nofity()⽅法是基类Object的两个⽅法:
wait()⽅法:当缓冲区已满/空时,⽣产者/消费者线程停⽌⾃⼰的执⾏,放弃锁,使⾃⼰处于等等状态,让其他线程执⾏。
notify()⽅法:当⽣产者/消费者向缓冲区放⼊/取出⼀个产品时,向其他等待的线程发出可执⾏的通知,同时放弃锁,使⾃⼰处于等待状态。
缓冲区Storage.java代码如下:
import java.util.LinkedList;
public class Storage
{
// 仓库最⼤存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// ⽣产产品
public void produce(String producer)
{
synchronized (list)
{
// 如果仓库已满
while (list.size() == MAX_SIZE)
{
System.out.println("仓库已满,【"+producer+"】:暂时不能执⾏⽣产任务!");
try
{
// 由于条件不满⾜,⽣产阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
// ⽣产产品
list.add(new Object());
System.out.println("【"+producer+"】:⽣产了⼀个产品\t【现仓储量为】:" + list.size());            ifyAll();
}
}
// 消费产品
public void consume(String consumer)
{
synchronized (list)
{
//如果仓库存储量不⾜
while (list.size()==0)
{
System.out.println("仓库已空,【"+consumer+"】:暂时不能执⾏消费任务!");
try
{
// 由于条件不满⾜,消费阻塞
list.wait();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
System.out.println("【"+consumer+"】:消费了⼀个产品\t【现仓储量为】:" + list.size());            ifyAll();
}
}
public LinkedList<Object> getList()
{
return list;
}
public void setList(LinkedList<Object> list)
{
this.list = list;
}
public int getMAX_SIZE()
{
return MAX_SIZE;
}
}
View Code
Test.java
public class Test {
public static void main(String[] args)
{
Storage storage=new Storage();
for(int i=1;i<6;i++)
{
int finalI = i;
new Thread(new Runnable() {
@Override
public void run() {
storage.produce(String.format("⽣成者%d:", finalI));
}
}).start();
}
for(int i=1;i<4;i++)
{
int finalI = i;
new Thread(()-> sume(String.format("消费者%d:", finalI))).start();
}
}
}
View Code
结果如下:
仓库已空,【消费者1】:暂时不能执⾏消费任务!
【⽣产者3】:⽣产了⼀个产品【现仓储量为】:1
【消费者2】:消费了⼀个产品【现仓储量为】:0
仓库已空,【消费者3】:暂时不能执⾏消费任务!
【⽣产者1】:⽣产了⼀个产品【现仓储量为】:1
【⽣产者4】:⽣产了⼀个产品【现仓储量为】:2
【⽣产者2】:⽣产了⼀个产品【现仓储量为】:3
【⽣产者5】:⽣产了⼀个产品【现仓储量为】:4
【消费者1】:消费了⼀个产品【现仓储量为】:3
【消费者3】:消费了⼀个产品【现仓储量为】:2
2.2 await() / signal()⽅法
await()和signal()的功能基本上和wait() / nofity()相同,完全可以取代它们,但是它们和新引⼊的锁定机制Lock直接挂钩,具有更⼤的灵活性。通过在Lock对象上调⽤newCondition()⽅法,将条件变量和⼀个锁对象进⾏绑定,进⽽控制并发程序访问竞争资源的安全。
缓冲区Storage.java代码如下:
import java.util.LinkedList;
import urrent.locks.Condition;
import urrent.locks.Lock;
import urrent.locks.ReentrantLock;
public class Storage {
// 仓库最⼤存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = wCondition();
// 仓库空的条件变量
private final Condition empty = wCondition();
// ⽣产产品
public void produce(String producer) {
lock.lock();
// 如果仓库已满
while (list.size() == MAX_SIZE) {
System.out.println("仓库已满,【" + producer + "】:暂时不能执⾏⽣产任务!");
try {
// 由于条件不满⾜,⽣产阻塞
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// ⽣产产品
list.add(new Object());
System.out.println("【" + producer + "】:⽣产了⼀个产品\t【现仓储量为】:" + list.size());
empty.signalAll();
// 释放锁
lock.unlock();
}
// 消费产品
public void consume(String consumer) {
// 获得锁
pipedinputstreamlock.lock();
// 如果仓库存储量不⾜
while (list.size() == 0) {
System.out.println("仓库已空,【" + consumer + "】:暂时不能执⾏消费任务!");
try {
// 由于条件不满⾜,消费阻塞
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("【" + consumer + "】:消费了⼀个产品\t【现仓储量为】:" + list.size());
full.signalAll();
// 释放锁
lock.unlock();
}
public LinkedList<Object> getList() {
return list;
}
public void setList(LinkedList<Object> list) {
this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
View Code
2.3 BlockingQueue
它是⼀个已经在内部实现了同步的队列,实现⽅式采⽤的是我们第2种await() / signal()⽅法。它可以在⽣成对象时指定容量⼤⼩。它⽤于阻塞操作的是put()和take()⽅法:
put()⽅法:类似于我们上⾯的⽣产者线程,容量达到最⼤时,⾃动阻塞。
take()⽅法:类似于我们上⾯的消费者线程,容量为0时,⾃动阻塞。
import urrent.LinkedBlockingQueue;
public class Storage {
// 仓库最⼤存储量
private final int MAX_SIZE = 100;
// 仓库存储的载体
private LinkedBlockingQueue<Object> list = new LinkedBlockingQueue<Object>(100);
// ⽣产产品
public void produce(String producer) {
// 如果仓库已满
if (list.size() == MAX_SIZE) {
System.out.println("仓库已满,【" + producer + "】:暂时不能执⾏⽣产任务!");
}
// ⽣产产品
try {
list.put(new Object());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + producer + "】:⽣产了⼀个产品\t【现仓储量为】:" + list.size());
}
/
/ 消费产品
public void consume(String consumer) {
// 如果仓库存储量不⾜
if (list.size() == 0) {
System.out.println("仓库已空,【" + consumer + "】:暂时不能执⾏消费任务!");
}
try {
list.take();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("【" + consumer + "】:消费了⼀个产品\t【现仓储量为】:" + list.size());
}
public LinkedBlockingQueue<Object> getList() {
return list;
}
public void setList(LinkedBlockingQueue<Object> list) { this.list = list;
}
public int getMAX_SIZE() {
return MAX_SIZE;
}
}
View Code

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。