/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command 可执行的任务 * @throws RejectedExecutionException 任务可能被拒绝(当Executor处理不了的时候) * @throws NullPointerException if command is null */ voidexecute(Runnable command); }
// 提交任务 publicvoidexecute(Runnable command){ if (command == null) thrownew NullPointerException(); /* * 没翻,懒得翻 * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 当前状态值 int c = ctl.get(); // 当前线程数 = workerCountOf(c) 小于 核心线程数 的上限时 // 直接创建一个线程来执行任务 if (workerCountOf(c) < corePoolSize) { // 并发提交场景下可能会失败 if (addWorker(command, true)) return; // 新增成功就可以结束了 // 失败就更新下线程池状态 c = ctl.get(); } // 不能创建核心线程来执行,并不会直接创建非核心线程,而是把任务暂存到阻塞队列 // isRunning(c)判断线程池是否还在运行 // workQueue.offer(command)返回值表示是否成功提交到队列 if (isRunning(c) && workQueue.offer(command)) { // 成功放到队列里了,再检查一下线程池状态 int recheck = ctl.get(); // 如果线程池已经没有运行了,则尝试把新增的任务从队列移除 // remove(command)返回值表示是否移除成功 if (! isRunning(recheck) && remove(command)) reject(command); // 移除成功后,执行拒绝策略 // 检查下当前线程数是否为0,如果是的话新建一个线程 elseif (workerCountOf(recheck) == 0) addWorker(null, false); } // 线程池没有运行,或者放入队列失败(比如队列已满) // 则创建非核心线程去执行任务,这也失败就只能拒绝了 elseif (!addWorker(command, false)) reject(command); }
// 也是Runable的一种实现,所以能在线程池中被执行 publicvoidrun(){ // 有个表示状态的标识 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行用户的逻辑,获得返回值 // 这个步骤可能需要点时间 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
// 获取执行结果,阻塞直到状态改变 public V get()throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }