目录
- 线程池设计思路
- 线程池实现原理
- ThreadPoolExecutor 示例
- ThreadPoolExecutor 源码详解
- execute提交任务源码分析
- addWorker创建线程源码详解
- runWorker执行任务源码详解
- processWorkerExit回收线程源码详解
- getTask获取任务源码详解
- tryTerminate终止线程源码详解
- 创建多少线程比较合适
- 初始化线程池
- 结束语
- Reference
摘要:分享JAVA JUC线程池干货,首先描述线程池的基本概念,然后介绍线程工厂和拒绝策略,其次逐步深入线程池实现原理和线程池状态机,最后结合实战讲解源码。
JUC干货系列目录:
- JAVA JUC干货之线程池状态和状态切换
- JAVA JUC干货之线程池实现原理和源码详解(上)
- JAVA JUC干货之线程池实现原理和源码详解(下)
我在《JAVA JUC干货之线程池实现原理和源码详解(上)》中介绍了线程池的基本功能,本文分享线程池实现原理,并结合案例详解线程池源码。
线程池设计思路
本节主要参考文章《Java 线程池详解,图文并茂,还有谁不会》,感兴趣的读者可以去看看。有句话叫做艺术来源于生活,编程语言也是如此,很多设计思想能映射到日常生活中,比如封装、继承和抽象等等。今天我们要说的线程池,它同样可以在现实世界找到对应的实体——工厂。先假想一个工厂的生产流程:
工厂生产流程 假如有一个工厂,工厂中有固定的一批工人,称为正式员工,每个正式员工同时只能做一件任务,由这些正式员工完成工厂接收的订单。随着新任务增长的速度远远大于工人做任务的速度,必定出现正式员工忙不过来的场景,工厂会将生产原料暂时堆积在仓库中,等有空闲的正式员工时再处理。因为正式员工清闲自在,空闲了也不会主动处理仓库中的生产任务,所以需要调度员实时调度。仓库堆积满了后,订单还在增加怎么办?
工厂只能临时扩招一批工人来应对生产高峰,而这批工人在高峰结束后,由于新任务增长的速度放缓慢了,是要辞掉的,所以称为临时工。当临时工也以招满后(受限于工位数量限制导致临时工数量也有上限),后面的订单只能忍痛拒绝了。我们做如下一番映射:
工厂线程池订单任务正式员工核心线程临时工非核心线程仓库阻塞队列工位数量最大线程数调度员getTask(),将任务队列中的任务调度给空闲线程 映射后,形成如下线程池流程图,两者是不是有异曲同工之妙?
工厂-线程池流程映射图 这样,线程池的工作原理或者说流程就很好理解了,在下一章详细介绍。
线程池实现原理
线程池的执行原理是通过循环地从任务队列中取出任务,然后将任务分配给空闲的工作线程执行。当任务队列为空时,线程池会进入等待状态,直到有新的任务到来。线程池还提供设置线程数和任务队列长度的能力,以控制并发线程的数量。
下面我们进入正题,看一下在线程池中一个任务从提交到最终执行完毕经历了哪些过程。
敲黑板划重点,用一句话简述ThreadPoolExecutor线程池实现原理:首先创建核心线程,再把任务放入阻塞队列,其次创建非核心线程,最后抛弃任务。详细流程图如下:
线程池实现原理 从执行流程可知,属性判断顺序如下:corePoolSize -> workQueue -> maxinumPoolSize。线程池执行流程归纳为如下:
1.预热核心线程:提交新任务后,如果线程数没有达到核心线程数 corePoolSize,则创建核心线程执行新任务,而且空闲的核心线程处于阻塞状态不执行新任务直到队列中有任务。即线程池优先填满corePoolSize个核心线程,再复用核心线程处理任务。如果线程数达到核心线程数,则进入下个流程。
2.任务入队:如果线程数达到核心线程数,且任务队列未满,则把新任务放入阻塞队列末尾;这个场景下,核心线程执行完当前任务后自动从任务队列中获取任务来执行。如果任务队列已满,则进入下个流程。
3.创建非核心线程:如果任务队列已满而且最大线程数 maximumPoolSize > 线程数poolSize > corePoolSize,则创建新的普通线程并立刻执行当前的新任务;温馨提示,这个场景下新任务早于队列中的任务执行。如果线程数达到最大线程数,则进入执行拒绝任务的流程。
非核心线程数为 maximumPoolSize - corePoolSize,即为最大线程数减去核心线程数。
4.拒绝任务:如果线程数等于最大线程数且任务队列已满,则根据拒绝策略处理新任务。
5.复用线程:线程执行完任务后去检查任务队列里是否有任务需要执行,若有则马上执行;否则,进入阻塞状态。
6.销毁线程:普通线程如果无事可做的时间超过keepAliveTime,那么就会被回收,以便减少内存占用和资源消耗,实现对系统资源的调优。如果调用了allowCoreThreadTimeOut(true)方法,则会给核心线程数设置存活时间,使得超过keepAliveTime的核心线程也会被销毁,从而最终有可能导致线程池中的线程数为0。
通常情况下,随着阻塞队列中任务数的减少,线程数最终会收缩到 corePoolSize 的大小,故线程数维持在corePoolSize和maximumPoolSize之间。
这里有个细节值得大家品味,在线程池里的线程数小于核心线程数的前提下提交任务的时候,虽然核心线程因任务队列空空如也而可能处于闲置状态,但是依然会继续创建核心线程,而非复用已有的、空闲的核心线程。
ThreadPoolExecutor 示例
下面这个使用execute提交任务的示例用于演示线程池执行流程:- import com.google.common.util.concurrent.ThreadFactoryBuilder;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * @Author 楼兰胡杨
- * @Date 2025-05-16
- * @Description: 通过调整for循环次数验证线程池执行机制
- */
- public class ThreadPoolTest {
- public static ThreadPoolExecutor executor;
- public static void main(String[] args) {
- ThreadFactory guavaFactory = new ThreadFactoryBuilder().setNameFormat("pool-Wiener-%d").build();
- // 自定义线程工厂
- ThreadFactory jucFactory = new ThreadFactory() {
- private final AtomicInteger mThreadNum = new AtomicInteger(1);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "Wiener-" + mThreadNum.getAndIncrement());
- }
- };
- RejectionImpl rejection = new RejectionImpl();
- executor = new ThreadPoolExecutor(2, 5, 200, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(10), guavaFactory, rejection);
- for (int i = 0; i < 25; i++) {
- MyPoolTask myTask = new MyPoolTask(i);
- executor.execute(myTask);
- }
- executor.shutdown();
- }
- }
- class MyPoolTask implements Runnable {
- private int taskNum;
- ThreadPoolExecutor myExe = ThreadPoolTest.executor;
- public MyPoolTask(int num) {
- this.taskNum = num;
- }
- @Override
- public void run() {
- // System.out.println("正在执行task " + taskNum);
- System.out.println(Thread.currentThread().getName() + ",活跃线程数目:" + myExe.getPoolSize()
- + ",队列长度:" +
- myExe.getQueue().size()
- + ",当前任务是" + taskNum + ",已执行任务数目:" + myExe.getCompletedTaskCount());
- try {
- Thread.currentThread().sleep(300);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("执行完毕task " + taskNum);
- }
- @Override
- public String toString() {
- return taskNum + "";
- }
- }
- class RejectionImpl implements RejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- System.out.println("被拒绝任务是 " + r.toString());
- }
- }
复制代码 执行结果如下:- pool-Wiener-1,活跃线程数目:3,队列长度:10,当前任务是1,已执行任务数目:0
- pool-Wiener-0,活跃线程数目:2,队列长度:9,当前任务是0,已执行任务数目:0
- pool-Wiener-2,活跃线程数目:3,队列长度:10,当前任务是12,已执行任务数目:0
- pool-Wiener-4,活跃线程数目:5,队列长度:10,当前任务是14,已执行任务数目:0
- pool-Wiener-3,活跃线程数目:5,队列长度:10,当前任务是13,已执行任务数目:0
- 被拒绝任务是 15
- 被拒绝任务是 16
- 被拒绝任务是 17
- 被拒绝任务是 18
- 被拒绝任务是 19
- 被拒绝任务是 20
- 被拒绝任务是 21
- 被拒绝任务是 22
- 被拒绝任务是 23
- 被拒绝任务是 24
- 执行完毕task 0
- 执行完毕task 14
- pool-Wiener-0,活跃线程数目:5,队列长度:9,当前任务是2,已执行任务数目:1
- 执行完毕task 1
- pool-Wiener-4,活跃线程数目:5,队列长度:8,当前任务是3,已执行任务数目:2
- 执行完毕task 12
- pool-Wiener-2,活跃线程数目:5,队列长度:6,当前任务是5,已执行任务数目:4
- pool-Wiener-1,活跃线程数目:5,队列长度:7,当前任务是4,已执行任务数目:3
- 执行完毕task 13
- pool-Wiener-3,活跃线程数目:5,队列长度:5,当前任务是6,已执行任务数目:5
- 执行完毕task 3
- pool-Wiener-4,活跃线程数目:5,队列长度:4,当前任务是7,已执行任务数目:6
- 执行完毕task 2
- 执行完毕task 4
- pool-Wiener-0,活跃线程数目:5,队列长度:3,当前任务是8,已执行任务数目:7
- 执行完毕task 6
- pool-Wiener-1,活跃线程数目:5,队列长度:2,当前任务是9,已执行任务数目:8
- 执行完毕task 5
- pool-Wiener-3,活跃线程数目:5,队列长度:1,当前任务是10,已执行任务数目:9
- pool-Wiener-2,活跃线程数目:5,队列长度:0,当前任务是11,已执行任务数目:10
- 执行完毕task 9
- 执行完毕task 8
- 执行完毕task 10
- 执行完毕task 11
- 执行完毕task 7
复制代码 从执行结果日志可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序for循环中,把创建的任务数从25个降低到15个,就不会抛出任务被拒绝的异常了。
ThreadPoolExecutor 源码详解
程序猿为什么需要读源码?阅读源码是程序猿成长过程中非常重要的一个环节。这就像学习语言时需要阅读经典文学作品一样,通过阅读源码,程序员可以学到很多优秀的编程技巧和最佳实践;可以了解多样化的技术选型方案,培养架构设计敏感度,为技术决策提供参考依据。具体来说,读源码有以下几个好处:
深入了解工具库或框架:当你使用第三方库或者框架时,深入其内部结构有助于更好地掌握它们的功能以及局限性,从而更高效地运用到自己的项目中去。
增强业务能力和团队协作能力:阅读和理解其他同事敲的代码可以使人深度认识项目,精准了解业务逻辑,还能促进成员间的交流与合作。
面试准备:在求职过程中,了解常见开源框架的源码实现可以帮助你在面试环节中脱颖而出,因为许多公司会考察候选人对这些技术细节的理解程度。
拓展技术视野:接触不同项目的源码(如微服务框架、分布式系统)可了解多样化的技术选型方案,培养架构设计敏感度,为技术决策提供参考依据。
总之,对于任何有志于提升自身技术水平的程序员而言,定期花时间去研究高质量的源码是一项不可或缺的学习活动。你对哪些源码感兴趣?可以评论区留言,一起品尝。下面浅谈我对 ThreadPoolExecutor 源码的理解,不当之处还请在评论区留言,一起翻越ThreadPoolExecutor这座山。
execute提交任务源码分析
在ThreadPoolExecutor类中,最核心的任务提交方法是execute(Runnable command)方法,虽然通过submit也可以提交任务,但是submit方法底层实现中最终调用的还是execute,所以我们只需要分析execute的源码即可。在JDK 21中,execute(Runnable command)的源代码如下所示,已经添加中文注释,同时保留了原汁原味的英文注释:- /**
- * Executes the given task sometime in the future. The task
- * may execute in a new thread or in an existing pooled thread.
- *
- * If the task cannot be submitted for execution, either because this
- * executor has been shutdown or because its capacity has been reached,
- * the task is handled by the current {@link RejectedExecutionHandler}.
- *
- * @param command the task to execute
- * @throws RejectedExecutionException at discretion of
- * {@code RejectedExecutionHandler}, if the task
- * cannot be accepted for execution
- * @throws NullPointerException if {@code command} is null
- */
- public void execute(Runnable command) {
- // 判断提交的任务是否为null
- if (command == null)
- throw new NullPointerException();
- /*
- * Proceed in 3 steps(执行过程可以分为如下三个步骤):
- *
- * 1. If fewer than corePoolSize threads are running, try to
- * start a new thread with the given command as its first
- * task. The call to addWorker atomically checks runState and
- * workerCount, and so prevents false alarms that would add
- * threads when it shouldn't, by returning false.
- *
- * 2. If a task can be successfully queued, then we still need
- * to double-check whether we should have added a thread
- * (because existing ones died since last checking) or that
- * the pool shut down since entry into this method. So we
- * recheck state and if necessary roll back the enqueuing if
- * stopped, or start a new thread if there are none.
- *
- * 3. If we cannot queue task, then we try to add a new
- * thread. If it fails, we know we are shut down or saturated
- * and so reject the task.
- */
- // 可以将 ctl 理解成保存了线程数和运行状态等信息的变量
- int c = ctl.get();
- // 判断工作线程总数是否小于核心线程数,小于的时候可以添加核心线程,并且将当前任务作为此线程执行的第一个任务
- if (workerCountOf(c) < corePoolSize) {
- //新增的核心线程成功执行任务,流程结束;addWorker第二个参数为true,表示根据核心线程数判断线程数量
- if (addWorker(command, true))
- return;
- // addWorker执行报错,需要再次检查变量值
- c = ctl.get();
- }
- //判断线程池的状态是否正常而且任务放入任务队列是否正常
- if (isRunning(c) && workQueue.offer(command)) {
- // 同理,为了防止向任务队列中又提交新的任务造成错误,再次更新变量值
- int recheck = ctl.get();
- if (! isRunning(recheck) && remove(command))
- //当前线程池处于非运行状态且将刚添加的任务成功从任务队列移除,执行拒绝策略
- reject(command);
- else if (workerCountOf(recheck) == 0)
- // 线程数为0,第一个参数为null,表示创建线程但不启动
- addWorker(null, false);
- //线程数超过核心线程数而且队列已满,需要新增非核心线程
- } else if (!addWorker(command, false))
- //工作线程已经达到了最大线程数阈值,或者是调用了关闭线程池的方法,触发拒绝策略
- reject(command);
- }
复制代码 假设线程数达到核心线程数且队列未满,这时新提交的任务可以直接复用空闲的核心线程吗?从源码来看,新提交的任务需要放入任务队列,空闲的核心线程是从任务队列拿任务。
线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是借助生产者消费者模式将任务和线程两者解耦,不让两者直接绑定,方便做后续的工作分配。在队列为空时,线程会等待任务进入队列;当队列不满时,任务会等待线程来执行。
从上述源码不难发现,execute 方法多次通过 addWorker 方法来添加线程处理任务,故我们下面来阅读 addWorker 方法的源码。
addWorker创建线程源码详解
- /**
- * Set containing all worker threads in pool. 线程池中存放线程的集合,维护一组Worker对象
- * Accessed only when holding mainLock.
- */
- private final HashSet<Worker> workers = new HashSet<>();
-
- /*
- * Methods for creating, running and cleaning up after workers
- */
- /**
- * Checks if a new worker can be added with respect to current
- * pool state and the given bound (either core or maximum). If so,
- * the worker count is adjusted accordingly, and, if possible, a
- * new worker is created and started, running firstTask as its
- * first task. This method returns false if the pool is stopped or
- * eligible to shut down. It also returns false if the thread
- * factory fails to create a thread when asked. If the thread
- * creation fails, either due to the thread factory returning
- * null, or due to an exception (typically OutOfMemoryError in
- * Thread.start()), we roll back cleanly.
- *
- * @param firstTask the task the new thread should run first (or
- * null if none). Workers are created with an initial first task
- * (in method execute()) to bypass queuing when there are fewer
- * than corePoolSize threads (in which case we always start one),
- * or when the queue is full (in which case we must bypass queue).
- * Initially idle threads are usually created via
- * prestartCoreThread or to replace other dying workers.
- *
- * @param core if true use corePoolSize as bound, else
- * maximumPoolSize. (A boolean indicator is used here rather than a
- * value to ensure reads of fresh values after checking other pool
- * state).
- * @return true if successful
- */
- private boolean addWorker(Runnable firstTask, boolean core) {
- // 外层循环:判断线程池状态
- retry:
- for (int c = ctl.get();;) {
- // Check if queue empty only if necessary.
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP)
- || firstTask != null
- || workQueue.isEmpty()))
- // 线程池已经被停止或者关闭,或者队列为空,终止流程,返回 false
- return false;
- // 死循环
- for (;;) {
- // 检查当前工作线程数是否已经达到核心线程数(core 为 true)或最大线程数(core 为 false)
- if (workerCountOf(c)
- >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
- // 达到核心线程数或最大线程数,返回 false
- return false;
- //通过cas操作增加线程池的工作线程数量
- if (compareAndIncrementWorkerCount(c))
- // 成功原子地增加工作线程数,跳出外层的 for 循环
- break retry;
- // 增加线程数量失败,再次读取 ctl 的值
- c = ctl.get(); // Re-read ctl
- // 如果当前运行状态不等于
- if (runStateAtLeast(c, SHUTDOWN))
- // 线程池处于关闭状态,跳到外层循环重新执行
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- /**
- * 线程数量+1成功的后续操作:添加新工作线程到工作线程集合,启动工作线程执行firstTask
- */
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- //添加新的Worker对象来处理任务firstTask
- w = new Worker(firstTask);
- //获取 Worker 对象中的线程。Worker类是任务线程的包装类,内部封装了一个Thread类型的变量
- final Thread t = w.thread;
- if (t != null) {
- // 开始加内置锁
- final ReentrantLock mainLock = this.mainLock;
- // 基于ReentrantLock的同步块
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- // 再次检查状态,保证线程工厂没有失败或者在获取锁之前线程池没有被关闭
- int c = ctl.get();
- //添加线程的前提条件:①线程处于运行状态,②线程处于shutdown状态并且firstTask为null
- if (isRunning(c) ||
- (runStateLessThan(c, STOP) && firstTask == null)) {
- // 检查线程状态,如果不是新建状态就抛异常
- if (t.getState() != Thread.State.NEW)
- throw new IllegalThreadStateException();
- //workers是一个HashSet<Woker> 集合,往里面新增Worker类型的工作线程对象
- workers.add(w);
- // 把工作线程添加标志置为 true
- workerAdded = true;
- int s = workers.size();
- // largestPoolSize 表示线程池中出现过的最大线程数
- if (s > largestPoolSize)
- // 更新线程池中线程数最大值
- largestPoolSize = s;
- }
- } finally {
- // 释放锁
- mainLock.unlock();
- }
- if (workerAdded) {
- // 启动新创建的线程,执行当前任务firstTask
- container.start(t);
- // 把线程启动标志置为 true
- workerStarted = true;
- }
- }
- } finally {
- //判断线程有没有启动成功,如果没有则调用addWorkerFailed方法
- if (! workerStarted)
- addWorkerFailed(w);
- }
- // 返回线程启动标志
- return workerStarted;
- }
复制代码 我们来看看函数addWorker(Runnable firstTask, boolean core) 的入参。firstTask:添加的新线程需要执行的第一任务,执行完成之后才能执行任务队列中其它任务;如果没有任务,则需要设置为null。core的值如果是true,则使用corePoolSize与线程数进行比较,判断线程数是否已经达到核心线程数;否则,使用maximumPoolSize判断线程数是否已经达到最大线程数。
Worker类继承了AbstractQueuedSynchronizer,实现了Runnable接口。线程池中,每一个线程都被封装成一个Worker对象,ThreadPool维护的其实就是存放在变量HashSet workers中的一组Worker对象。Worker 在执行完任务后,还会无限循环获取工作队列里的任务来执行。我们选择关键源码简单看一下 Worker 类,它是 ThreadPoolExecutor 类的一个内部类:- private final class Worker extends AbstractQueuedSynchronizer implements Runnable
- {
- /** Thread this worker is running in. Null if factory fails. 处理任务的线程 */
- @SuppressWarnings("serial") // Unlikely to be serializable
- final Thread thread;
- /** Initial task to run. Possibly null.保存传入的任务 */
- @SuppressWarnings("serial") // Not statically typed as Serializable
- Runnable firstTask;
- /** Per-thread task counter */
- volatile long completedTasks;
- // TODO: switch to AbstractQueuedLongSynchronizer and move
- // completedTasks into the lock word.
- /**
- * Creates with given first task and thread from ThreadFactory.
- * @param firstTask the first task (null if none)
- */
- Worker(Runnable firstTask) {
- //设置AQS的同步状态
- setState(-1); // inhibit interrupts until runWorker
- // 封装任务
- this.firstTask = firstTask;
- // 新增执行任务的线程,this表示线程,故Worker对象在启动的时候就会调用函数run
- this.thread = getThreadFactory().newThread(this);
- }
- /** Delegates main run loop to outer runWorker. */
- public void run() {
- runWorker(this);
- }
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
- // 使用AQS实现独占锁,不允许重入
- protected boolean tryAcquire(int unused) {
- // 使用CAS修改状态,不允许重入
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
- }
复制代码 Worker类中的函数run()的实现都在 runWorker() 方法中,故我们看一下runWorker()的源码。
runWorker执行任务源码详解
- /**
- * Main worker run loop. Repeatedly gets tasks from queue and
- * executes them, while coping with a number of issues:
- *
- * 1. We may start out with an initial task, in which case we
- * don't need to get the first one. Otherwise, as long as pool is
- * running, we get tasks from getTask. If it returns null then the
- * worker exits due to changed pool state or configuration
- * parameters. Other exits result from exception throws in
- * external code, in which case completedAbruptly holds, which
- * usually leads processWorkerExit to replace this thread.
- *
- * 2. Before running any task, the lock is acquired to prevent
- * other pool interrupts while the task is executing, and then we
- * ensure that unless pool is stopping, this thread does not have
- * its interrupt set.
- *
- * 3. Each task run is preceded by a call to beforeExecute, which
- * might throw an exception, in which case we cause thread to die
- * (breaking loop with completedAbruptly true) without processing
- * the task.
- *
- * 4. Assuming beforeExecute completes normally, we run the task,
- * gathering any of its thrown exceptions to send to afterExecute.
- * We separately handle RuntimeException, Error (both of which the
- * specs guarantee that we trap) and arbitrary Throwables.
- * Because we cannot rethrow Throwables within Runnable.run, we
- * wrap them within Errors on the way out (to the thread's
- * UncaughtExceptionHandler). Any thrown exception also
- * conservatively causes thread to die.
- *
- * 5. After task.run completes, we call afterExecute, which may
- * also throw an exception, which will also cause thread to
- * die. According to JLS Sec 14.20, this exception is the one that
- * will be in effect even if task.run throws.
- *
- * The net effect of the exception mechanics is that afterExecute
- * and the thread's UncaughtExceptionHandler have as accurate
- * information as we can provide about any problems encountered by
- * user code.
- *
- * @param w the worker
- */
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();// 获取当前线程
- // 用创建线程时传入的firstTask初始化任务
- Runnable task = w.firstTask;
- w.firstTask = null;// 清空 Worker 对象的第一个任务
- // 解锁,允许中断
- w.unlock(); // allow interrupts
- // 标记线程是否因为异常退出循环,默认为 true
- boolean completedAbruptly = true;
- try {
- //循环获取任务,在getTask()返回null时跳出while循环,且回收线程
- // 如果firstTask为null,则使用getTask()从任务队列取任务
- while (task != null || (task = getTask()) != null) {
- w.lock();
- // If pool is stopping, ensure thread is interrupted;
- // if not, ensure thread is not interrupted. This
- // requires a recheck in second case to deal with
- // shutdownNow race while clearing interrupt
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- //当前线程被打上中断标志
- wt.interrupt();
- try {
- //执行任务前的钩子方法,让继承类做一些统计之类的事情
- beforeExecute(wt, task);
- try {
- // 执行任务
- task.run();
- // 执行任务后的钩子方法
- afterExecute(task, null);
- } catch (Throwable ex) {
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- // 把执行完的任务设置为null,从而触发getTask()重新获取任务;增加任务数,同时释放锁
- task = null;
- // 更新已完成的任务数
- w.completedTasks++;
- w.unlock();
- }
- }
- // 标记线程正常退出
- completedAbruptly = false;
- } finally {
- // 回收/销毁线程
- processWorkerExit(w, completedAbruptly);
- }
- }
复制代码 getTask结果为null时跳出while循环,执行processWorkerExit,销毁线程;如果执行processWorkerExit的入参completedAbruptly=true,表示线程意外退出,需要减少计数。
在这个方法中,while (task != null || (task = getTask()) != null)说明线程会循环地使用getTask函数从线程池的任务队列中取任务并执行任务,直到取出的任务对象为null,此时线程池已经关闭或者任务队列为空。接下来就会跳出 while 循环进入 finally 语句块执行processWorkerExit(),尝试回收线程。这行while循环代码也说明如果队列中没有任务且核心线程数小于corePoolSize,则来任务时一定会创建核心线程,而已经创建的核心线程哪怕一直无所事事也不会执行新任务。
到这里就介绍完线程池脚踏实地干活的Worker类了,简单归纳一下:worker对象同时封装了一个任务Runnable firstTask和一个线程final Thread thread,它的创建依赖于线程状态,任务的执行也是在worker里去处理。这里有个小技巧值得大家细细品味——在函数runWorker中,会把执行完的任务及时设置为null但保留线程,从而触发函数getTask()重新获取任务,实现线程复用。
processWorkerExit回收线程源码详解
- /**
- * Performs cleanup and bookkeeping for a dying worker. Called
- * only from worker threads. Unless completedAbruptly is set,
- * assumes that workerCount has already been adjusted to account
- * for exit. This method removes thread from worker set, and
- * possibly terminates the pool or replaces the worker if either
- * it exited due to user task exception or if fewer than
- * corePoolSize workers are running or queue is non-empty but
- * there are no workers.
- *
- * @param w the worker
- * @param completedAbruptly if the worker died due to user exception
- */
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- // 将线程引用移出线程池
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 回收线程
- tryTerminate();
- int c = ctl.get();
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- addWorker(null, false);
- }
- }
复制代码 事实上,在这个processWorkerExit函数中,将线程引用移出线程池线程集合且调用了tryTerminate()就已经回收了线程。但由于引起线程销毁的可能性有很多,故线程池要判断是什么触发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态重新分配线程。
getTask获取任务源码详解
本节,我们来分析线程怎么通过 getTask() 方法从任务队列中取出任务的。- /**
- * Performs blocking or timed wait for a task, depending on
- * current configuration settings, or returns null if this worker
- * must exit because of any of:
- * 1. There are more than maximumPoolSize workers (due to
- * a call to setMaximumPoolSize).
- * 2. The pool is stopped.
- * 3. The pool is shutdown and the queue is empty.
- * 4. This worker timed out waiting for a task, and timed-out
- * workers are subject to termination (that is,
- * {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
- * both before and after the timed wait, and if the queue is
- * non-empty, this worker is not the last thread in the pool.
- *
- * @return task, or null if the worker must exit, in which case
- * workerCount is decremented
- */
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- // 死循环,此方法要么返回 null,要么返回 Runnable 对象代表取到了任务
- for (;;) {
- int c = ctl.get();
- // Check if queue empty only if necessary.
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
- //线程处于非运行状态时,若满足如下两个条件之一,则线程数减1
- // 1.线程处于 STOP 及以上(STOP、TIDYING、TERMINAL)的状态;2.任务队列为空
- decrementWorkerCount();
- return null;
- }
- // 计算线程总数
- int wc = workerCountOf(c);
- // Are workers subject to culling? 可以销毁线程吗
- // 变量timed用于判断是否需要进行超时控制
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- // 如下场景可以销毁线程:①线程数大于最大线程数;
- // ②允许核心线程被回收且空闲时间超过存活时间;
- // ③非核心线程空闲时间超过存活时间
- // TODO wc可以为0吗?
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- // 线程数减一成功时返回null,表名当前线程可以被回收
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- /**
- * 从阻塞队列中取Runnable类型的任务对象,
- */
- Runnable r = timed ?
- // timed 为 true,调用poll方法,如果keepAliveTime纳秒内没有拿到任务,则返回null
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- // timed 为 false,调用take方法,此时线程永久阻塞直到有任务对象返回
- workQueue.take();
- if (r != null)
- // 返回从阻塞队列中取到的任务,终止循环
- return r;
- // 线程没有拿到任务,标记为已过期等待被销毁
- timedOut = true;
- } catch (InterruptedException retry) {
- //当前线程在获取任务时发生中断,则设置为不过期并返回循环重试
- timedOut = false;
- }
- }
- }
复制代码 tryTerminate终止线程源码详解
在《Java线程池状态和状态切换》中已经介绍过,通过 shutdownNow 与 shutdown可以触发线程池关闭流程,当方法执行完毕后,线程池将会进入 STOP 或者 SHUTDOWN 状态。但是此时线程池并未真正的被关闭,在runWorker方法最后的finally块中,调用了processWorkerExit方法,其逻辑实现中调用了一个 tryTerminate 方法,这个才是正在关闭线程池的钩子方法。楼兰胡杨本节和各位一起看看函数tryTerminate的源码。- /**
- * Transitions to TERMINATED state if either (SHUTDOWN and pool
- * and queue empty) or (STOP and pool empty). If otherwise
- * eligible to terminate but workerCount is nonzero, interrupts an
- * idle worker to ensure that shutdown signals propagate. This
- * method must be called following any action that might make
- * termination possible -- reducing worker count or removing tasks
- * from the queue during shutdown. The method is non-private to
- * allow access from ScheduledThreadPoolExecutor.
- */
- final void tryTerminate() {
- for (;;) {
- int c = ctl.get();
- // 如果线程池不处于预停机状态,则不进行停机
- if (isRunning(c) ||
- runStateAtLeast(c, TIDYING) ||
- (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
- return;
- if (workerCountOf(c) != 0) { // Eligible to terminate
- // 当前还有工作线程,不停机
- interruptIdleWorkers(ONLY_ONE);
- return;
- }
- // 线程处于预关闭状态,开始关闭线程池
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 尝试通过 CAS 将线程池状态修改为 TIDYING
- if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
- try {
- terminated();
- } finally {
- // 尝试通过 CAS 将线程池状态修改为 TERMINATED
- ctl.set(ctlOf(TERMINATED, 0));
- termination.signalAll();
- container.close();
- }
- return;
- }
- } finally {
- mainLock.unlock();
- }
- // else retry on failed CAS
- }
- }
复制代码 哦了,再来看看函数 terminated(),是不是瞬间感觉很坑爹?它的方法体里面神!马!也!没!干!淡定,其实它是个钩子方法,允许通过重写在线程池被终止时做一些特殊的业务逻辑,默认的线程池没有什么要做的事情,当然也没有必要写什么啦~- /**
- * Method invoked when the Executor has terminated. Default
- * implementation does nothing. Note: To properly nest multiple
- * overridings, subclasses should generally invoke
- * {@code super.terminated} within this method.
- */
- protected void terminated() { }
复制代码 创建多少线程比较合适
要想合理的配置线程池线程数,就必须首先分析任务特性,可以从以下几个角度来进行分析:
- 任务的性质:CPU 密集型任务,IO 密集型任务和混合型任务;
- 任务的优先级:高,中和低;
- 任务的执行时间;
- 任务的依赖关系:是否依赖其他系统资源,如数据库连接;
上面都是需要考虑的因素,至于应该创建多个线程,还是需要压测的,单纯考虑CPU 密集型任务和IO 密集型不合适。另外,建议任务性质不同的任务用不同规模的线程池分开处理,保证业务解耦。
初始化线程池
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实际中如果需要线程池创建之后立即创建线程,可以通过prestartCoreThreadhe 和 prestartAllCoreThreads两个方法实现,二者都通过调用函数boolean addWorker(Runnable firstTask, boolean core)来实现。
prestartCoreThread()用于在创建线程池的时候初始化一个核心线程,实现源码如下:- /**
- * Starts a core thread, causing it to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed. This method will return {@code false}
- * if all core threads have already been started.
- *
- * @return {@code true} if a thread was started
- */
- public boolean prestartCoreThread() {
- return workerCountOf(ctl.get()) < corePoolSize &&
- //注意传进去的参数firstTask是null
- addWorker(null, true);
- }
复制代码 prestartAllCoreThreads():初始线程池时创建所有核心线程,实现源码如下:- /**
- * Starts all core threads, causing them to idly wait for work. This
- * overrides the default policy of starting core threads only when
- * new tasks are executed.
- *
- * @return the number of threads started
- */
- public int prestartAllCoreThreads() {
- int n = 0;
- while (addWorker(null, true))//注意传入的任务是null
- ++n;
- return n;
- }
复制代码 注意上面传进boolean addWorker(Runnable firstTask, boolean core)的参数firstTask是null,创建的核心线程会被阻塞在getTask方法中,等待获取任务队列中的任务。
八股文:当任务数超过核心线程数时,如何直接启用最大线程数maximumPoolSize?
分析:题目中【任务数超过核心线程数】可以理解为【线程数超过核心线程数】。
答:综合【线程池执行流程】所述得知,既然我们的预期是任务数超过核心线程数时新提交的任务不进入任务队列,就需要人为干预第二步 任务入队。这答案就显而易见了——在创建线程池的时候,指定任务队列使用SynchronousQueue。SynchronousQueue是不能存储元素的一个队列,它的特性是每生产一个任务就需要指定一个消费者来处理这个任务;否则,阻塞生产者。
结束语
至此,已经介绍完线程池核心知识点,预祝各位读者在工作中能够迅速而准确地处理线程池相关需求,就像运斤成风一样。
在编程这个复杂严峻的环境中,请活得优雅坦然:也许你的钱包空空如也,也许你的工作不够好,也许你正处在困境中,也许你被情所弃。不论什么原因,请你在出门时,一定要把自己打扮地清清爽爽,昂起头,挺起胸,面带微笑,从容自若地面对生活和面对工作。人生就像蒲公英,没事尽量少吹风;只要你自己真正撑起来了一片天地,别人无论如何是压不垮你的,内心的强大才是真正的强大。
Reference
- JUC线程池: ThreadPoolExecutor详解
- https://blog.csdn.net/xaiobit_hl/article/details/132281971
- https://blog.csdn.net/weixin_43918863/article/details/146354499
- https://zhuanlan.zhihu.com/p/2983855645
- https://www.cnblogs.com/w08e/p/18410687
读后有收获,小礼物走一走,请作者喝咖啡。 Buy me a coffee. ☕Get red packets. 作者:楼兰胡杨链接:https://www.cnblogs.com/east7/p/18903603本文版权归作者和博客园共有,欢迎转载,但请注明原文链接,并保留此段声明,否则保留追究法律责任的权利。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |