Java教程

深入分析线程池的实现原理

多线程 小海豚博客管理员 2020-04-15 00:15:58.0 187 0条

一.说明

线程池,顾名思义就是存放线程的池子,池子里存放了很多可以复用的线程。

如果不用类似线程池的容器,每当我们需要执行用户任务的时候都去创建新的线程,任务执行完之后线程就被回收了,这样频繁地创建和销毁线程会浪费大量的系统资源。

因此,线程池通过线程复用机制,并对线程进行统一管理,具有以下优点:

降低系统资源消耗。通过复用已存在的线程,降低线程创建和销毁造成的消耗;

提高响应速度。当有任务到达时,无需等待新线程的创建便能立即执行;

提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳定性,使用线程池可以进行对线程进行统一的分配、调优和监控。

ThreadPoolExecutor是线程池框架的一个核心类,本文通过对ThreadPoolExecutor源码的分析(基于JDK 1.8),来深入分析线程池的实现原理。

二.ThreadPoolExecutor类的属性

先从ThreadPoolExecutor类中的字段开始:

  1. // 线程池的控制状态,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量
  2. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
  3. //值为29,用来表示偏移量
  4. private static final int COUNT_BITS = Integer.SIZE - 3;
  5. //线程池的最大容量,其值的二进制为:00011111111111111111111111111111(29个1)
  6. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
  7. // 线程池的运行状态,总共有5个状态,用高3位来表示
  8. private static final int RUNNING = -1 << COUNT_BITS;
  9. private static final int SHUTDOWN = 0 << COUNT_BITS;
  10. private static final int STOP = 1 << COUNT_BITS;
  11. private static final int TIDYING = 2 << COUNT_BITS;
  12. private static final int TERMINATED = 3 << COUNT_BITS;
  13. //任务缓存队列,用来存放等待执行的任务
  14. private final BlockingQueue<Runnable> workQueue;
  15. //全局锁,对线程池状态等属性修改时需要使用这个锁
  16. private final ReentrantLock mainLock = new ReentrantLock();
  17. //线程池中工作线程的集合,访问和修改需要持有全局锁
  18. private final HashSet<Worker> workers = new HashSet<Worker>();
  19. // 终止条件
  20. private final Condition termination = mainLock.newCondition();
  21. //线程池中曾经出现过的最大线程数
  22. private int largestPoolSize;
  23. //已完成任务的数量
  24. private long completedTaskCount;
  25. //线程工厂
  26. private volatile ThreadFactory threadFactory;
  27. //任务拒绝策略
  28. private volatile RejectedExecutionHandler handler;
  29. //线程存活时间
  30. private volatile long keepAliveTime;
  31. //是否允许核心线程超时
  32. private volatile boolean allowCoreThreadTimeOut;
  33. //核心池大小,若allowCoreThreadTimeOut被设置,核心线程全部空闲超时被回收的情况下会为0
  34. private volatile int corePoolSize;
  35. //最大池大小,不得超过CAPACITY
  36. private volatile int maximumPoolSize;
  37. //默认的任务拒绝策略
  38. private static final RejectedExecutionHandler defaultHandler =
  39. new AbortPolicy();
  40. private static final RuntimePermission shutdownPerm =
  41. new RuntimePermission("modifyThread");
  42. private final AccessControlContext acc;

在ThreadPoolExecutor类的这些属性中,线程池状态是控制线程池生命周期至关重要的属性,这里就以线程池状态为出发点进行研究。

通过上面的源码可知,线程池的运行状态总共有5种,其值和含义分别如下:

RUNNING: 高3位为111,接受新任务并处理阻塞队列中的任务

SHUTDOWN: 高3位为000,不接受新任务但会处理阻塞队列中的任务

STOP:高3位为001,不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务

TIDYING: 高3位为010,所有任务都已终止,工作线程数量为0,线程池将转化到TIDYING状态,即将要执行terminated()钩子方法

TERMINATED: 高3位为011,terminated()方法已经执行结束

