线程间的协作(2)——生产者与消费者模式

1.何为生产者与消费者

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。


import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName:Restraurant
 * @Description:何为生产者与消费者
 * @author: 
 * @date:2018年5月3日
 */
public class Restraurant {
	Meal m=null;
	Chef chef=new Chef(this);
	WaitPerson wait=new WaitPerson(this);
	ExecutorService service=Executors.newCachedThreadPool();
	public Restraurant() {
		service.execute(chef);
		service.execute(wait);
	}
	public static void main(String[] args) {
		new Restraurant();
	}
}
/**
 * @ClassName:Meal
 * @Description:生产者生成的数据
 * @author: 
 * @date:2018年5月3日
 */
class Meal{
	private final int orderNum;//食物订单编号
	public Meal(int num){
		orderNum=num;
	}
	public String toString(){
		return "Meal"+orderNum;
	}
}
/**
 * @ClassName:Chef
 * @Description:厨师类,及生产者
 * @author: 
 * @date:2018年5月3日
 */
class Chef implements Runnable{
	Restraurant r;
	int count=0;
	public Chef(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				synchronized (this) {
					while(r.m!=null){
						System.out.println("厨师等待中");
						wait();//等待服务员取餐
					}
				}
				if(count++==10){
					System.out.println("今日已售完");
					r.service.shutdownNow();
				}
				System.out.println("订单完成,服务员取餐");
				synchronized (r.wait) {
					r.m=new Meal(count);
					r.wait.notifyAll();
					
				}
				TimeUnit.SECONDS.sleep(1);
			}
		}catch (InterruptedException e) {
			System.out.println("生产者线程强制中断");
		}
		
	}
}
/**
 * @ClassName:WaitPerson
 * @Description:服务员类,即消费者
 * @author: 
 * @date:2018年5月3日
 */
class WaitPerson implements Runnable{
	Restraurant r;
	public WaitPerson(Restraurant r) {
		this.r=r;
	}
	@Override
	public void run() {
		try {
			while (!Thread.interrupted()) {
				synchronized (this) {
					while (r.m == null) {
						System.out.println("服务员等待中");
						wait();// 等待厨师生成食物
					}
				}

				System.out.println("服务员以取餐" + r.m);
				synchronized (r.chef) {
					r.m = null;
					r.chef.notifyAll();
				}
			}
		} catch (InterruptedException e) {
			System.out.println("消费者线程强制中断");
		}
		
	}
	
}

2.生产者与消费者模式

    1)产生原因:在多线程开发 中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理 完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须 等待生产者。wait与notify方法以一种非常低级的方式解决了任务互相通知的问题,即每次交互都要进行一次握手,极大影响的效率以及性能,为了解决这种生产消费能力不均衡的问题,便有了生产者和消费者模式。

    2)原理:生产者和消费者模式是通过一个容器(比如同步阻塞队列)来解决生产者和消费者的强耦合问题。生产者和消 费者彼此之间不直接通信,而是通过阻塞队列来进行通信,所以生产者生产完数据之后不用 等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取, 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。 这个阻塞队列就是用来给生产者和消费者解耦的。java.util.concurrent.BlockingQueue接口提供了这个队列,通常使用其实现子类ArrayBlockingQueue,LinkedBlockingQueue。当消费者任务试图从同步队列中获取对象,如果队列为空时,那么队列则会挂起消费者任务,并且当拥有足够多的元素可用时才会恢复消费者任务。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class UseBlockingQueue {
	public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<Toast> dry=new LinkedBlockingQueue<Toast>(),
				butter=new LinkedBlockingQueue<Toast>(),
				jam=new LinkedBlockingQueue<Toast>(),
				con=new LinkedBlockingQueue<Toast>();
		ExecutorService exec=Executors.newCachedThreadPool();
		exec.execute(new MakeToast(dry));//制作初始吐司任务
		exec.execute(new Butter(dry,butter));//吐司抹黄油任务
		exec.execute(new Jam(butter,jam));//吐司抹果酱任务
		exec.execute(new Consumer(jam));//消费者任务,食用吐司
		TimeUnit.SECONDS.sleep(5);
		exec.shutdownNow();
	}
}
class Toast{
	private int status;//吐司状态:0代表制作吐司,1代表抹黄油,2代表向抹了黄油的吐司抹果酱
	private final int id;
	public Toast(int id1) {
		id=id1;
	}
	public void butter(){
		status=1;
	};
	public void jam(){
		status=2;
	}
	public int getStatus(){
		return status;
	}
	public int getId(){
		return id;
	}
	public String toString(){
		return "toast "+id+":"+status;
	}
}
/**
 * @Description:制作初始吐司
 */
class MakeToast implements Runnable{
	private LinkedBlockingQueue<Toast> queue=new LinkedBlockingQueue<Toast>();
	private int count=0;
	public MakeToast(LinkedBlockingQueue<Toast> q) {
		queue=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Thread.sleep(1000);//制作时间
				Toast t=new Toast(count);
				System.out.println(t);
				queue.put(t);//添加到同步队列
				count++;
			}
		}catch (InterruptedException e) {
			System.out.println("make process interrupted");
		}
		System.out.println("make process off");
	}
}
/**
 * @Description:涂抹黄油
 */
class Butter implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//未加料吐司队列,抹黄油后吐司队列
	public Butter(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.butter();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("butter process interrupted");
		}
		System.out.println("butter process off");
	}
}
/**
 * @Description:涂抹果酱
 */
class Jam implements Runnable{
	private LinkedBlockingQueue<Toast> queue1,queue2;//抹黄油后吐司队列,抹果酱吐司队列
	public Jam(LinkedBlockingQueue<Toast> q1,LinkedBlockingQueue<Toast>q2) {
		queue1=q1;
		queue2=q2;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=queue1.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				t.jam();
				System.out.println(t);
				queue2.put(t);
			}
		}catch (InterruptedException e) {
			System.out.println("jam process interrupted");
		}
		System.out.println("jam process off");
	}
}
/**
 * @Description:被食用
 */
class Consumer implements Runnable{
	private LinkedBlockingQueue<Toast> finished;//抹黄油后吐司队列,抹果酱吐司队列
	int count=0;
	public Consumer(LinkedBlockingQueue<Toast> q) {
		finished=q;
	}
	@Override
	public void run() {
		try{
			while(!Thread.interrupted()){
				Toast t=finished.take();//如果队列中没有可用元素将会阻塞,直至有可用元素被添加
				if(t.getId()!=count++||t.getStatus()!=2){
					System.out.println("过程出现错误");
					return;
				}else{
					System.out.println("所有过程正确实现"+"toast "+t.getId()+"被食用");
				}
			}
		}catch (InterruptedException e) {
			System.out.println("eat process interrupted");
		}
		System.out.println("eat process off");
	}
}

 

上一篇:微信开发模式无法验证以及返回消息中文乱码的情况


下一篇:运维自动化之ansible playbook安装lamp环境