Java线程池实现原理(二)

转自美团技术团队

线程池生命周期管理

线程池运行的状态,并不是用户显式设置的,而是伴随着线程池的运行,由线程池内部来维护。线程池内部使用一个变量维护两个值:运行状态(runState)和线程数量(workerCount)。在具体实现中,线程池将运行状态(runState)和线程数量(workerCount)两个关键参数的维护放在了一起。

1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl这个AtomicInteger类型,是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它同时包含了两部分的信息:运行状态(runState)和线程数量(workerCount),高3位保存runState,低29位保存workerCount,两个变量之间互不干扰。这样做的好处是:源码中经常出现要同时判断上述两种状态的情况,这时候通过一个变量就可以获取,而不用为了维护两者的一致去占用资源锁,然后线程池也提供了很多方法供用户获取当前的运行状态和线程个数。这里都是用位运算,速度会比基本运算快很多。下面代码是一些线程池的方法:

1
2
3
private static int runStateOf(int c)     { return c & ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c & CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl

ThreadPoolExecutor的运行状态有5种,分别为:

运行状态 状态描述
RUNNING 能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
STOP 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程
TIDYING 所有任务都已经终止了,workerCount为0
TERMINATED 在terminated()方法执行后进入该状态

其生命周期转换如下所示:

任务管理机制

任务调度

任务调度是线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。先来介绍一些基本概念:

corePoolSize:

线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超过这个数量的线程。(线程池本身也要创建线程去执行任务,只是线程池维护了这些创建线程的生命周期,看什么时候需要创建,什么时候要销毁)

maximumPoolSize:

线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。若果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largesPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么生活曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize()来改变这个最大线程数目。

poolSize:

线程池中当前线程的数量,当该值为0的时候,意味着没有任何线程,线程池会终止;poolSize也不会超过maximumPoolSize.

首先,所有任务的调度都是由execute方法完成的,这部分完成的工作是:检查现在的线程池的运行状态、运行线程数目、运行策略、决定接下来执行的流程,是直接申请线程执行,缓冲到队列中执行,还是直接拒绝执行。参看上图任务分配机制。其执行过程如下:

  1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  2. 如果workerCount(线程数量) < corePoolSize,则创建并启动一个线程来执行新提交的任务。
  3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列(BlockQueue)没有满,则将任务添加到该阻塞队列中。
  4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池中阻塞队列已满,则创建并启动一个线程来执行新提交的任务。
  5. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的拒绝策略是直接抛出异常。

其执行流程如下图所示:

任务缓冲

任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,其中任务的管理相对是一件容易的事情,复杂的是线程的管理,这会涉及到线程池数量,等待/唤醒,同步/锁,线程创建和死亡等问题。线程池中以生产者消费者模式,通过一个阻塞队列来实现将任务和线程两者解耦的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加操作是:在队列为空时,获取元素的线程会等待队列变为非空,当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者时往队列中添加元素的线程,消费者时总队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只是从容器里拿元素。下图中展示了线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素:

使用不同的队列可以实现不一样的任务存取策略,在这里,我们可以再介绍下阻塞队列的成员:(针对不同场景使用)

名称 描述
ArrayBlockingQueue 一个用数组实现的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。支持公平锁和非公平锁
LinkedBlockingQueue 一个由链表结构组成的有界队列,此队列按照先进先出的原则对元素进行排序。此队列的默认长度为Integer.MAX_VALUE,所以默认创建的该队列有容量危险
PriorityBlockingQueue 一个支持线程优先级排序的无界队列,默认自然序进行排序,也可以自定义实现compareTo()方法来指定元素排序规则,不能保证同优先级元素的顺序
DelayQueue 一个实现了PriorityBlockingQueue实现延迟获取的无界队列,在创建元素时,可以指定多久才能从队列中获取当前元素,只有延迟满后才能从队列中获取元素
SynchronousQueue 一个不存储元素的阻塞队列,每一个put操作必须等待take操作,否则不能添加元素。支持公平锁和非公平锁。SynchronousQueue的一个只用场景是在线程池里。Executors.newCachedThreadPool()就使用了这个队列,这个线程池根据需要(新任务到来时)创建新线程,如果有空闲线程则会重复使用,线程空闲了60s后会被回收(需要关注使用场景)
LinkedTransferQueue 一个由链表结构组成的无界阻塞队列,相比与其他队列,LinkedTransferQueue队列多了transfer和tryTransfer方法
LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。队列头部和尾部都可以添加和移除元素,多线程并发时,可以将锁的竞争最多降到一半

任务申请

由上文的任务分配部分可以知道,任务的执行有两种可能:

  1. 直接创建新的线程来处理。(在线程初始创建的时候)
  2. 线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次从队列中申请任务去执行。(大多数时间线程池工作的情况)

线程需要从任务缓存模块中不断的取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现。其执行流程如下:

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态,如果线程池现在不应该持有那么多线程,则返回null。工作线程Worker会不断接收新任务执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。(怎么判断线程是否为可回收)

任务拒绝

任务拒绝模块是线程池的保护部分,线程池有一个最大的容量(maximumPoolSize),当线程池任务队列已满,并且线程池中的线程数目达到最大容量,就需要拒绝执行该任务了,采取任务拒绝策略来保护线程池。

拒绝策略需要实现一个接口:

1
2
3
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

用户可以通过实现这个接口去定制拒绝策略,也可以选择JDK提供的四种已有拒绝策略,其特点如下:

序号 名称 描述
1 ThreadPoolExecutor.AborPolicy 默认方式,直接丢弃任务并抛出RejectedExecutionException异常。在任务不能再提交的时候,抛出异常,及时反馈程序运行状态。如果是比较关键的业务,推荐使用此策略,这样在系统不能承载更大的并发量的时候,能及时通过异常发现问题
2 ThreadPoolExecutor.DiscardPolicy 丢弃任务,但是不抛出异常。使用此策略,可能使我们无法发现系统的异常状态。
3 ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新提交被拒绝的任务。
4 ThreadPoolExecutor.CallerRunsPolicy 由调用线程(提交任务的线程)处理该任务,这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