然而,线程池中并没有使用单独的变量来表示线程池的运行状态,而是使用一个AtomicInteger类型的变量ctl来表示线程池的控制状态,其将线程池运行状态与工作线程的数量打包在一个整型中,用高3位来表示线程池的运行状态,低29位来表示线程池中工作线程的数量,对ctl的操作主要参考以下几个函数:

  1. // 通过与的方式,获取ctl的高3位,也就是线程池的运行状态
  2. private static int runStateOf(int c) { return c & ~CAPACITY; }
  3. //通过与的方式,获取ctl的低29位,也就是线程池中工作线程的数量
  4. private static int workerCountOf(int c) { return c & CAPACITY; }
  5. //通过或的方式,将线程池状态和线程池中工作线程的数量打包成ctl
  6. private static int ctlOf(int rs, int wc) { return rs | wc; }
  7. //SHUTDOWN状态的值是0,比它大的均是线程池停止或清理状态,比它小的是运行状态
  8. private static boolean isRunning(int c) {
  9. return c < SHUTDOWN;
  10. }

接下来,我们看一下线程池状态的所有转换情况,如下:

RUNNING -> SHUTDOWN:调用shutdown(),可能在finalize()中隐式调用

(RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()

SHUTDOWN -> TIDYING:当缓存队列和线程池都为空时

STOP -> TIDYING:当线程池为空时

TIDYING -> TERMINATED:当terminated()方法执行结束时

通常情况下,线程池有如下两种状态转换流程:

RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
RUNNING -> STOP -> TIDYING -> TERMINATED

三.ThreadPoolExecutor类的构造方法

通常情况下,我们使用线程池的方式就是new一个ThreadPoolExecutor对象来生成一个线程池。接下来,先看ThreadPoolExecutor类的构造函数:

  1. //间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy和默认的线程工厂
  2. public ThreadPoolExecutor(int corePoolSize,
  3. int maximumPoolSize,
  4. long keepAliveTime,
  5. TimeUnit unit,
  6. BlockingQueue<Runnable> workQueue);
  7. //间接调用最后一个构造函数,采用默认的任务拒绝策略AbortPolicy
  8. public ThreadPoolExecutor(int corePoolSize,
  9. int maximumPoolSize,
  10. long keepAliveTime,
  11. TimeUnit unit,
  12. BlockingQueue<Runnable> workQueue,
  13. ThreadFactory threadFactory);
  14. //间接调用最后一个构造函数,采用默认的默认的线程工厂
  15. public ThreadPoolExecutor(int corePoolSize,
  16. int maximumPoolSize,
  17. long keepAliveTime,
  18. TimeUnit unit,
  19. BlockingQueue<Runnable> workQueue,
  20. RejectedExecutionHandler handler);
  21. //前面三个分别调用了最后一个,主要的构造函数
  22. public ThreadPoolExecutor(int corePoolSize,
  23. int maximumPoolSize,
  24. long keepAliveTime,
  25. TimeUnit unit,
  26. BlockingQueue<Runnable> workQueue,
  27. ThreadFactory threadFactory,
  28. RejectedExecutionHandler handler);

接下来,看下最后一个构造函数的具体实现:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) {
  8. //参数合法性校验
  9. if (corePoolSize < 0 ||
  10. maximumPoolSize <= 0 ||
  11. maximumPoolSize < corePoolSize ||
  12. keepAliveTime < 0)
  13. throw new IllegalArgumentException();
  14. //参数合法性校验
  15. if (workQueue == null || threadFactory == null || handler == null)
  16. throw new NullPointerException();
  17. this.acc = System.getSecurityManager() == null ?
  18. null :
  19. AccessController.getContext();
  20. //初始化对应的属性
  21. this.corePoolSize = corePoolSize;
  22. this.maximumPoolSize = maximumPoolSize;
  23. this.workQueue = workQueue;
  24. this.keepAliveTime = unit.toNanos(keepAliveTime);
  25. this.threadFactory = threadFactory;
  26. this.handler = handler;
  27. }

下面解释下一下构造器中各个参数的含义:

1.corePoolSize

线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行。

2.maximumPoolSize

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。

3.keepAliveTime

线程空闲时的存活时间。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,keepAliveTime参数也会起作用,直到线程池中的线程数为0。

4.unit

keepAliveTime参数的时间单位。

5.workQueue

任务缓存队列,用来存放等待执行的任务。如果当前线程数为corePoolSize,继续提交的任务就会被保存到任务缓存队列中,等待被执行。

一般来说,这里的BlockingQueue有以下三种选择:

SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。因此,如果线程池中始终没有空闲线程(任务提交的平均速度快于被处理的速度),可能出现无限制的线程增长。

LinkedBlockingQueue:基于链表结构的阻塞队列,如果不设置初始化容量,其容量为Integer.MAX_VALUE,即为无界队列。因此,如果线程池中线程数达到了corePoolSize,且始终没有空闲线程(任务提交的平均速度快于被处理的速度),任务缓存队列可能出现无限制的增长。

ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。

