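Thread pool state

The pool's internal state is held in ctl, a field of type AtomicInteger: the high 3 bits encode the pool's run state, and the low 29 bits hold the number of worker threads in the pool.
The annotated source below gives the details.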

/**
 * The main pool control state, ctl, is an atomic integer packing
 * two conceptual fields
 *   workerCount, indicating the effective number of threads
 *   runState,    indicating whether running, shutting down etc
 *
 * In order to pack them into one int, we limit workerCount to
 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
 * billion) otherwise representable. If this is ever an issue in
 * the future, the variable can be changed to be an AtomicLong,
 * and the shift/mask constants below adjusted. But until the need
 * arises, this code is a bit faster and simpler using an int.
 *
 * The workerCount is the number of workers that have been
 * permitted to start and not permitted to stop. The value may be
 * transiently different from the actual number of live threads,
 * for example when a ThreadFactory fails to create a thread when
 * asked, and when exiting threads are still performing
 * bookkeeping before terminating. The user-visible pool size is
 * reported as the current size of the workers set.
 *
 * The runState provides the main lifecycle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TIDYING:  All tasks have terminated, workerCount is zero,
 *             the thread transitioning to state TIDYING
 *             will run the terminated() hook method
 *   TERMINATED: terminated() has completed
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TIDYING
 *    When both queue and pool are empty
 * STOP -> TIDYING
 *    When pool is empty
 * TIDYING -> TERMINATED
 *    When the terminated() hook method has completed
 *
 * Threads waiting in awaitTermination() will return when the
 * state reaches TERMINATED.
 *
 * Detecting the transition from SHUTDOWN to TIDYING is less
 * straightforward than you'd like because the queue may become
 * empty after non-empty and vice versa during SHUTDOWN state, but
 * we can only terminate if, after seeing that it is empty, we see
 * that workerCount is 0 (which sometimes entails a recheck -- see
 * below).
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
 * Integer.SIZE is 32, so COUNT_BITS is 29.
 */
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
 * 1 shifted left by 29 bits, minus one: 2^29 - 1, the maximum number of workers.
 */
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
/**
 * High 3 bits are 111: the pool accepts new tasks and processes tasks in the blocking queue.
 * -1 as a 32-bit int: sign-magnitude 1000 ... 0001, ones' complement 1111 ... 1110,
 * two's complement 1111 ... 1111 (the form Java actually stores).
 * A left shift fills the low bits with 0:
 * 111 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int RUNNING = -1 << COUNT_BITS;
/**
 * High 3 bits are 000: the pool accepts no new tasks but still processes tasks in the blocking queue.
 * 000 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
 * High 3 bits are 001: the pool accepts no new tasks, does not process queued tasks,
 * and interrupts tasks that are in progress.
 * 001 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int STOP = 1 << COUNT_BITS;
/**
 * High 3 bits are 010: all tasks have terminated and workerCount is zero;
 * the thread that moves the pool to TIDYING runs the terminated() hook.
 * 010 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int TIDYING = 2 << COUNT_BITS;
/**
 * High 3 bits are 011: terminated() has completed.
 * 011 0 0000 0000 0000 0000 0000 0000 0000
 */
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl

/**
 * Extracts runState from ctl.
 *
 * @param c ctl
 * @return runState
 */
private static int runStateOf(int c) {
    // 2^29        = 001 0 0000 0000 0000 0000 0000 0000 0000
    // 2^29 - 1    = 000 1 1111 1111 1111 1111 1111 1111 1111
    // ~(2^29 - 1) = 111 0 0000 0000 0000 0000 0000 0000 0000
    // if c is STOP: 001 0 0000 0000 0000 0000 0000 0000 0000
    // result:       001 0 0000 0000 0000 0000 0000 0000 0000
    return c & ~CAPACITY;
}

/**
 * Extracts workerCount from ctl.
 *
 * @param c ctl
 * @return workerCount
 */
private static int workerCountOf(int c) {
    // 2^29 - 1 = 000 1 1111 1111 1111 1111 1111 1111 1111
    // if c     = 000 0 0000 0000 0000 0000 0000 0000 0001  (one thread)
    // result:    000 0 0000 0000 0000 0000 0000 0000 0001  (1)
    return c & CAPACITY;
}

/**
 * Builds ctl from runState and workerCount.
 *
 * @param rs runState
 * @param wc workerCount
 * @return ctl
 */
private static int ctlOf(int rs, int wc) {
    // if rs is STOP: 001 0 0000 0000 0000 0000 0000 0000 0000
    // if wc is 1:    000 0 0000 0000 0000 0000 0000 0000 0001
    // result:        001 0 0000 0000 0000 0000 0000 0000 0001
    return rs | wc;
}

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

/**
 * RUNNING is the only negative state, so any ctl below SHUTDOWN (0) means running.
 *
 * @param c ctl
 * @return whether the pool is in the RUNNING state
 */
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * Attempts to CAS-increment the workerCount field of ctl.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

/**
 * Decrements the workerCount field of ctl. This is called only on
 * abrupt termination of a thread (see processWorkerExit). Other
 * decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {
    } while (!compareAndDecrementWorkerCount(ctl.get()));
}
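
To make the bit packing above concrete, here is a minimal standalone sketch. It copies the constants and the three pack/unpack helpers from the listing into a hypothetical class named CtlDemo; the bits() helper is an illustrative addition and not part of the JDK source.

```java
final class CtlDemo {
    // Same constants as in the listing, copied so the sketch runs on its own.
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits set
    static final int RUNNING    = -1 << COUNT_BITS;
    static final int SHUTDOWN   =  0 << COUNT_BITS;
    static final int STOP       =  1 << COUNT_BITS;

    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: renders all 32 bits, including leading zeros.
    static String bits(int c) {
        return String.format("%32s", Integer.toBinaryString(c)).replace(' ', '0');
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);                     // a running pool with 5 workers
        System.out.println(bits(c));                   // high 3 bits are 111, low bits encode 5
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(workerCountOf(c));          // 5
        System.out.println(c < SHUTDOWN);              // true: RUNNING makes ctl negative
    }
}
```

Because RUNNING occupies the sign bit, a plain integer comparison such as c < SHUTDOWN is enough to test the state without unpacking it, which is what isRunning relies on.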

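This is Doug Lea's design; it really does feel as if the foundation of computing is mathematics.
Figure: thread pool internal state transition diagram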

The inner class Worker

Worker extends AbstractQueuedSynchronizer and implements the Runnable interface.
It maintains the following three fields, of which completedTasks is declared volatile.

/**
 * Thread this worker is running in. Null if factory fails.
 */
final Thread thread;
/**
 * Initial task to run. Possibly null.
 */
Runnable firstTask;
/**
 * Per-thread task counter
 */
volatile long completedTasks;

Constructor:

/**
 * Creates with given first task and thread from ThreadFactory.
 *
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    // inhibit interrupts until runWorker
    setState(-1);
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
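
The setState(-1) call only makes sense together with the way Worker uses its AQS state as a simple non-reentrant lock. The following sketch is modeled on that idea under stated assumptions: the class name WorkerLockSketch and the isStarted() helper are hypothetical, and the real Worker carries more logic than shown here.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

final class WorkerLockSketch extends AbstractQueuedSynchronizer {
    WorkerLockSketch() {
        setState(-1); // "not started yet": the CAS from 0 in tryAcquire cannot succeed
    }

    @Override
    protected boolean tryAcquire(int unused) {
        // State 0 means unlocked, 1 means locked; no hold count, so not reentrant.
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0); // also moves a fresh instance from -1 to 0
        return true;
    }

    boolean tryLock()   { return tryAcquire(1); }
    void unlock()       { release(1); }
    boolean isStarted() { return getState() >= 0; } // hypothetical helper

    public static void main(String[] args) {
        WorkerLockSketch lock = new WorkerLockSketch();
        System.out.println(lock.tryLock());   // false: state is still -1
        lock.unlock();                        // what runWorker does first: state becomes 0
        System.out.println(lock.tryLock());   // true
        System.out.println(lock.tryLock());   // false: already held, and not reentrant
    }
}
```

A lock that an idle-interrupting thread can acquire with tryLock signals "this worker is not running a task", which is why a worker that has not yet entered runWorker must look locked.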

Because Worker implements Runnable, it must implement run():

/**
 * Delegates main run loop to outer runWorker
 */
public void run() {
    runWorker(this);
}

runWorker(Worker w): running tasks

runWorker.png

/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    // the current thread, i.e. the worker's own thread
    Thread wt = Thread.currentThread();
    // take the first task and clear the slot
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Worker extends AQS and is created with state == -1. unlock() calls release(1),
    // which sets state to 0. Worker.interruptIfStarted() only interrupts when
    // state >= 0, so from here on the worker may be interrupted.
    w.unlock(); // allow interrupts
    // stays true if an exception sends us to the outer finally before the loop ends normally
    boolean completedAbruptly = true;
    try {
        // run firstTask first, then keep pulling tasks from the queue
        while (task != null || (task = getTask()) != null) {
            // mark this worker as busy
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // hook for subclasses; does nothing by default
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x;
                    throw x;
                } catch (Error x) {
                    thrown = x;
                    throw x;
                } catch (Throwable x) {
                    thrown = x;
                    throw new Error(x);
                } finally {
                    // hook for subclasses; does nothing by default
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // one more completed task for this worker
                w.completedTasks++;
                // mark this worker as idle again
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // cleanup and bookkeeping once the worker is done
        processWorkerExit(w, completedAbruptly);
    }
}
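
As a small illustration of points 3 to 5 of the runWorker comment, the sketch below overrides the two hooks in a hypothetical subclass (HookDemo.RecordingPool, not part of the JDK) and records what they observe. It assumes tasks are handed in with execute(); a task that throws will also print a stack trace through the default uncaught-exception handling.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class HookDemo {
    /** Hypothetical subclass that records what the hooks see. */
    static final class RecordingPool extends ThreadPoolExecutor {
        final AtomicInteger before = new AtomicInteger();
        final AtomicReference<Throwable> lastThrown = new AtomicReference<>();

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            before.incrementAndGet();
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t != null) lastThrown.set(t); // the exception thrown by task.run(), if any
        }
    }

    /** Runs one normal and one failing task, then waits for the pool to terminate. */
    static RecordingPool runOnce() {
        RecordingPool pool = new RecordingPool();
        pool.execute(() -> { });
        pool.execute(() -> { throw new IllegalStateException("boom"); });
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool;
    }

    public static void main(String[] args) {
        RecordingPool pool = runOnce();
        System.out.println("beforeExecute calls: " + pool.before.get());
        System.out.println("afterExecute saw: " + pool.lastThrown.get());
    }
}
```
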
getTask()
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        // read the pool state
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // this worker is about to exit: decrement workerCount
            decrementWorkerCount();
            return null;
        }

        // current number of workers
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // too many workers, or this worker timed out: try to retire it
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // a failed CAS means ctl changed; loop and re-evaluate
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // with a timeout in effect use the timed poll; otherwise use take,
            // which blocks until a task arrives. Both can throw InterruptedException.
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // got a task: return it
            if (r != null)
                return r;
            // null can only come from poll timing out
            timedOut = true;
        } catch (InterruptedException retry) {
            // interrupted while waiting: not a timeout, re-check state and retry
            timedOut = false;
        }
    }
}
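
The timed branch of getTask can be observed from outside through the public API. The sketch below is a rough demonstration, assuming a 20 ms keep-alive and a generous 5 second polling deadline chosen only for illustration; the class name KeepAliveDemo is hypothetical.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class KeepAliveDemo {
    /** Returns true if the pool shrinks to zero workers before the deadline. */
    static boolean idleWorkersTimeOut() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 20L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        // Makes "timed" true in getTask even when wc <= corePoolSize.
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Poll until the idle worker's timed poll() returns null and the worker exits.
            while (pool.getPoolSize() > 0 && deadline - System.nanoTime() > 0) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return pool.getPoolSize() == 0;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool emptied after keep-alive: " + idleWorkersTimeOut());
    }
}
```

Without allowCoreThreadTimeOut(true), the single core worker would block in take() indefinitely and the pool size would stay at 1.
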
processWorkerExit(Worker w, boolean completedAbruptly)
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // If abrupt, then workerCount wasn't adjusted
    // true:  the user task threw, so the count must be decremented here
    // false: getTask already decremented the count
    if (completedAbruptly)
        decrementWorkerCount();

    // take the main lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // fold this worker's counter into the pool-wide completed task count
        completedTaskCount += w.completedTasks;
        // remove the worker from the set
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // a worker has left; it may have been the last one, so try to terminate the pool
    tryTerminate();

    int c = ctl.get();
    // Pool is still RUNNING or SHUTDOWN, i.e. tryTerminate() did not terminate it:
    // decide whether a replacement worker is needed.
    if (runStateLessThan(c, STOP)) {
        // normal exit: compute min, the smallest number of workers to keep
        if (!completedAbruptly) {
            // allowCoreThreadTimeOut defaults to false, in which case core threads are kept
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // if min == 0 but the queue is not empty, keep at least one worker
            if (min == 0 && !workQueue.isEmpty())
                min = 1;
            // enough workers remain: no replacement needed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // add a worker without a firstTask
        addWorker(null, false);
    }
}
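
The replacement path (completedAbruptly being true while the pool is still running) can be seen by comparing thread names. This is only a sketch: the class name ReplacementDemo is hypothetical, and it assumes a single-worker pool whose first task is submitted with execute() and throws.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ReplacementDemo {
    /** Returns {thread that ran the failing task, thread that ran the next task}. */
    static String[] threadNames() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        AtomicReference<String> first = new AtomicReference<>();
        try {
            // The exception escapes task.run(), so the worker exits abruptly.
            pool.execute(() -> {
                first.set(Thread.currentThread().getName());
                throw new IllegalStateException("task failed");
            });
            Callable<String> whoAmI = () -> Thread.currentThread().getName();
            String second = pool.submit(whoAmI).get();
            return new String[] { first.get(), second };
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println(names[0] + " was replaced by " + names[1]);
    }
}
```

Even though corePoolSize and maximumPoolSize are both 1, the second task runs on a different thread, because the first worker died and a new one took its place.
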

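Task submission

Executor.execute(Runnable command)

This method is declared by the Executor interface. It executes the given command at some time in the future. The command may run in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation.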

/**
 * Executes the given command at some time in the future.  The command
 * may execute in a new thread, in a pooled thread, or in the calling
 * thread, at the discretion of the {@code Executor} implementation.
 *
 * @param command the runnable task
 * @throws RejectedExecutionException if this task cannot be
 * accepted for execution
 * @throws NullPointerException if command is null
 */
void execute(Runnable command);

ExecutorService.submit(Callable task)

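Submits a value-returning task for execution and returns a Future representing the pending result of the task. The Future's get method returns the task's result upon successful completion.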

/**
 * Submits a value-returning task for execution and returns a
 * Future representing the pending results of the task. The
 * Future's {@code get} method will return the task's result upon
 * successful completion.
 *
 * <p>
 * If you would like to immediately block waiting
 * for a task, you can use constructions of the form
 * {@code result = exec.submit(aCallable).get();}
 *
 * <p>Note: The {@link Executors} class includes a set of methods
 * that can convert some other common closure-like objects,
 * for example, {@link java.security.PrivilegedAction} to
 * {@link Callable} form so they can be submitted.
 *
 * @param task the task to submit
 * @param <T> the type of the task's result
 * @return a Future representing pending completion of the task
 * @throws RejectedExecutionException if the task cannot be
 *         scheduled for execution
 * @throws NullPointerException if the task is null
 */
<T> Future<T> submit(Callable<T> task);

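A minimal usage sketch of submit follows. The class name SubmitDemo and both helper methods are hypothetical; the second helper shows that an exception thrown inside the task surfaces from get() wrapped in an ExecutionException rather than on the worker thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class SubmitDemo {
    /** Computes a + b on a pool thread and blocks for the result. */
    static int sumViaPool(int a, int b) {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> a + b;
            Future<Integer> future = exec.submit(task);
            return future.get(); // blocks until the task has completed
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    /** Returns the exception thrown inside the task, as seen by the caller of get(). */
    static Throwable failureCause() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> failing = () -> { throw new IllegalArgumentException("bad input"); };
            exec.submit(failing).get();
            return null; // not reached: get() throws
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaPool(40, 2));   // 42
        System.out.println(failureCause());
    }
}
```
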
Task execution

execute(Runnable command): executing a task

execute.png

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    // a null task is an error
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // read the control field
    int c = ctl.get();
    // fewer workers than corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // create a new thread with this task as its first task; done if that succeeds
        if (addWorker(command, true))
            return;
        // addWorker failed: re-read ctl
        c = ctl.get();
    }
    // the pool is running and the task was queued successfully
    if (isRunning(c) && workQueue.offer(command)) {
        // re-read ctl
        int recheck = ctl.get();
        // the pool stopped running in the meantime and the task could be removed again
        if (!isRunning(recheck) && remove(command))
            // reject the task
            reject(command);
        // no workers are left: start one so that the queued task gets consumed
        else if (workerCountOf(recheck) == 0)
            // null first argument: the worker is created without a firstTask
            addWorker(null, false);

    // the pool is not running, or the queue is full:
    // try to start a new thread, growing up to maximumPoolSize;
    // if addWorker(command, false) fails, reject the command
    } else if (!addWorker(command, false))
        reject(command);
}
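
The three steps can be watched from outside with a deliberately tiny pool. The sketch below is an illustration under stated assumptions: corePoolSize 1, maximumPoolSize 2, a bounded queue of capacity 1, the default AbortPolicy handler, and tasks that block on a latch so that nothing finishes early. The class name ExecuteStepsDemo is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class ExecuteStepsDemo {
    /** Returns {pool size, queue size, 1 if the fourth task was rejected else 0}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            pool.execute(blocker); // step 1: below corePoolSize, a new core worker runs it
            pool.execute(blocker); // step 2: core is busy, the task goes into the queue
            pool.execute(blocker); // step 3: queue is full, a second worker is added
            try {
                pool.execute(blocker); // queue full and pool at maximum: rejected
            } catch (RejectedExecutionException e) {
                rejected = 1;
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = observe();
        System.out.println("pool size=" + r[0] + ", queued=" + r[1] + ", rejected=" + r[2]);
    }
}
```

Note the order: the queue is tried before the pool grows beyond corePoolSize, so with an unbounded queue the pool never reaches maximumPoolSize.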

addWorker(Runnable firstTask, boolean core): creating a thread and running the task

addWorker.png

/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // a labeled loop; I haven't seen this style in a long time
    retry:
    // phase 1: check the pool state and reserve a slot by incrementing workerCount
    for (;;) {
        // read the combined state and worker count
        int c = ctl.get();
        // extract the run state
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // In SHUTDOWN, STOP, TIDYING or TERMINATED no worker is added, with one
        // exception: the pool is SHUTDOWN, no firstTask is given and the queue is
        // non-empty, so a worker is still needed to drain the queue.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // the pool state allows adding a worker
        for (;;) {
            // current number of workers
            int wc = workerCountOf(c);

            // refuse if the count has reached CAPACITY, or the applicable bound
            // (corePoolSize or maximumPoolSize, depending on core)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;

            // try to CAS-increment workerCount
            if (compareAndIncrementWorkerCount(c))
                // slot reserved: leave both loops
                break retry;
            c = ctl.get();  // Re-read ctl
            // the run state changed in the meantime
            if (runStateOf(c) != rs)
                // restart the outer loop
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // phase 2: add the worker to the workers set and start its thread
    boolean workerStarted = false;
    boolean workerAdded = false;
    // Worker extends AQS and implements Runnable
    Worker w = null;
    try {
        // wrap the task in a Worker
        w = new Worker(firstTask);
        // the thread created for this worker by the ThreadFactory
        final Thread t = w.thread;
        if (t != null) {
            // take the pool-wide main lock
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // the pool is RUNNING, or it is SHUTDOWN and no firstTask was given
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // precheck that t is startable: a thread that is already
                    // alive (started and not yet dead) cannot be started again
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    // add to the workers set
                    workers.add(w);
                    int s = workers.size();
                    // track the largest pool size reached
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // the worker was added: start its thread and record that it started
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // the worker did not start: roll back its creation
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // whether the worker was started
    return workerStarted;
}
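
The first phase of addWorker is a bounded compare-and-set loop: read the count, give up if the bound is reached, otherwise try to CAS it up by one and retry on contention. The sketch below isolates just that technique; BoundedCounter and contend() are hypothetical names, and unlike addWorker it tracks only a count, not a run state.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedCounter {
    private final AtomicInteger count = new AtomicInteger();

    /** Reserves one slot, or returns false without side effects once the bound is reached. */
    boolean tryIncrement(int bound) {
        for (;;) {
            int c = count.get();
            if (c >= bound)
                return false;
            if (count.compareAndSet(c, c + 1))
                return true;
            // CAS lost a race with another thread: re-read and retry
        }
    }

    int get() { return count.get(); }

    /** Lets several threads fight over the slots and returns how many reservations succeeded. */
    static int contend(int bound, int threads) {
        BoundedCounter counter = new BoundedCounter();
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int n = 0; n < 1_000; n++)
                    if (counter.tryIncrement(bound))
                        successes.incrementAndGet();
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println(contend(10, 4)); // 10: the bound is never exceeded
    }
}
```

Reserving the slot before creating the thread is what lets addWorker enforce the bound without holding mainLock during the check.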

addWorkerFailed(Worker w): rolling back worker creation
/**
 * Rolls back the worker thread creation.
 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 *   worker was holding up termination
 */
private void addWorkerFailed(Worker w) {
    // take the pool-wide main lock
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // remove the worker from the workers set, if present
        if (w != null)
            workers.remove(w);
        // give back the slot reserved in ctl
        decrementWorkerCount();
        // recheck for termination
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

tryTerminate(): rechecking for termination
/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 */
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // return if the pool is still running, is already TIDYING or TERMINATED,
        // or is SHUTDOWN with tasks still in the blocking queue
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        // Eligible to terminate, but workers remain
        if (workerCountOf(c) != 0) {
            // interrupt one idle worker so the shutdown signal propagates
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // take the pool-wide main lock
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // move ctl to TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // hook invoked on termination; does nothing by default
                    terminated();
                } finally {
                    // terminated() has completed: the state becomes TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // wake up every thread waiting in awaitTermination()
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

/**
 * Method invoked when the Executor has terminated.  Default
 * implementation does nothing. Note: To properly nest multiple
 * overridings, subclasses should generally invoke
 * {@code super.terminated} within this method.
 */
protected void terminated() {
}
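
The TIDYING to TERMINATED step is visible through the terminated() hook. In the sketch below, a hypothetical subclass (TerminationDemo.HookedPool) records whether isTerminated() already reports true while the hook is running; the expectation, based on the listing above, is that it does not, because ctl is only set to TERMINATED after the hook returns.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class TerminationDemo {
    /** Hypothetical subclass that observes the pool from inside terminated(). */
    static final class HookedPool extends ThreadPoolExecutor {
        final AtomicBoolean hookRan = new AtomicBoolean(false);
        final AtomicBoolean terminatedInsideHook = new AtomicBoolean(true);

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            super.terminated();
            hookRan.set(true);
            terminatedInsideHook.set(isTerminated()); // state is still TIDYING here
        }
    }

    /** Returns {awaitTermination result, hook ran, isTerminated inside hook, isTerminated after}. */
    static boolean[] observe() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> { });
        pool.shutdown();
        boolean finished;
        try {
            finished = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        return new boolean[] {
            finished, pool.hookRan.get(), pool.terminatedInsideHook.get(), pool.isTerminated()
        };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("awaitTermination returned " + r[0]);
        System.out.println("hook ran: " + r[1] + ", terminated inside hook: " + r[2]);
        System.out.println("terminated afterwards: " + r[3]);
    }
}
```
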
interruptIdleWorkers(boolean onlyOne)

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // tryLock succeeds only if the worker is not running a task
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
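
Because a worker holds its own lock while it runs a task, shutdown() leaves busy workers alone, whereas shutdownNow() interrupts them regardless. The sketch below illustrates the difference with a task that blocks on a latch nobody releases; InterruptDemo is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class InterruptDemo {
    /** Returns {task interrupted after shutdown(), task interrupted after shutdownNow()}. */
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch never = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();
            try {
                never.await(); // blocks until the thread is interrupted
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        try {
            started.await();   // the worker now holds its lock and is inside task.run()
            pool.shutdown();   // interrupts idle workers only; tryLock fails for this one
            boolean afterShutdown = interrupted.get();
            pool.shutdownNow(); // interrupts every started worker
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] { afterShutdown, interrupted.get() };
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("interrupted by shutdown(): " + r[0]);
        System.out.println("interrupted by shutdownNow(): " + r[1]);
    }
}
```
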

reject(Runnable command): rejection policy

/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
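
Since reject simply delegates to the configured handler, the policy is pluggable. The sketch below installs a hypothetical counting handler (written as a lambda for the RejectedExecutionHandler interface) and submits tasks to a pool that has already been shut down, which is one of the two situations in which execute ends up calling reject.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RejectDemo {
    /** Returns how many tasks the custom handler saw after the pool was shut down. */
    static int rejectedAfterShutdown() {
        AtomicInteger rejected = new AtomicInteger();
        // Hypothetical policy: count the rejection instead of throwing.
        RejectedExecutionHandler counting = (task, executor) -> rejected.incrementAndGet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), counting);
        pool.shutdown();
        pool.execute(() -> { }); // pool is not running: addWorker fails, handler is invoked
        pool.execute(() -> { });
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println(rejectedAfterShutdown()); // 2
    }
}
```

With the default AbortPolicy the same calls would throw RejectedExecutionException instead.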