JAVA线程池原理与源码分析
1、线程池常用接口介绍
1.1、Executor
1 | public interface Executor { |
执行提交的Runnable任务。其中的execute方法在将来的某个时候执行给定的任务,该任务可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。
1.2、ExecutorService
ExecutorService继承自Executor,下面挑几个方法介绍:
1.2.1、shutdown()
1 | void shutdown(); |
启动有序关闭线程池,在此过程中执行先前提交的任务,但不接受任何新任务。如果线程池已经关闭,调用此方法不会产生额外的效果。此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。
1.2.2、shutdownNow()
1 | List<Runnable> shutdownNow(); |
尝试停止所有正在积极执行的任务, 停止处理等待的任务,并返回等待执行的任务列表。 此方法不等待以前提交的任务完成执行,可以使用awaitTermination去实现。除了尽最大努力停止处理积极执行的任务外,没有任何保证。例如,典型的实现是:通过Thread#interrupt取消任务执行,但是任何未能响应中断的任务都可能永远不会终止。
1.2.3、isShutdown()
1 | boolean isShutdown(); |
返回线程池关闭状态。
1.2.4、isTerminated()
1 | boolean isTerminated(); |
如果关闭后所有任务都已完成,则返回 true。注意,除非首先调用了shutdown或shutdownNow,否则isTerminated永远不会返回true。
1.2.5、awaitTermination(long timeout, TimeUnit unit)
1 | boolean awaitTermination(long timeout, TimeUnit unit) |
线程阻塞阻塞,直到所有任务都在shutdown请求之后执行完毕,或者超时发生,或者当前线程被中断(以先发生的情况为准)。
1.2.6、submit
1 | <T> Future<T> submit(Callable<T> task); |
提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。 Future的 get方法将在成功完成任务后返回任务的结果。
1.3、ScheduledExecutorService
安排命令在给定的延迟之后运行,或者定期执行,继承自ExecutorService接口由以下四个方法组成:
1 | //在给定延迟之后启动任务,返回ScheduledFuture |
1.4、ThreadFactory
1 | public interface ThreadFactory { |
按需创建新线程的对象。
1.5、Callable
1 |
|
返回任务结果也可能抛出异常。
1.6、Future
1 | public interface Future<V> { |
Future表示异步计算的结果。方法用于检查计算是否完成,等待计算完成并检索计算结果。只有当计算完成时,才可以使用方法get检索结果,如果需要,可以阻塞,直到准备好为止。取消由cancel方法执行。还提供了其他方法来确定任务是否正常完成或被取消。一旦计算完成,就不能取消计算。
1.7、Delayed
1 | public interface Delayed extends Comparable<Delayed> { |
一种混合风格的接口,用于标记在给定延迟之后应该执行的对象。
1.8、ScheduledFuture
1 | public interface ScheduledFuture<V> extends Delayed, Future<V> {} |
2、线程池工作流程
新任务进来时:
- 如果当前运行的线程少于corePoolSize,则创建新线程(核心线程)来执行任务。
- 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue。
- 如果BlockingQueue队列已满,则创建新的线程(非核心)来处理任务。
- 如果核心线程与非核心线程总数超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler拒绝策略。
3、ThreadPoolExecutor介绍
构造方法:
1 | public ThreadPoolExecutor( |
参数说明:
- corePoolSize
除非设置了 allowCoreThreadTimeOut,否则要保留在线程池中的线程数(即使它们是空闲的)。 - maximumPoolSize
线程池中允许的最大线程数。 - keepAliveTime
当线程数大于corePoolSize时,这是多余的空闲线程在终止新任务之前等待新任务的最长时间。 - unit
keepAliveTime参数的时间单位。 - workQueue
用于在任务执行前保存任务的队列。这个队列只包含execute方法提交的Runnable任务。 - threadFactory
执行程序创建新线程时使用的工厂。 - handler
由于达到线程边界和队列容量而阻塞执行时使用的处理程序。
3.1、BlockingQueue
- SynchronousQueue
不存储元素的阻塞队列,一个插入操作,必须等待移除操作结束,每个任务一个线程。使用的时候maximumPoolSize一般指定成Integer.MAX_VALUE。 - LinkedBlockingQueue
如果当前线程数大于等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中。 - ArrayBlockingQueue
可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则执行拒绝策略。 - DelayQueue
队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。 - priorityBlockingQuene
具有优先级的无界阻塞队列。
3.2、RejectedExecutionHandler
有4个ThreeadPoolExecutor内部类。
- AbortPolicy
直接抛出异常,默认策略。 - CallerRunsPolicy
用调用者所在的线程来执行任务。 - DiscardOldestPolicy
丢弃阻塞队列中靠最前的任务,并执行当前任务。 - DiscardPolicy
直接丢弃任务。
最好自定义饱和策略,实现RejectedExecutionHandler接口,如:记录日志或持久化存储不能处理的任务。
3.3、线程池大小设置
- CPU密集型
尽量使用较小的线程池,减少CUP上下文切换,一般设置为CPU核心数+1。 - IO密集型
可以适当加大线程池数量,IO多,所以在等待IO的时候,充分利用CPU,一般设置为CPU核心数2倍。
但是对于一些特别耗时的IO操作,盲目的用线程池可能也不是很好,通过异步+单线程轮询,上层再配合上一个固定的线程池,效果可能更好,参考Reactor模型。 - 混合型
视具体情况而定。
3.4、任务提交
- Callable
通过submit函数提交,返回Future对象。 - Runnable
通过execute提交,没有返回结果。
3.5、关闭线程池
- shutdown()
仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。 - shutdownNow()
不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。
4、线程池实现原理
4.1、 线程池状态
线程池的内部状态由AtomicInteger修饰的ctl表示,其高3位表示线程池的运行状态,低29位表示线程池中的线程数量。
1 | private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); |
主池控制状态ctl是一个原子整数,包含两个概念字段:
- workerCount:指示有效线程数。
- runState:指示是否运行、关闭等。
为了将这两个字段打包成一个整型,所以将workerCount限制为(2^29)-1个线程,而不是(2^31)-1个线程。
workerCount是工作线程数量。该值可能与实际活动线程的数量存在暂时性差异,例如,当ThreadFactory在被请求时无法创建线程,以及退出的线程在终止前仍在执行bookkeeping时。 用户可见的池大小报告为工作线程集的当前大小。
runState提供了生命周期,具有以下值:
- RUNNING:接受新任务并处理排队的任务
- SHUTDOWN:不接受新任务,而是处理队列的任务。
- STOP:不接受新任务,不处理队列的任务,中断正在进行的任务。
- TIDYING:所有任务都已终止,workerCount为零,过渡到状态TIDYING的线程将运行terminated()钩子方法。
- TERMINATED:terminated()方法执行完毕。
为了允许有序比较,这些值之间的数值顺序很重要。运行状态会随着时间单调地增加,但不需要达到每个状态。转换:
- RUNNING -> SHUTDOWN
在调用shutdown()时,可以隐式地在finalize()中调用。 - (RUNNING or SHUTDOWN) -> STOP
调用shutdownNow()。 - SHUTDOWN -> TIDYING
当队列和池都为空时。 - STOP -> TIDYING
当池是空的时候。 - TIDYING -> TERMINATED
当terminated()钩子方法完成时。
当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。
下面看以下其他状态信息:
1 | //Integer.SIZE为32,COUNT_BITS为29 |
Doug Lea大神的设计啊,感觉计算机的基础真的是数学。
4.2、 内部类Worker
Worker继承了AbstractQueuedSynchronizer,并且实现了Runnable接口。
维护了以下三个变量,其中completedTasks由volatile修饰。
1 | //线程这个工作程序正在运行。如果工厂失败,则为空。 |
构造方法:
1 | //使用ThreadFactory中给定的第一个任务和线程创建。 |
既然实现了Runnable接口,必然实现run方法:
1 | //Delegates main run loop to outer runWorker |
4.3、runWorker(Worker w)执行任务
先看一眼执行流程图,再看源码,会更清晰一点:
首先来看runWorker(Worker w)源码:
1 | final void runWorker(Worker w) { |
下面再来看上述源码中的getTask()与processWorkerExit(w, completedAbruptly)方法:
4.3.1、getTask()
根据当前配置设置执行阻塞或定时等待任务,或者如果该worker因为任何原因必须退出,则返回null,在这种情况下workerCount将递减。
返回空的情况:
- 大于 maximumPoolSize 个 workers(由于调用setMaximumPoolSize)
- 线程池关闭
- 线程池关闭了并且队列为空
- 这个worker超时等待任务,超时的worker在超时等待之前和之后都可能终止(即allowCoreThreadTimeOut || workerCount > corePoolSize),如果队列不是空的,那么这个worker不是池中的最后一个线程。
1 | private Runnable getTask() { |
4.3.1、processWorkerExit(Worker w, boolean completedAbruptly)
为垂死的worker进行清理和bookkeeping。仅从工作线程调用。除非completedAbruptly被设置,否则假定workerCount已经被调整以考虑退出。此方法从工作集中移除线程,如果线程池由于用户任务异常而退出,或者运行的工作池小于corePoolSize,或者队列非空但没有工作池, 则可能终止线程池或替换工作池。
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
4.4、任务提交
提交有两种:
- Executor#execute(Runnable command)
Executor接口提供的方法,在将来的某个时候执行给定的命令.该命令可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定。 - ExecutorService#submit(Callable
task)
提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。Future的get方法将在成功完成任务后返回任务的结果。
4.5、任务执行
4.5.1、 execute(Runnable command)
任务执行流程图:
三步处理:
- 如果运行的线程小于corePoolSize,则尝试用给定的命令作为第一个任务启动一个新线程。对addWorker的调用原子性地检查runState和workerCount,因此可以通过返回false来防止错误警报,因为错误警报会在不应该添加线程的时候添加线程。
- 如果一个任务可以成功排队,那么我们仍然需要再次检查是否应该添加一个线程 (因为自上次检查以来已有的线程已经死亡),或者池在进入这个方法后关闭。因此,我们重新检查状态,如果必要的话,如果停止,则回滚队列;如果没有,则启动一个新线程。
- 如果无法对任务排队,则尝试添加新线程。 如果它失败了,我们知道pool被关闭或饱和,所以拒绝任务。
1 | public void execute(Runnable command) { |
下面详细看一下上述代码中出现的方法:addWorker(Runnable firstTask, boolean core)。
4.5.1.1、addWorker(Runnable firstTask, boolean core)
检查是否可以根据当前池状态和给定的界限(核心或最大值)添加新worker,如果是这样,worker计数将相应地进行调整,如果可能,将创建并启动一个新worker, 并将运行firstTask作为其第一个任务。 如果池已停止或有资格关闭,则此方法返回false。如果线程工厂在被请求时没有创建线程,则返回false。如果线程创建失败,要么是由于线程工厂返回null,要么是由于异常 (通常是Thread.start()中的OutOfMemoryError)),我们将回滚。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
再分析回滚工作线程创建逻辑方法:addWorkerFailed(w)。
回滚工作线程创建,如果存在,则从worker中移除worker, 递减ctl的workerCount字段。,重新检查终止,以防这个worker的存在导致终止。
1 | private void addWorkerFailed(Worker w) { |
其中的tryTerminate()方法:
如果是SHUTDOWN或者STOP 且池子为空,转为TERMINATED状态。如果有条件终止,但是workerCount不为零,则中断空闲worker,以确保关机信号传播。必须在任何可能使终止成为可能的操作之后调用此方法–在关机期间减少worker数量或从队列中删除任务。该方法是非私有的,允许从ScheduledThreadPoolExecutor访问。
1 | final void tryTerminate() { |
4.5.1.2、 reject(Runnable command)拒绝策略
为给定的命令调用被拒绝的执行处理程序。
1 | final void reject(Runnable command) { |