6.threadFactory

线程工厂,创建新线程时使用的线程工厂。

7.handler

任务拒绝策略,当阻塞队列满了,且线程池中的线程数达到maximumPoolSize,如果继续提交任务,就会采取任务拒绝策略处理该任务,线程池提供了4种任务拒绝策略:

AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,默认策略;

CallerRunsPolicy:由调用execute方法的线程执行该任务;

DiscardPolicy:丢弃任务,但是不抛出异常;

DiscardOldestPolicy:丢弃阻塞队列最前面的任务,然后重新尝试执行任务(重复此过程)。

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。

四.线程池的实现原理

1.提交任务

线程池框架提供了两种方式提交任务,submit()和execute(),通过submit()方法提交的任务可以返回任务执行的结果,通过execute()方法提交的任务不能获取任务执行的结果。

submit()方法的实现有以下三种:

  1. public Future<?> submit(Runnable task);
  2. public <T> Future<T> submit(Runnable task, T result);
  3. public <T> Future<T> submit(Callable<T> task);

下面以第一个方法为例简单看一下submit()方法的实现:

  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Void> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }

submit()方法是在ThreadPoolExecutor的父类AbstractExecutorService类实现的,最终还是调用的ThreadPoolExecutor类的execute()方法,下面着重看一下execute()方法的实现。

  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. //获取线程池控制状态
  5. int c = ctl.get();
  6. // (1)
  7. //worker数量小于corePoolSize
  8. if (workerCountOf(c) < corePoolSize) {
  9. //创建worker,addWorker方法boolean参数用来判断是否创建核心线程
  10. if (addWorker(command, true))
  11. //成功则返回
  12. return;
  13. //失败则再次获取线程池控制状态
  14. c = ctl.get();
  15. }
  16. //(2)
  17. //线程池处于RUNNING状态,将任务加入workQueue任务缓存队列
  18. if (isRunning(c) && workQueue.offer(command)) {
  19. // 再次检查,获取线程池控制状态,防止在任务入队的过程中线程池关闭了或者线程池中没有线程了
  20. int recheck = ctl.get();
  21. //线程池不处于RUNNING状态,且将任务从workQueue移除成功
  22. if (! isRunning(recheck) && remove(command))
  23. //采取任务拒绝策略
  24. reject(command);
  25. //worker数量等于0
  26. else if (workerCountOf(recheck) == 0)
  27. //创建worker
  28. addWorker(null, false);
  29. }
  30. //(3)
  31. else if (!addWorker(command, false)) //创建worker
  32. reject(command); //如果创建worker失败,采取任务拒绝策略
  33. }

execute()方法的执行流程可以总结如下:

若线程池工作线程数量小于corePoolSize,则创建新线程来执行任务

若工作线程数量大于或等于corePoolSize,则将任务加入BlockingQueue

若无法将任务加入BlockingQueue(BlockingQueue已满),且工作线程数量小于maximumPoolSize,则创建新的线程来执行任务

若工作线程数量达到maximumPoolSize,则创建线程失败,采取任务拒绝策略

可以结合下面的两张图来理解线程池提交任务的执行流程。

ThreadPoolExecutor的execute()方法的执行示意图

2.创建线程

