03、JAVAEE---多线程(三)
队列可以有两种实现方式size变量 vs 预留空间一、什么是阻塞队列1、手动实现相较于传统队列阻塞队列线程安全因为其带有阻塞效果package javaee; import java.util.concurrent.ArrayBlockingQueue; public class MyBlockingQueue { public int[] arr; public int front 0; //队头下标 public int rear 0; //队尾下标 public int size 0; //当前元素个数 Object locker new Object(); public MyBlockingQueue(int num) { this.arr new int[num]; } public void put(int elem) throws InterruptedException { synchronized (locker) { //判满 //这里是while不是if while (size arr.length) { //满则加锁等待 locker.wait(); } arr[rear] elem; rear; size; if (rear arr.length) { rear 0; } //放完唤醒别人 locker.notify(); } } public int take() throws InterruptedException { synchronized (locker) { //判空 //替换if为while while (size 0) { locker.wait(); } int ret arr[front]; front; size--; if (front arr.length) { front 0; } //拿完唤醒别人 locker.notify(); return ret; } } public static void main(String[] args) { MyBlockingQueue queue new MyBlockingQueue(1000); Thread product new Thread(()-{ int n 1; while (true){ try { queue.put(n); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(放入的元素 n); n; try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread consumer new Thread(()-{ while (true) { try { int t queue.take(); System.out.println(拿出的元素 t); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); product.start(); consumer.start(); } }2、阻塞队列的size实现要点①引入新变量size代替了(rear1)%array.length front判满这样做可以不用多创建一个空间但代价是多维护一个变量②队头队尾移动使用越界判断重置代替取模③通过加锁实现线程安全④注意这里不要使用if否则会导致刚被唤醒没检查就继续往下执行了3、生产者消费者模型✅优点:①降低锁冲突生产者和消费者不再直接互斥访问共享资源通过队列缓冲减少锁竞争②解耦合生产者和消费者不直接依赖一方修改不影响另一方③削峰填谷队列缓冲能平滑突发流量避免下游被瞬间冲垮❌缺点:①通信效率引入中间队列数据需要先存再取比直接调用多了一次传递开销②增加系统架构复杂程度提高运维成本多了一个队列组件需要考虑队列容量、持久化、监控、运维等问题4、标准库中的使用package javaee; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; //阻塞队列标准库的使用 public class Demo19 { public static void main(String[] args) { BlockingQueueInteger queue new ArrayBlockingQueue(1000); Thread product new Thread(()-{ int count 0; while (true){ try { queue1.put(count); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } count; System.out.println(存入的元素 count ); } }); Thread consumer new Thread(()-{ while (true){ int x 0; try { x queue1.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(取出的元素 x ); } }); product.start(); consumer.start(); } }拿来就能用使用上非常简单二、线程池1、作用①降低资源消耗避免频繁创建/销毁线程线程创建需要分配栈内存~1MB、进行系统调用开销大②提高响应速度线程预先创建好任务来了直接执行不用等线程创建③便于统一管理控制线程数量、管理任务队列、提供拒绝策略防止资源耗尽2、构造方法的参数重点①线程池 ThreadPoolExecutor有七大核心参数corePoolSize核心线程数maximumPoolSize最大线程数核心临时keepAliveTime临时线程空闲存活时间unit存活时间的单位workQueue阻塞队列存放等待的任务threadFactory创建线程的工厂handler拒绝策略队列满且线程满时②其中有四种拒绝策略策略类名行为比喻① 抛异常AbortPolicy默认直接抛出RejectedExecutionException公司说不招了滚② 调用者执行CallerRunsPolicy谁提交的任务谁自己执行老板说你自己干吧③ 丢弃最老的DiscardOldestPolicy丢弃队列里等待最久的任务把排最久的人赶走新人进来④ 丢弃最新的DiscardPolicy悄悄丢弃当前提交的任务假装没看见新人3、手动实现的简单线程池package javaee; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; //自定义的简易线程池 //实现了核心功能创建固定数量的工作线程通过阻塞队列接收任务并执行 class MyThreadPoolExecutor{ // 线程安全的队列当队列为空时take()会阻塞当队列满时put()会阻塞 private BlockingQueueRunnable queue new LinkedBlockingQueue(); //构造方法参数是线程个数 public MyThreadPoolExecutor(int nThread){ for (int i 0; i nThread; i) { Thread a new Thread(()-{ while (true){ try { Runnable runnable queue.take(); runnable.run(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); a.start(); } } public void submit(Runnable runnable) throws InterruptedException { queue.put(runnable); } } public class Demo21 { public static void main(String[] args) { //创建10个线程并在构造方法内启动 MyThreadPoolExecutor m new MyThreadPoolExecutor(10); //提交1000个任务 for (int i 0; i 1000; i) { int id i; try { m.submit(()-{ System.out.println(第 id 个任务); }); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }4、标准库中的使用package javaee; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Demo20 { public static void main(String[] args) throws InterruptedException { ExecutorService service Executors.newFixedThreadPool(10); //i不是事实final有报错 // for (int i 1; i 1000; i) { // service.submit(()-{ // System.out.println(第 i 个元素); // }); // Thread.sleep(1000); // } for (int i 1; i 1000; i) { //加入新变量 int id i; service.submit(()-{ System.out.println(第 id 个元素); }); Thread.sleep(1000); } } }Executors是对ThreadPoolExecutor的封装提供了几种预设的线程池创建方法使用起来很方便。但在生产环境中更推荐直接使用ThreadPoolExecutor的构造方法手动指定所有参数避免Executors预设参数可能带来的 OOM 风险《阿里巴巴Java开发手册》规定线程池不允许用Executors创建要用ThreadPoolExecutor手动指定参数这样可以根据业务场景灵活设置队列类型、拒绝策略等做到线程池参数可控三、定时器1、简单使用package javaee; import java.util.Timer; import java.util.TimerTask; public class Demo22 { public static void main(String[] args) { Timer timer new Timer(); //添加任务 //第一个参数表示要执行的任务不是简单的Runnable而是TimerTask //第二个参数表示需要等待多久后执行 timer.schedule(new TimerTask() { Override public void run() { System.out.println(hello 1); } },1000); timer.schedule(new TimerTask() { Override public void run() { System.out.println(hello 2); } },2000); timer.schedule(new TimerTask() { Override public void run() { System.out.println(hello 3); } },5000); System.out.println(hello main); } }使用上非常简单重点在于下面的手动实现一个定时器2、手动实现重点package javaee; //手动实现定时器 public class Demo26{ private volatile boolean running true; private Thread worker; // 延迟执行一次 public void schedule(Runnable task, long delay) { worker new Thread(() - { try { Thread.sleep(delay); if (running) task.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); worker.start(); } // 延迟 周期执行核心区别在这里 public void scheduleAtFixedRate(Runnable task, long delay, long period) { worker new Thread(() - { try { Thread.sleep(delay); while (running) { task.run(); Thread.sleep(period); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); worker.start(); } public void stop() { running false; if (worker ! null) { worker.interrupt(); } } public static void main(String[] args) throws InterruptedException { Demo26 timer new Demo26(); // 测试1延迟执行一次 System.out.println(开始执行等待1秒...); timer.schedule(() - { System.out.println(hello 1延迟1秒执行); }, 1000); // 测试2延迟周期执行 timer.scheduleAtFixedRate(() - { System.out.println(hello 2每2秒执行一次); }, 3000, 2000); // 运行10秒后停止 Thread.sleep(10000); timer.stop(); System.out.println(定时器已停止); } }多线程完