/** * The main pool control state, ctl, is an atomic integer packing * two conceptual fields * 主池控制状态ctl是一个原子整数,包含两个概念字段: * workerCount, indicating the effective number of threads * workerCount,指示有效线程数 * runState, indicating whether running, shutting down etc * runState,指示是否运行、关机等 * In order to pack them into one int, we limit workerCount to * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 * billion) otherwise representable. If this is ever an issue in * the future, the variable can be changed to be an AtomicLong, * and the shift/mask constants below adjusted. But until the need * arises, this code is a bit faster and simpler using an int. * 为了将它们打包成一个整型,我们将workerCount限制为(2^29)-1(大约5亿)个线程,而不是(2^31)-1(20亿)个线程。 * 如果将来出现这样的问题,可以将变量更改为AtomicLong,并调整下面的shift/mask常量。 * 但是在需要之前,使用int可以使这段代码更快更简单。 * <p> * The workerCount is the number of workers that have been * permitted to start and not permitted to stop. The value may be * transiently different from the actual number of live threads, * for example when a ThreadFactory fails to create a thread when * asked, and when exiting threads are still performing * bookkeeping before terminating. The user-visible pool size is * reported as the current size of the workers set. * <p> * workerCount是允许开始和不允许停止的工作线程数量。 * 该值可能与实际活动线程的数量存在暂时性差异, * 例如,当ThreadFactory在被请求时无法创建线程, * 以及退出的线程在终止前仍在执行bookkeeping时。 * 用户可见的池大小报告为工作线程集的当前大小。 * <p> * The runState provides the main lifecycle control, taking on values: * runState提供了主要的生命周期控制,具有以下值: * <p> * RUNNING: Accept new tasks and process queued tasks * 接受新任务并处理排队的任务 * SHUTDOWN: Don't accept new tasks, but process queued tasks * 不接受新任务,而是处理排队的任务 * STOP: Don't accept new tasks, don't process queued tasks, * and interrupt in-progress tasks * 不接受新任务,不处理排队的任务,中断正在进行的任务 * TIDYING: All tasks have terminated, workerCount is zero, * the thread transitioning to state TIDYING * will run the terminated() hook method * 所有任务都已终止,workerCount为零, * 过渡到状态TIDYING的线程将运行terminated()钩子方法 * TERMINATED: terminated() has completed * terminated()方法执行完毕 * <p> * The numerical order among these values matters, to allow * ordered comparisons. The runState monotonically increases over * time, but need not hit each state. The transitions are: * 为了允许有序比较,这些值之间的数值顺序很重要。 * 运行状态会随着时间单调地增加,但不需要达到每个状态。 * 转换: * <p> * RUNNING -> SHUTDOWN * On invocation of shutdown(), perhaps implicitly in finalize() * 在调用shutdown()时,可以隐式地在finalize()中调用 * (RUNNING or SHUTDOWN) -> STOP * On invocation of shutdownNow() * 调用shutdownNow() * SHUTDOWN -> TIDYING * When both queue and pool are empty * 当队列和池都为空时 * STOP -> TIDYING * When pool is empty * 当池是空的时候 * TIDYING -> TERMINATED * When the terminated() hook method has completed * 当terminated()钩子方法完成时 * <p> * Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. * 当状态达到TERMINATED时,在awaitTermination()中等待的线程将返回。 * <p> * Detecting the transition from SHUTDOWN to TIDYING is less * straightforward than you'd like because the queue may become * empty after non-empty and vice versa during SHUTDOWN state, but * we can only terminate if, after seeing that it is empty, we see * that workerCount is 0 (which sometimes entails a recheck -- see * below). * 检测状态由SHUTDOWN到TIDYING并不是你想的那么简单,因为队列在非空之后可能变为空,在SHUTDOWN状态下也可能变为空, * 但我们只能在看到它是空的和 workerCount is 0 情况下终止。有时需要根据上下文重新检查。 */ privatefinalAtomicIntegerctl=newAtomicInteger(ctlOf(RUNNING, 0)); /** * Integer.SIZE为32,COUNT_BITS为29 */ privatestaticfinalintCOUNT_BITS= Integer.SIZE - 3; /** * 29左移一位、减一、等于:2^29-1 最大线程数 */ privatestaticfinalintCAPACITY= (1 << COUNT_BITS) - 1;
/* * Bit field accessors that don't require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ privatestaticbooleanrunStateLessThan(int c, int s) { return c < s; }
privatestaticbooleanrunStateAtLeast(int c, int s) { return c >= s; }
/** * RUNNING状态为负数,肯定小于SHUTDOWN * @param c ctl * @return 线程池是否为运行状态 */ privatestaticbooleanisRunning(int c) { return c < SHUTDOWN; }
/** * Attempts to CAS-increment the workerCount field of ctl. * 试图增加ctl的workerCount字段。 */ privatebooleancompareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); }
/** * Attempts to CAS-decrement the workerCount field of ctl. * 尝试减少ctl的workerCount字段。 */ privatebooleancompareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); }
/** * Decrements the workerCount field of ctl. This is called only on * abrupt termination of a thread (see processWorkerExit). Other * decrements are performed within getTask. * 递减ctl的workerCount字段。这只在线程突然终止时调用(请参阅processWorkerExit)。 * 在getTask中执行其他递减。 */ privatevoiddecrementWorkerCount() { do { } while (!compareAndDecrementWorkerCount(ctl.get())); }
/** * Thread this worker is running in. Null if factory fails. * 线程这个工作程序正在运行。如果工厂失败,则为空。 */ final Thread thread; /** * Initial task to run. Possibly null. * 要运行的初始任务。可能是null。 */ Runnable firstTask; /** * Per-thread task counter * 线程任务计数器 */ volatilelong completedTasks;
构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13
/** * Creates with given first task and thread from ThreadFactory. * 使用ThreadFactory中给定的第一个任务和线程创建。 * * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { //禁止中断,直到运行工作程序 // inhibit interrupts until runWorker setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
既然实现了Runnable接口,必然实现run方法:
1 2 3 4 5 6 7
/** * Delegates main run loop to outer runWorker * 将主运行循环委托给外部运行工人 */ publicvoidrun() { runWorker(this); }
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * 主工作程序运行循环。重复地从队列获取任务并执行它们,同时处理一些问题 * <p> * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * 我们可能从一个初始任务开始,在这种情况下,我们不需要获得第一个任务。 * 否则,只要池在运行,我们就从getTask获得任务。 * 如果返回null,则工作人员将由于更改池状态或配置参数而退出。 * 其他出口由外部代码中的异常引发,在这种情况下completedAbruptly保持, * 这通常导致processWorkerExit替换这个线程。 * <p> * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * 在运行任何任务之前,都要获取锁,以防止在执行任务时发生其他池中断, * 然后确保除非池停止,否则这个线程没有中断集。 * <p> * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * 每个任务运行之前都有一个对beforeExecute的调用,这可能会引发异常, * 在这种情况下,我们在不处理任务的情况下导致线程死亡(终止循环completedAbruptly true)。 * <p> * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * 假设beforeExecute正常完成,我们运行任务,收集它抛出的任何异常发送给afterExecute。 * 我们分别处理RuntimeException、Error(规范保证我们捕获了这两个错误)和任意可抛出对象。 * 因为我们不能在Runnable.run中重新抛出抛出的对象,所以我们在抛出时将它们封装在错误中(到线程的UncaughtExceptionHandler)。 * 任何抛出的异常也会保守地导致线程死亡。 * <p> * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * task.run完成后,我们调用afterExecute,这也可能引发异常,这也会导致线程死亡。 * 根据JLS Sec 14.20,即使task.run抛出,这个异常也将生效。 * <p> * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * 异常机制的净效果是,afterExecute和线程的UncaughtExceptionHandler * 拥有关于用户代码遇到的任何问题的尽可能准确的信息。 * * @param w the worker */ finalvoidrunWorker(Worker w) { //获取当前线程 Threadwt= Thread.currentThread(); //获取第一个任务 Runnabletask= w.firstTask; //第一个任务位置置空 w.firstTask = null; //因为Worker实现了AQS,此处是释放锁 //new Worker()是state==-1,此处是调用Worker类的 release(1)方法,将state置为0。 // Worker中interruptIfStarted()中只有state>=0才允许调用中断 // allow interrupts w.unlock(); //是否突然完成,如果是由于异常导致的进入finally,那么completedAbruptly==true就是突然完成的 booleancompletedAbruptly=true; try { //先处理firstTask,之后依次处理其他任务 while (task != null || (task = getTask()) != null) { //获取锁 w.lock(); // If pool is stopping, ensure thread is interrupted; //如果池停止,确保线程被中断; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt //如果没有,请确保线程没有中断。 // 这需要在第二种情况下重新检查,以处理清除中断时的shutdownNow竞争 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //自定义实现 beforeExecute(wt, task); Throwablethrown=null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; thrownewError(x); } finally { //自定义实现 afterExecute(task, thrown); } } finally { task = null; //任务完成数+1 w.completedTasks++; //释放锁 w.unlock(); } } completedAbruptly = false; } finally { //Worker的结束后的处理工作 processWorkerExit(w, completedAbruptly); } }
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 根据当前配置设置执行阻塞或定时等待任务, * 或者如果该worker因为任何原因必须退出,则返回null * 1. There are more than maximumPoolSize workers (due to * a call to setMaximumPoolSize). * 大于 maximumPoolSize 个 workers(由于调用setMaximumPoolSize) * 2. The pool is stopped. * 线程池关闭 * 3. The pool is shutdown and the queue is empty. * 线程池关闭了并且队列为空 * 4. This worker timed out waiting for a task, and timed-out * workers are subject to termination (that is, * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) * both before and after the timed wait, and if the queue is * non-empty, this worker is not the last thread in the pool. * 这个worker超时等待任务,超时的worker在超时等待之前和之后都可能终止 * (即,{@code allowCoreThreadTimeOut || workerCount > corePoolSize}), * 如果队列不是空的,那么这个worker不是池中的最后一个线程。 * * @return task, or null if the worker must exit, in which case * workerCount is decremented 如果worker必须退出,则为空,在这种情况下workerCount将递减 */ private Runnable getTask() { // Did the last poll() time out? booleantimedOut=false;
for (; ; ) { //获取线程池状态 intc= ctl.get(); intrs= runStateOf(c);
// Check if queue empty only if necessary. //仅在必要时检查队列是否为空。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //递减ctl的workerCount字段 decrementWorkerCount(); returnnull; }
//获取workerCount数量 intwc= workerCountOf(c);
// Are workers subject to culling? booleantimed= allowCoreThreadTimeOut || wc > corePoolSize;
/** * Performs cleanup and bookkeeping for a dying worker. Called * only from worker threads. Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit. This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers. * 为垂死的worker进行清理和bookkeeping。仅从工作线程调用。 * 除非completedAbruptly被设置,否则假定workerCount已经被调整以考虑退出。 * 此方法从工作集中移除线程,如果线程池由于用户任务异常而退出, * 或者运行的工作池小于corePoolSize,或者队列非空但没有工作池, * 则可能终止线程池或替换工作池 * * @param w the worker * @param completedAbruptly if the worker died due to user exception * 是否突然完成,死于异常 */ privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly) { // If abrupt, then workerCount wasn't adjusted // true:用户线程运行异常,需要扣减 // false:getTask方法中扣减线程数量 if (completedAbruptly) //递减ctl的workerCount字段。 decrementWorkerCount();
/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * 在将来的某个时候执行给定的命令.该命令可以在新线程、池化线程或调用线程中执行,具体由Executor的实现者决定 * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */ voidexecute(Runnable command);
/** * Submits a value-returning task for execution and returns a * Future representing the pending results of the task. The * Future's {@code get} method will return the task's result upon * successful completion. * 提交一个value-returning任务以执行,并返回一个表示该任务未决结果的Future。 * Future的{@code get}方法将在成功完成任务后返回任务的结果。 * <p> * If you would like to immediately block waiting * for a task, you can use constructions of the form * {@code result = exec.submit(aCallable).get();} * 如果希望立即阻塞等待任务,可以使用如下的结构 * {@code result = exec.submit(aCallable).get();} * * <p>Note: The {@link Executors} class includes a set of methods * that can convert some other common closure-like objects, * for example, {@link java.security.PrivilegedAction} to * {@link Callable} form so they can be submitted. * * @param task the task to submit * @param <T> the type of the task's result * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ <T> Future<T> submit(Callable<T> task);
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * <p> * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ publicvoidexecute(Runnable command) { //任务为空,抛出异常 if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * 三步处理: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * 如果运行的线程小于corePoolSize,则尝试用给定的命令作为第一个任务启动一个新线程。 * 对addWorker的调用原子性地检查runState和workerCount,因此可以通过返回false来防止错误警报, * 因为错误警报会在不应该添加线程的时候添加线程。 * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * 如果一个任务可以成功排队,那么我们仍然需要再次检查是否应该添加一个线程 * (因为自上次检查以来已有的线程已经死亡),或者池在进入这个方法后关闭。 * 因此,我们重新检查状态,如果必要的话,如果停止,则回滚队列; * 如果没有,则启动一个新线程。 * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. * 如果无法对任务排队,则尝试添加新线程。 * 如果它失败了,我们知道pool被关闭或饱和,所以拒绝任务。 */ //获取线程控制字段的值 intc= ctl.get(); //如果当前工作线程数量少于corePoolSize(核心线程数) if (workerCountOf(c) < corePoolSize) { //创建新的线程并执行任务,如果成功就返回 if (addWorker(command, true)) return; //上一步失败,重新获取ctl c = ctl.get(); } //如果线城池正在运行,且入队成功 if (isRunning(c) && workQueue.offer(command)) { //重新获取ctl intrecheck= ctl.get(); //如果线程没有运行且删除任务成功 if (!isRunning(recheck) && remove(command)) //拒绝任务 reject(command); //如果当前的工作线程数量为0,只要还有活动的worker线程,就可以消费workerQueue中的任务 elseif (workerCountOf(recheck) == 0) //第一个参数为null,说明只为新建一个worker线程,没有指定firstTask addWorker(null, false);
/** * Checks if a new worker can be added with respect to current * pool state and the given bound (either core or maximum). If so, * the worker count is adjusted accordingly, and, if possible, a * new worker is created and started, running firstTask as its * first task. This method returns false if the pool is stopped or * eligible to shut down. It also returns false if the thread * factory fails to create a thread when asked. If the thread * creation fails, either due to the thread factory returning * null, or due to an exception (typically OutOfMemoryError in * Thread.start()), we roll back cleanly. * 检查是否可以根据当前池状态和给定的界限(核心或最大值)添加新worker, * 如果是这样,worker计数将相应地进行调整,如果可能,将创建并启动一个新worker, * 并将运行firstTask作为其第一个任务。 * 如果池已停止或有资格关闭,则此方法返回false。 * 如果线程工厂在被请求时没有创建线程,则返回false。 * 如果线程创建失败,要么是由于线程工厂返回null,要么是由于异常 * (通常是Thread.start()中的OutOfMemoryError)),我们将干净地回滚。 * * @param firstTask the task the new thread should run first (or * null if none). Workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corePoolSize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * Initially idle threads are usually created via * prestartCoreThread or to replace other dying workers. * 新线程应该首先运行的任务(如果没有,则为null) * 当线程数少于corePoolSize线程时(在方法execute()中), * 或者当队列已满时(在这种情况下,我们必须绕过队列), * 使用初始的第一个任务创建worker(在方法execute()中)来绕过队列。 * 最初,空闲线程通常是通过prestartCoreThread创建的,或者用来替换其他垂死的worker。 * @param core if true use corePoolSize as bound, else * maximumPoolSize. (A boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * 如果为true,则用corePoolSize绑定,如果是false,则用maximumPoolSize。 * (这里使用布尔指示符而不是值来确保在检查其他池状态后读取新值) * @return true if successful */ privatebooleanaddWorker(Runnable firstTask, boolean core) { //好久没见过这种写法了 retry: //线程池状态与工作线程数量处理 //worker数量+1 for (; ; ) { //获取当前线程池状态与线程数 intc= ctl.get(); //获取当前线程池状态 intrs= runStateOf(c);
// Check if queue empty only if necessary. 仅在必要时检查队列是否为空。 //如果池子处于SHUTDOWN,STOP,TIDYING,TERMINATED的时候 不处理提交的任务 //判断线程池是否可以添加worker线程 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) returnfalse; //线程池处于工作状态 for (; ; ) { //获取工作线程数量 intwc= workerCountOf(c);
//试图增加ctl的workerCount字段 if (compareAndIncrementWorkerCount(c)) //中断外层循环 break retry; // Re-read ctl c = ctl.get(); //如果当前线程池状态已经改变 if (runStateOf(c) != rs) //继续外层循环 continue retry; // else CAS failed due to workerCount change; retry inner loop //否则CAS因workerCount更改而失败;重试内循环 } }
//添加到worker线程集合,并启动线程 //工作线程状态 booleanworkerStarted=false; booleanworkerAdded=false; //继承AQS并实现了Runnable接口 Workerw=null; try { //将任务封装 w = newWorker(firstTask); //获取当前线程 finalThreadt= w.thread; if (t != null) { //获取全局锁 finalReentrantLockmainLock=this.mainLock; //全局锁定 mainLock.lock(); try { // Recheck while holding lock.持锁时重新检查。 // Back out on ThreadFactory failure or if // shut down before lock acquired.退出ThreadFactory故障,或者在获取锁之前关闭。 intrs= runStateOf(ctl.get());
//如果当前线程池关闭了 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // precheck that t is startable //测试该线程是否活动。如果线程已经启动并且还没有死,那么它就是活的。 if (t.isAlive()) thrownewIllegalThreadStateException(); //入工作线程池 workers.add(w); ints= workers.size(); //跟踪最大的池大小 if (s > largestPoolSize) largestPoolSize = s; //状态 workerAdded = true; } } finally { //释放锁 mainLock.unlock(); } //如果工作线程加入成功,开始线程的执行,并设置状态 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //判断工作线程是否启动成功 if (!workerStarted) //回滚工作线程创建 addWorkerFailed(w); } //返回工作线程状态 return workerStarted; }
/** * Rolls back the worker thread creation. * 回滚工作线程创建 * - removes worker from workers, if present * 如果存在,则从worker中移除worker * - decrements worker count * 递减ctl的workerCount字段。 * - rechecks for termination, in case the existence of this * worker was holding up termination * 重新检查终止,以防这个worker的存在导致终止 */ privatevoidaddWorkerFailed(Worker w) { //获取全局锁 finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { //如果存在,则从worker中移除worker if (w != null) workers.remove(w); //递减ctl的workerCount字段。 decrementWorkerCount(); //重新检查终止 tryTerminate(); } finally { mainLock.unlock(); } }
/** * Transitions to TERMINATED state if either (SHUTDOWN and pool * and queue empty) or (STOP and pool empty). If otherwise * eligible to terminate but workerCount is nonzero, interrupts an * idle worker to ensure that shutdown signals propagate. This * method must be called following any action that might make * termination possible -- reducing worker count or removing tasks * from the queue during shutdown. The method is non-private to * allow access from ScheduledThreadPoolExecutor. * 如果是SHUTDOWN或者STOP 且池子为空,转为TERMINATED状态。 * 如果有条件终止,但是workerCount不为零,则中断空闲worker,以确保关机信号传播。 * 必须在任何可能使终止成为可能的操作之后调用此方法--在关机期间减少worker数量或从队列中删除任务。 * 该方法是非私有的,允许从ScheduledThreadPoolExecutor访问。 */ finalvoidtryTerminate() { for (; ; ) { intc= ctl.get(); //如果线程池处于运行中,或者阻塞队列中仍有任务,返回 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) return;
// Eligible to terminate,还有工作线程 if (workerCountOf(c) != 0) { //中断空闲工作线程 interruptIdleWorkers(ONLY_ONE); return; }
//获取全局锁 finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { //设置ctl状态TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //方法在执行程序终止时调用,默认什么都不执行 terminated(); } finally { //完成terminated()方法,状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //唤醒所有等待条件的节点 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } /** * Method invoked when the Executor has terminated. Default * implementation does nothing. Note: To properly nest multiple * overridings, subclasses should generally invoke * {@code super.terminated} within this method. * 方法在执行程序终止时调用 */ protectedvoidterminated() { }
/** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * 中断可能正在等待任务的线程(没有被锁住)这样他们就可以检查终止或配置更改。 * 忽略securityexception(在这种情况下,一些线程可能保持不间断)。 * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers. In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. * If true, 最多打断一个worker。 * 只有在以其他方式启用终止时,才从tryTerminate调用这个函数,但是仍然有其他worker。 */ privatevoidinterruptIdleWorkers(boolean onlyOne) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Threadt= w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
reject(Runnable command)拒绝策略
1 2 3 4 5 6 7 8
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. * 为给定的命令调用被拒绝的执行处理程序 */ finalvoidreject(Runnable command) { handler.rejectedExecution(command, this); }