从execute()方法的实现可以看出,addWorker()方法主要负责创建新的线程并执行任务,代码实现如下:

  1. //addWorker有两个参数:Runnable类型的firstTask,用于指定新增的线程执行的第一个任务;boolean类型的core,表示是否创建核心线程
  2. //该方法的返回值代表是否成功新增一个线程
  3. private boolean addWorker(Runnable firstTask, boolean core) {
  4. retry:
  5. for (;;) {
  6. int c = ctl.get();
  7. int rs = runStateOf(c);
  8. // (1)
  9. if (rs >= SHUTDOWN &&
  10. ! (rs == SHUTDOWN &&
  11. firstTask == null &&
  12. ! workQueue.isEmpty()))
  13. return false;
  14. for (;;) {
  15. int wc = workerCountOf(c);
  16. //线程数超标,不能再创建线程,直接返回
  17. if (wc >= CAPACITY ||
  18. wc >= (core ? corePoolSize : maximumPoolSize))
  19. return false;
  20. //CAS操作递增workCount
  21. //如果成功,那么创建线程前的所有条件校验都满足了,准备创建线程执行任务,退出retry循环
  22. //如果失败,说明有其他线程也在尝试往线程池中创建线程(往线程池提交任务可以是并发的),则继续往下执行
  23. if (compareAndIncrementWorkerCount(c))
  24. break retry;
  25. //重新获取线程池控制状态
  26. c = ctl.get();
  27. // 如果线程池的状态发生了变更,如有其他线程关闭了这个线程池,那么需要回到外层的for循环
  28. if (runStateOf(c) != rs)
  29. continue retry;
  30. //如果只是CAS操作失败的话,进入内层的for循环就可以了
  31. }
  32. }
  33. //到这里,创建线程前的所有条件校验都满足了,可以开始创建线程来执行任务
  34. //worker是否已经启动
  35. boolean workerStarted = false;
  36. //是否已将这个worker添加到workers这个HashSet中
  37. boolean workerAdded = false;
  38. Worker w = null;
  39. try {
  40. //创建一个worker,从这里可以看出对线程的包装
  41. w = new Worker(firstTask);
  42. //取出worker中的线程对象,Worker的构造方法会调用ThreadFactory来创建一个新的线程
  43. final Thread t = w.thread;
  44. if (t != null) {
  45. //获取全局锁, 并发的访问线程池workers对象必须加锁,持有锁的期间线程池也不会被关闭
  46. final ReentrantLock mainLock = this.mainLock;
  47. mainLock.lock();
  48. try {
  49. //重新获取线程池的运行状态
  50. int rs = runStateOf(ctl.get());
  51. //小于SHUTTDOWN即RUNNING
  52. //等于SHUTDOWN并且firstTask为null,不接受新的任务,但是会继续执行等待队列中的任务
  53. if (rs < SHUTDOWN ||
  54. (rs == SHUTDOWN && firstTask == null)) {
  55. //worker里面的thread不能是已启动的
  56. if (t.isAlive())
  57. throw new IllegalThreadStateException();
  58. //将新创建的线程加入到线程池中
  59. workers.add(w);
  60. int s = workers.size();
  61. // 更新largestPoolSize
  62. if (s > largestPoolSize)
  63. largestPoolSize = s;
  64. workerAdded = true;
  65. }
  66. } finally {
  67. mainLock.unlock();
  68. }
  69. //线程添加线程池成功,则启动新创建的线程
  70. if (workerAdded) {
  71. t.start();
  72. workerStarted = true;
  73. }
  74. }
  75. } finally {
  76. //若线程启动失败,做一些清理工作,例如从workers中移除新添加的worker并递减wokerCount
  77. if (! workerStarted)
  78. addWorkerFailed(w);
  79. }
  80. //返回线程是否启动成功
  81. return workerStarted;
  82. }

因为代码(1)处的逻辑不利于理解,我们通过(1)的等价实现来理解:

  1. if (rs>=SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
  2. //等价实现
  3. rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())

其含义为,满足下列条件之一则直接返回false,线程创建失败:

  1. rs > SHUTDOWN,也就是STOP,TIDYINGTERMINATED,此时不再接受新的任务,且中断正在执行的任务
  2. rs = SHUTDOWNfirstTask != null,此时不再接受任务,但是仍会处理任务缓存队列中的任务
  3. rs = SHUTDOWN,队列为空

多说一句,若线程池处于 SHUTDOWN, firstTask 为 null,且 workQueue 非空,那么还得创建线程继续处理任务缓存队列中的任务。

总结一下,addWorker()方法完成了如下几件任务:

  • 1.原子性的增加workerCount

  • 2.将用户给定的任务封装成为一个worker,并将此worker添加进workers集合中

  • 3.启动worker对应的线程

  • 4.若线程启动失败,回滚worker的创建动作,即从workers中移除新添加的worker,并原子性的减少workerCount

    3.工作线程的实现

从addWorker()方法的实现可以看出,工作线程的创建和启动都跟ThreadPoolExecutor中的内部类Worker有关。下面我们分析Worker类来看一下工作线程的实现。

Worker类继承自AQS类,具有锁的功能;实现了Runable接口,可以将自身作为一个任务在线程中执行。

  1. private final class Worker
  2. extends AbstractQueuedSynchronizer
  3. implements Runnable

Worker的主要字段就下面三个,代码也比较简单。
//用来封装worker的线程,线程池中真正运行的线程,通过线程工厂创建而来

  1. final Thread thread;
  2. //worker所对应的第一个任务,可能为空
  3. Runnable firstTask;
  4. //记录当前线程完成的任务数
  5. volatile long completedTasks;

Worker的构造函数如下。

  1. Worker(Runnable firstTask) {
  2. //设置AQS的state为-1,在执行runWorker()方法之前阻止线程中断
  3. setState(-1);
  4. //初始化第一个任务
  5. this.firstTask = firstTask;
  6. //利用指定的线程工厂创建一个线程,注意,参数是Worker实例本身this
  7. //也就是当执行start方法启动线程thread时,真正执行的是Worker类的run方法
  8. this.thread = getThreadFactory().newThread(this);
  9. }

Worker类继承了AQS类,重写了其相应的方法,实现了一个自定义的同步器,实现了不可重入锁。
//是否持有独占锁

  1. protected boolean isHeldExclusively() {
  2. return getState() != 0;
  3. }
  4. //尝试获取锁
  5. protected boolean tryAcquire(int unused) {
  6. if (compareAndSetState(0, 1)) {
  7. //设置独占线程
  8. setExclusiveOwnerThread(Thread.currentThread());
  9. return true;
  10. }
  11. return false;
  12. }
  13. //尝试释放锁
  14. protected boolean tryRelease(int unused) {
  15. //设置独占线程为null
  16. setExclusiveOwnerThread(null);
  17. setState(0);
  18. return true;
  19. }
  20. //获取锁
  21. public void lock() { acquire(1); }
  22. //尝试获取锁
  23. public boolean tryLock() { return tryAcquire(1); }
  24. //释放锁
  25. public void unlock() { release(1); }
  26. //是否持有锁
  27. public boolean isLocked() { return isHeldExclusively(); }

Worker类还提供了一个中断线程thread的方法。

  1. void interruptIfStarted() {
  2. Thread t;
  3. //AQS状态大于等于0,worker对应的线程不为null,且该线程没有被中断
  4. if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
  5. try {
  6. t.interrupt();
  7. } catch (SecurityException ignore) {
  8. }
  9. }
  10. }

再来看一下Worker类的run()方法的实现,会发现run()方法最终调用了ThreadPoolExecutor类的runWorker()方法。

  1. public void run() {
  2. runWorker(this);
  3. }

4.线程复用机制

通过上文可以知道,worker中的线程start 后,执行的是worker的run()方法,而run()方法最终会调用ThreadPoolExecutor类的runWorker()方法,runWorker()方法实现了线程池中的线程复用机制。下面我们来看一下runWorker()方法的实现。

  1. final void runWorker(Worker w) {
  2. //获取当前线程
  3. Thread wt = Thread.currentThread();
  4. //获取w的firstTask
  5. Runnable task = w.firstTask;
  6. //设置w的firstTask为null
  7. w.firstTask = null;
  8. // 释放锁,设置AQS的state为0,允许中断
  9. w.unlock();
  10. //用于标识线程是否异常终止,finally中processWorkerExit()方法会有不同逻辑
  11. boolean completedAbruptly = true;
  12. try {
  13. //循环调用getTask()获取任务,不断从任务缓存队列获取任务并执行
  14. while (task != null || (task = getTask()) != null) {
  15. //进入循环内部,代表已经获取到可执行的任务,则对worker对象加锁,保证线程在执行任务过程中不会被中断
  16. w.lock();
  17. if ((runStateAtLeast(ctl.get(), STOP) || //若线程池状态大于等于STOP,那么意味着该线程要中断
  18. (Thread.interrupted() && //线程被中断
  19. runStateAtLeast(ctl.get(), STOP))) && //且是因为线程池内部状态变化而被中断
  20. !wt.isInterrupted()) //确保该线程未被中断
  21. //发出中断请求
  22. wt.interrupt();
  23. try {
  24. //开始执行任务前的Hook方法
  25. beforeExecute(wt, task);
  26. Throwable thrown = null;
  27. try {
  28. //到这里正式开始执行任务
  29. task.run();
  30. } catch (RuntimeException x) {
  31. thrown = x; throw x;
  32. } catch (Error x) {
  33. thrown = x; throw x;
  34. } catch (Throwable x) {
  35. thrown = x; throw new Error(x);
  36. } finally {
  37. //执行任务后的Hook方法
  38. afterExecute(task, thrown);
  39. }
  40. } finally {
  41. //置空task,准备通过getTask()获取下一个任务
  42. task = null;
  43. //completedTasks递增
  44. w.completedTasks++;
  45. //释放掉worker持有的独占锁
  46. w.unlock();
  47. }
  48. }
  49. completedAbruptly = false;
  50. } finally {
  51. //到这里,线程执行结束,需要执行结束线程的一些清理工作
  52. //线程执行结束可能有两种情况:
  53. //1.getTask()返回null,也就是说,这个worker的使命结束了,线程执行结束
  54. //2.任务执行过程中发生了异常
  55. //第一种情况,getTask()返回null,那么getTask()中会将workerCount递减
  56. //第二种情况,workerCount没有进行处理,这个递减操作会在processWorkerExit()中处理
  57. processWorkerExit(w, completedAbruptly);
  58. }
  59. }

