|
1 | 1 | # ThreadPoolExecutor 源码剖析
|
2 | 2 |
|
| 3 | +源码基于 JDK9 |
| 4 | + |
| 5 | +--- |
| 6 | + |
3 | 7 | <!-- TOC -->
|
4 | 8 |
|
5 | 9 | - [概览](#概览)
|
6 | 10 | - [继承结构](#继承结构)
|
7 | 11 | - [状态转换](#状态转换)
|
8 | 12 | - [个性定制](#个性定制)
|
| 13 | + - [Core and maximum pool sizes](#core-and-maximum-pool-sizes) |
| 14 | + - [On-demand construction](#on-demand-construction) |
| 15 | + - [Creating new threads](#creating-new-threads) |
| 16 | + - [Keep-alive times](#keep-alive-times) |
| 17 | + - [Queuing](#queuing) |
| 18 | + - [Rejected tasks](#rejected-tasks) |
| 19 | + - [Hook methods](#hook-methods) |
| 20 | + - [Queue maintenance](#queue-maintenance) |
| 21 | + - [Finalization](#finalization) |
9 | 22 | - [任务处理流程](#任务处理流程)
|
10 | 23 | - [最佳实践](#最佳实践)
|
11 | 24 | - [参考](#参考)
|
|
43 | 56 |
|
44 | 57 | ### 状态转换
|
45 | 58 |
|
| 59 | + |
| 60 | + |
| 61 | +线程池的状态在 `ThreadPoolExecutor` 中的实际表示方式是一个 `AtomicInteger` 类型的成员变量的高三位(因为有 5 种状态),名称为 `ctl`。 |
| 62 | + |
| 63 | +`ctl` 是 32 位的整型变量,他封装了两个变量: |
| 64 | + |
| 65 | +1. 线程池运行状态。高三位。 |
| 66 | + - RUNNING: 111 |
| 67 | + - SHUTDOWN: 000 |
| 68 | + - STOP: 001 |
| 69 | + - TIDYING: 010 |
| 70 | + - TERMINATED: 011 |
| 71 | +1. 有效工作线程数。低 29 位。 |
| 72 | + |
| 73 | +因为 `ctl` 是 `AtomicInteger` 的实例,其上的操作基于 CAS,是线程安全的。 |
| 74 | + |
| 75 | +1. `shutdown()` |
| 76 | + 1. `advanceRunState(SHUTDOWN);` 首先将运行状态修改为 `SHUTDOWN`此时当有新任务提交时直接抛出 `RejectedExecutionException` 来拒绝服务。 |
| 77 | + 1. `interruptIdleWorkers()` 通过调用 Worker 线程的 `interrupt()` 方法试图中断空闲 worker。// todo `t.interrupt()` 方法会对那些线程状态有效?成功调用会产生什么影响?Java 的线程状态和操作系统内部线程状态之间有什么关系?此处涉及的知识面稍广,浪费了一些时间依旧没能理解,把 ThreadPoolExecutor 过完以后再系统解决。 |
| 78 | + 1. `tryTerminate()` 只对于运行状态为 `STOP` 或 `SHUTDOWN`+任务队列为空两种情况, 当 Worker 数量未减到 0 之前,每次调用会尝试中断一个 Worker 线程。当任务队列为不为空时,Worker 线程处理完正在处理的任务,会从工作队列中取出未处理的任务继续工作,循环这个过程至任务队列为空,Worker 获取不到要处理的任务时会将其从 Worker 集中移除,worker 数量减一,然后在 `processWorkerExit()` 方法中再次调用 `tryTerminate()` 。当 Worker 线程数量减到 0 以后再调用该方法时,会将运行状态修改为 `TIDYING` 并调用 `terminated()` 方法。`ThreadPoolExextor` 中该方法为空函数所以 `TIDYING` 和 `TERMINATED` 两个状态基本没有区别。 |
| 79 | +1. `shutdownNow()` |
| 80 | + 1. `advanceRunState(STOP)` 首先将运行状态修改为 `STOP` 此时当有新任务提交时直接抛出 `RejectedExecutionException` 来拒绝服务。 |
| 81 | + 1. `interruptWorkers();` |
| 82 | + 1. `drainQueue()`,直接清空任务队列 |
| 83 | + 1. `tryTerminate()` |
| 84 | + |
46 | 85 | ### 个性定制
|
47 | 86 |
|
| 87 | +#### Core and maximum pool sizes |
| 88 | + |
| 89 | +`ThreadPoolExecutor` 会依照 `corePoolSize` 和 `maximumPoolSize` 两个字段来动态调整线程池的大小。新任务提交过来时,如果当前活动的线程数少于 `corePoolSize` 会创建一个新线程来处理这个新任务即使当前有空闲线程。如果当前线程数大于 `corePoolSize` 小于 `maximumPoolSize` 且任务队列已满时也会创建新线程。 |
| 90 | + |
| 91 | +- 配置两个属性相等时可以获得固定容量的线程池。 |
| 92 | +- 将 `maximumPoolSize` 设为一个很大的数(比如 `Integer.MAX_VAlUE`)时,可以获得一个无上界的线程池,可以用来处理任意数量的并发任务。(Tips: 线程过多并不合适,因为物理机的 CPU 数量有限,无法同时处理那么多线程,只会白白占用资源,所以这个属性可以根据实际物理机 CPU 数量来配置。) |
| 93 | + |
| 94 | +通常来说这两个属性,只通过构造器来配置,但是 `ThreadPoolExecutor` 也提供了 `setter` 方法可以在运行时配置。 |
| 95 | + |
| 96 | +#### On-demand construction |
| 97 | + |
| 98 | +如果在构造 `ThreadPoolExecutor` 时,任务队列中已经有要处理的任务了,那么在创建好以后,可用通过直接调用 `prestartCoreThread()` 或 `prestartAllCoreThreads()` 来创建核心线程去处理这些任务。否则这些任务就只能在有新任务提交过来以后才能开始处理。 |
| 99 | + |
| 100 | +#### Creating new threads |
| 101 | + |
| 102 | +新线程是通过 `ThreadFactory` 来创建的,如果在构造时未指定,就使用默认的 `java.util.concurrent.Executors.DefaultThreadFactory`。该 `ThreadFactory` 创建的线程都属于同一个线程组、`Thread.NORM_PRIORITY` 优先级、用户线程。 |
| 103 | + |
| 104 | +我们可以通过指定一个不同的线程工厂来修改线程名、线程组、优先级、线程守护状态等等。 |
| 105 | + |
| 106 | +#### Keep-alive times |
| 107 | + |
| 108 | +如果当前线程数量超出了 `corePoolSize`,超出的那部分非核心线程会在空闲超出 `keepAliveTime` 时被终止。这能够线程池活跃状态不足时及时回收占用的资源。该参数也可以使用 `setKeepAliveTime(long, TimeUnit)` 来运行时动态修改。可以通过使用 `Long.MAX_VALUE TimeUnit.NANOSECONDS` 两个参数来禁用线程回收。默认情况下核心线程超时不回收,可以通过配置 `keepAliveTime` 和 `allowCoreThreadTimeOut` 来允许核心线程超时回收。 |
| 109 | + |
| 110 | +#### Queuing |
| 111 | + |
| 112 | +任意的 `BlockingQueue` 实现都可以作为任务队列。任务队列的使用对线程池收缩会有一定影响: |
| 113 | + |
| 114 | +- 如果当前线程数少于 `corePoolSize`,新提交的任务会直接提交给新创建的线程。 |
| 115 | +- 如果当前线程数不少于 `corePoolSize`,新提交的任务会提交到任务队列中。 |
| 116 | +- 如果新任务无法提交到任务队列(队列已满),会尝试创建一个新线程,如果线程数达到了 `maximumPoolSize` 而导致新线程无法创建则该任务会被拒绝。 |
| 117 | + |
| 118 | +一般来说,任务队列有三种使用策略: |
| 119 | + |
| 120 | +1. 直接交付。直接将到达的任务交付给线程,而不是将任务暂存起来。当没有空闲线程可用时直接新建线程。这种方式通常需要无上界的 `maximumPoolSize` 来防止拒绝服务。当然这种方式也有缺点,新任务到达的速度超过任务处理的速度时,新建的线程数量会越来越多。耗费内存。常常使用 `SynchronousQueue` 作为任务队列的实现类。 |
| 121 | +1. 无界队列。使用无界队列的话,执行任务的线程数不会超过 `corePoolSize` 的大小,但核心线程都无空闲时,新到的任务会添加到任务队列。当新任务到达的速度超过了任务处理的速度时,任务会积累的越来越多。常常使用 `LinkedBlockingQueue` 作为任务队列的实现类。 |
| 122 | +1. 有界队列。和有限的 `maximumPoolSize` 结合使用能够防止资源耗尽。但是队列的大小和 `maximumPoolSize` 的大小配置权衡起来会更难一些。大队列加小容量线程池可以最小化 CPU使用率、OS 资源和上下文切换的开销。但是有可能吞吐量会比较低,如果任务频繁阻塞(I/O操作)的话无法最优使用 CPU 资源。如果使用小队列加大容量的线程池,可以保证 CPU 的使用率,但是上下文调度的开销可能会过大,这也会降低吞吐量。常使用 `ArrayBlockingQueue` 作为任务队列的实现类。 |
| 123 | + |
| 124 | +#### Rejected tasks |
| 125 | + |
| 126 | +1. `Executor` 状态不再是 `RUNNING`(已经被 `SHUTDOWN`) |
| 127 | +1. 任务队列已满并且线程数量达到最大值,已达到饱和状态。 |
| 128 | + |
| 129 | +#### Hook methods |
| 130 | + |
| 131 | +`ThreadPoolExecutor` 也提供了一些其他方法,子类可以重写这些方法来提供额外的支持:重新初始化 `ThreadLocals`,收集统计信息,添加日志等等。 |
| 132 | + |
| 133 | +1. `beforeExecute(Thread, Runnable)`, //任务执行前调用 |
| 134 | +1. `afterExecute(Runnable, Throwable)` //任务执行后调用 |
| 135 | +1. `terminated()` // `Executor` 状态转为 `TIDYING` 后调用 |
| 136 | + |
| 137 | +#### Queue maintenance |
| 138 | + |
| 139 | +1. `getQueue()` 可以访问任务队列,但是只鼓励用于监控与调试。 |
| 140 | +1. `remove(Runnable)` 和 `purge()` 方法可以用于取消尚未执行的任务。`remove(Runnable)` 直接从任务队列删除,`purge()` 从任务队列批量删除已取消的 `Future` |
| 141 | + |
| 142 | +#### Finalization |
| 143 | + |
| 144 | +当线程池没有到 GC Roots 的引用并且 Worker 数为 0 时会被自动回收。 |
| 145 | + |
| 146 | +如果想要在忘记调用 `shutdown()` 时也能确保未被引用的线程池被回收的话,需要确保未使用的线程最终都被能终止。可以设置合适的 `keepAliveTime` 以及 `allowCoreThreadTimeOut`。 |
| 147 | + |
48 | 148 | ## 任务处理流程
|
49 | 149 |
|
50 | 150 | ## 最佳实践
|
51 | 151 |
|
| 152 | +一般情况下使用 `Executors` 的工厂方法来创建即可适用于大多数场景。需要配置的话参考 个性定制 来配置更合适自己项目的 `ThreadPoolExecutor`。 |
| 153 | + |
52 | 154 | ## 参考
|
53 | 155 |
|
54 | 156 | 1. [Java SE 9 & JDK 9 -- java.util.concurrent](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/package-summary.html)
|
|
57 | 159 | 1. [Java SE 9 & JDK 9 -- ExecutorService](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ExecutorService.html)
|
58 | 160 | 1. [Java SE 9 & JDK 9 -- ThreadPoolExecutor](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/ThreadPoolExecutor.html)
|
59 | 161 | 1. [Java SE 9 & JDK 9 -- Source Code](.)
|
| 162 | +1. [What is Daemon thread in Java?](https://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java) |
| 163 | +1. [深入理解java线程池—ThreadPoolExecutor](http://www.jianshu.com/p/ade771d2c9c0) |
0 commit comments