需求:
线程池(采用ThreadPoolExecutor)有2类共计10个作战区线程在跑,都是从blockingQueue里面取武器进行消费,其中5个是对德,另外5个对日,另外有n个生产者线程负责对blockingQueue发送武器; 现在由于中途德国率先投降,需要削减对德的驻军,请问这个多线程代码框架如何设计? 要求: 尽可能采用juc原始的多线程工具; 不允许使用redis和db相关; blockingQueue里面的消息要全部被消费后才能退出;/** * @Title: ArsenalTester.java * @Package: inter * @Description: TODO(用一句话描述该文件做什么) * @author: 陈元俊 * @date: 2019年5月24日 下午12:42:23 * @version V1.0 * @Copyright: 2019 All rights reserved. */package inter;import java.util.UUID;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * @ClassName: ArsenalTester * @Description: 模拟二战武器生产 * @author: 陈元俊 * @date: 2019年5月24日 下午12:42:23 */public class ArsenalTester { private static final String CANCEL_NAZI = "CANCEL_NAZI"; private static final String CANCEL_JAPAN = "CANCEL_JAPAN"; /* 兵工厂,这里是生产者 */ public static class Arsenal implements Runnable { private BlockingQueueequipments; public static volatile boolean stop = false; // 全局的兵工厂生产开关 public Arsenal(BlockingQueue equipments) { this.equipments = equipments; } public static AtomicInteger counts = new AtomicInteger(); @Override public void run() { while (!stop) { String equip = UUID.randomUUID().toString(); // System.out.println("Arsenal produced " + equip); equipments.add(equip); counts.incrementAndGet(); try { Thread.sleep(10); } catch (InterruptedException e) { } } System.out.println("stop producing!"); } } /* 负责对日的司令部 默认5个战区,从兵工厂要装备*/ public static class FightJapanHQ implements Runnable { private BlockingQueue equipments; public static AtomicInteger jpcounts = new AtomicInteger(); public FightJapanHQ(BlockingQueue equipments) { this.equipments = equipments; } @Override public void run() { while (true) { String equip; try { equip = equipments.take(); if (equip.equals(CANCEL_JAPAN)) { System.out.println("-------------------------FightJapanHQ demolished!"); // 毒丸收到,撤销战区 return; } else if (CANCEL_NAZI.equals(equip)) { // System.out.println("DONT GIVE ME NAZI!"); equipments.add(equip); continue; } jpcounts.incrementAndGet(); System.out.println("Asia HQ fetch " + equip); } catch (InterruptedException e) { System.out.println("-=-=-=cannot get"); } } } } /* 负责对德的司令部,默认5个战区,从兵工厂要装备 */ public static class FightNaziHQ implements Runnable { private BlockingQueue equipments; public static AtomicInteger nzcounts = new AtomicInteger(); public FightNaziHQ(BlockingQueue equipments) { this.equipments = equipments; } @Override public void run() { while (true) { try { String equip = equipments.take(); if (CANCEL_NAZI.equals(equip)) { System.out.println("-------------------------FightNaziHQ demolished!"); // 毒丸收到,撤销战区 return; } else if (CANCEL_JAPAN.equals(equip)) { // System.out.println("DONT GIVE ME JP!"); equipments.add(equip); continue; } nzcounts.incrementAndGet(); System.out.println("Euro HQ fetch " + equip); } catch (InterruptedException e) { System.out.println("-=-=-=cannot get"); } } } } public static void main(String[] args) throws Exception { ExecutorService battleField = Executors.newCachedThreadPool(); BlockingQueue equipments = new ArrayBlockingQueue<>(1000); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 10; i++) { battleField.submit(new Arsenal(equipments)); } for (int i = 0; i < 5; i++) { battleField.submit(new FightNaziHQ(equipments)); battleField.submit(new FightJapanHQ(equipments)); } System.out.println("i am out!"); // 发射后不管 } }).start(); Thread.sleep(5000); System.out.println("=============================Nazi surrendered!============================="); // 德国先投降 for (int i = 0; i < 4; i++) { boolean val = equipments.add(CANCEL_NAZI); // 这里取消掉4个对德战区,保留1个 } Thread.sleep(5000); Arsenal.stop = true; System.out.println("Celebrate peace"); // 二战胜利!马放南山,但此刻Queue里面大概率仍然有订单! for (int i = 0; i < 1; i++) { boolean val = equipments.add(CANCEL_NAZI); // 别忘了战后取消剩下的1个对德战区 // System.out.println("put res " + val); } for (int i = 0; i < 5; i++) { boolean val = equipments.add(CANCEL_JAPAN); // 取消剩下的5个对日战区 // System.out.println("put res " + val); } battleField.shutdown(); while (!battleField.isTerminated()) { } System.out.println("Totally produced " + Arsenal.counts); System.out.println("Finally FightNaziHQ received " + FightNaziHQ.nzcounts.intValue() + " and FightJapanHQ received " + FightJapanHQ.jpcounts.intValue()); System.out.println("equipments now remains " + equipments.size()); }}