runWorker()方法是线程池的核心,实现了线程池中的线程复用机制,来看一下

runWorker()方法都做了哪些工作:

  • 1.运行第一个任务firstTask之后,循环调用getTask()方法获取任务,不断从任务缓存队列获取任务并执行;

  • 2.获取到任务之后就对worker对象加锁,保证线程在执行任务的过程中不会被中断,任务执行完会释放锁;

  • 3.在执行任务的前后,可以根据业务场景重写beforeExecute()和afterExecute()等Hook方法;

  • 4.执行通过getTask()方法获取到的任务

  • 5.线程执行结束后,调用processWorkerExit()方法执行结束线程的一些清理工作

从runWorker()方法的实现可以看出,runWorker()方法中主要调用了getTask()方法和processWorkerExit()方法,下面分别看一下这两个方法的实现。

getTask()的实现

getTask()方法用来不断地从任务缓存队列获取任务并交给线程执行,下面分析一下其实现。

  1. private Runnable getTask() {
  2. //标识当前线程是否超时未能获取到task对象
  3. boolean timedOut = false;
  4. for (;;) {
  5. //获取线程池的控制状态
  6. int c = ctl.get();
  7. //获取线程池的运行状态
  8. int rs = runStateOf(c);
  9. //如果线程池状态大于等于STOP,或者处于SHUTDOWN状态,并且阻塞队列为空,线程池工作线程数量递减,方法返回null,回收线程
  10. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  11. decrementWorkerCount();
  12. return null;
  13. }
  14. //获取worker数量
  15. int wc = workerCountOf(c);
  16. //标识当前线程在空闲时,是否应该超时回收
  17. // 如果allowCoreThreadTimeOut为ture,或当前线程数大于核心池大小,则需要超时回收
  18. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  19. //如果worker数量大于maximumPoolSize(有可能调用了 setMaximumPoolSize(),导致worker数量大于maximumPoolSize)
  20. if ((wc > maximumPoolSize || (timed && timedOut)) //或者获取任务超时
  21. && (wc > 1 || workQueue.isEmpty())) { //workerCount大于1或者阻塞队列为空(在阻塞队列不为空时,需要保证至少有一个工作线程)
  22. if (compareAndDecrementWorkerCount(c))
  23. //线程池工作线程数量递减,方法返回null,回收线程
  24. return null;
  25. //线程池工作线程数量递减失败,跳过剩余部分,继续循环
  26. continue;
  27. }
  28. try {
  29. //如果允许超时回收,则调用阻塞队列的poll(),只在keepAliveTime时间内等待获取任务,一旦超过则返回null
  30. //否则调用take(),如果队列为空,线程进入阻塞状态,无限时等待任务,直到队列中有可取任务或者响应中断信号退出
  31. Runnable r = timed ?
  32. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  33. workQueue.take();
  34. //若task不为null,则返回成功获取的task对象
  35. if (r != null)
  36. return r;
  37. // 若返回task为null,表示线程空闲时间超时,则设置timeOut为true
  38. timedOut = true;
  39. } catch (InterruptedException retry) {
  40. //如果此worker发生了中断,采取的方案是重试,没有超时
  41. //在哪些情况下会发生中断?调用setMaximumPoolSize(),shutDown(),shutDownNow()
  42. timedOut = false;
  43. }
  44. }
  45. }

接下来总结一下getTask()方法会在哪些情况下返回:

  • 1.线程池处于RUNNING状态,阻塞队列不为空,返回成功获取的task对象

  • 2.线程池处于SHUTDOWN状态,阻塞队列不为空,返回成功获取的task对象

  • 3.线程池状态大于等于STOP,返回null,回收线程

  • 4.线程池处于SHUTDOWN状态,并且阻塞队列为空,返回null,回收线程

  • 5.worker数量大于maximumPoolSize,返回null,回收线程

  • 6.线程空闲时间超时,返回null,回收线程

processWorkerExit()的实现

processWorkerExit()方法负责执行结束线程的一些清理工作,下面分析一下其实现。

  1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
  2. //如果用户任务执行过程中发生了异常,则需要递减workerCount
  3. if (completedAbruptly)
  4. decrementWorkerCount();
  5. final ReentrantLock mainLock = this.mainLock;
  6. //获取全局锁
  7. mainLock.lock();
  8. try {
  9. //将worker完成任务的数量累加到总的完成任务数中
  10. completedTaskCount += w.completedTasks;
  11. //从workers集合中移除该worker
  12. workers.remove(w);
  13. } finally {
  14. //释放锁
  15. mainLock.unlock();
  16. }
  17. //尝试终止线程池
  18. tryTerminate();
  19. //获取线程池控制状态
  20. int c = ctl.get();
  21. if (runStateLessThan(c, STOP)) { //线程池运行状态小于STOP
  22. if (!completedAbruptly) { //如果用户任务执行过程中发生了异常,则直接调用addWorker()方法创建线程
  23. //是否允许核心线程超时
  24. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
  25. //允许核心超时并且workQueue阻塞队列不为空,那线程池中至少有一个工作线程
  26. if (min == 0 && ! workQueue.isEmpty())
  27. min = 1;
  28. //如果工作线程数量workerCount大于等于核心池大小corePoolSize,
  29. //或者允许核心超时并且workQueue阻塞队列不为空时,线程池中至少有一个工作线程,直接返回
  30. if (workerCountOf(c) >= min)
  31. return;
  32. //若不满足上述条件,则调用addWorker()方法创建线程
  33. }
  34. //创建新的线程取代当前线程
  35. addWorker(null, false);
  36. }
  37. }

processWorkerExit()方法中主要调用了tryTerminate()方法,下面看一下tryTerminate()方法的实现。

  1. final void tryTerminate() {
  2. for (;;) {
  3. //获取线程池控制状态
  4. int c = ctl.get();
  5. if (isRunning(c) || //线程池的运行状态为RUNNING
  6. runStateAtLeast(c, TIDYING) || //线程池的运行状态大于等于TIDYING
  7. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) //线程池的运行状态为SHUTDOWN且阻塞队列不为空
  8. //不能终止,直接返回
  9. return;
  10. //只有当线程池的运行状态为STOP,或线程池运行状态为SHUTDOWN且阻塞队列为空时,可以执行到这里
  11. //如果线程池工作线程的数量不为0
  12. if (workerCountOf(c) != 0) {
  13. //仅仅中断一个空闲的worker
  14. interruptIdleWorkers(ONLY_ONE);
  15. return;
  16. }
  17. //只有当线程池工作线程的数量为0时可以执行到这里
  18. final ReentrantLock mainLock = this.mainLock;
  19. //获取全局锁
  20. mainLock.lock();
  21. try {
  22. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { //CAS操作设置线程池运行状态为TIDYING,工作线程数量为0
  23. try {
  24. //执行terminated()钩子方法
  25. terminated();
  26. } finally {
  27. //设置线程池运行状态为TERMINATED,工作线程数量为0
  28. ctl.set(ctlOf(TERMINATED, 0));
  29. //唤醒在termination条件上等待的所有线程
  30. termination.signalAll();
  31. }
  32. return;
  33. }
  34. } finally {
  35. //释放锁
  36. mainLock.unlock();
  37. }
  38. //若CAS操作失败则重试
  39. }
  40. }

tryTerminate()方法的作用是尝试终止线程池,它会在所有可能终止线程池的地方被调用,满足终止线程池的条件有两个:首先,线程池状态为STOP,或者为SHUTDOWN且任务缓存队列为空;其次,工作线程数量为0。

满足了上述两个条件之后,tryTerminate()方法获取全局锁,设置线程池运行状态为TIDYING,之后执行terminated()钩子方法,最后设置线程池状态为TERMINATED。

至此,线程池运行状态变为TERMINATED,工作线程数量为0,workers已清空,且workQueue也已清空,所有线程都执行结束,线程池的生命周期到此结束。

5.关闭线程池

关闭线程池有两个方法,shutdown()和shutdownNow(),下面分别看一下这两个方法的实现。

shutdown()的实现

shutdown()方法将线程池运行状态设置为SHUTDOWN,此时线程池不会接受新的任务,但会处理阻塞队列中的任务。

  1. public void shutdown() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. //获取全局锁
  4. mainLock.lock();
  5. try {
  6. //检查shutdown权限
  7. checkShutdownAccess();
  8. //设置线程池运行状态为SHUTDOWN
  9. advanceRunState(SHUTDOWN);
  10. //中断所有空闲worker
  11. interruptIdleWorkers();
  12. //用onShutdown()钩子方法
  13. onShutdown();
  14. } finally {
  15. //释放锁
  16. mainLock.unlock();
  17. }
  18. //尝试终止线程池
  19. tryTerminate();
  20. }

shutdown()方法首先会检查是否具有shutdown的权限,然后设置线程池的运行状态为SHUTDOWN,之后中断所有空闲的worker,再调用onShutdown()钩子方法,最后尝试终止线程池。

shutdown()方法调用了interruptIdleWorkers()方法中断所有空闲的worker,其实现如下。

  1. private void interruptIdleWorkers() {
  2. interruptIdleWorkers(false);
  3. }
  4. //onlyOne标识是否只中断一个线程
  5. private void interruptIdleWorkers(boolean onlyOne) {
  6. final ReentrantLock mainLock = this.mainLock;
  7. //获取全局锁
  8. mainLock.lock();
  9. try {
  10. //遍历workers集合
  11. for (Worker w : workers) {
  12. //worker对应的线程
  13. Thread t = w.thread;
  14. //线程未被中断且成功获得锁
  15. if (!t.isInterrupted() && w.tryLock()) {
  16. try {
  17. //发出中断请求
  18. t.interrupt();
  19. } catch (SecurityException ignore) {
  20. } finally {
  21. //释放锁
  22. w.unlock();
  23. }
  24. }
  25. //若只中断一个线程,则跳出循环
  26. if (onlyOne)
  27. break;
  28. }
  29. } finally {
  30. //释放锁
  31. mainLock.unlock();
  32. }
  33. }

shutdownNow()的实现

shutdownNow()方法将线程池运行状态设置为STOP,此时线程池不会接受新任务,也不会处理阻塞队列中的任务,并且中断正在运行的任务。

  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. //获取全局锁
  5. mainLock.lock();
  6. try {
  7. //检查shutdown权限
  8. checkShutdownAccess();
  9. //设置线程池运行状态为STOP
  10. advanceRunState(STOP);
  11. //中断所有worker
  12. interruptWorkers();
  13. //将任务缓存队列中等待执行的任务取出并放到list中
  14. tasks = drainQueue();
  15. } finally {
  16. //释放锁
  17. mainLock.unlock();
  18. }
  19. //尝试终止线程池
  20. tryTerminate();
  21. //返回任务缓存队列中等待执行的任务列表
  22. return tasks;
  23. }

shutdownNow()方法与shutdown()方法相似,不同之处在于,前者设置线程池的运行状态为STOP,之后中断所有的worker(并非只是空闲的worker),尝试终止线程池之后,返回任务缓存队列中等待执行的任务列表。

shutdownNow()方法调用了interruptWorkers()方法中断所有的worker(并非只是空闲的worker),其实现如下。

  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. //获取全局锁
  4. mainLock.lock();
  5. try {
  6. //遍历workers集合
  7. for (Worker w : workers)
  8. //调用Worker类的interruptIfStarted()方法中断线程
  9. w.interruptIfStarted();
  10. } finally {
  11. //释放锁
  12. mainLock.unlock();
  13. }
  14. }

五.总结

至此,我们已经阅读了线程池框架的核心类ThreadPoolExecutor类的大部分源码,由衷地赞叹这个类很多地方设计的巧妙之处:

将线程池的运行状态和工作线程数量打包在一起,并使用了大量的位运算

使用CAS操作更新线程控制状态ctl,确保对ctl的更新是原子操作

内部类Worker类继承了AQS,实现了一个自定义的同步器,实现了不可重入锁

使用while循环自旋地从任务缓存队列中获取任务并执行,实现了线程复用机制

调用interrupt()方法中断线程,但注意该方法并不能直接中断线程的运行,只是发出了中断信号,配合BlockingQueue的take(),poll()方法的使用,打断线程的阻塞状态

其实,线程池的本质就是生产者消费者模式,线程池的调用者不断向线程池提交任务,线程池里面的工作线程不断获取这些任务并执行(从任务缓存队列获取任务或者直接执行任务)。

读完本文,相信大家对线程池的实现原理有了深刻的认识,比如向线程池提交一个任务之后线程池的执行流程,一个任务从被提交到被执行会经历哪些过程,一个工作线程从被创建到正常执行到执行结束的执行过程,等等。

暗锚,解决锚点偏移

文章评论

嘿,来试试登录吧!