スレッドプールの作成については前の記事で分析しましたが、スレッドプールにはプリセットテンプレートと柔軟なカスタマイズをサポートするためのさまざまなパラメータの両方があることがわかりました。
この記事では、スレッド プールのライフ サイクルに焦点を当て、スレッド プール実行タスクのプロセスを分析します。
まず、スレッド プール コードで実行される 2 つのパラメーターを理解します。
runState: スレッド プールの実行ステータス
workerCount: ワーカー スレッドの数
スレッド プールは 32 ビット int を使用します。 runState と workerCount を同時に保存します。上位 3 ビットが runState で、残りの 29 ビットが workerCount です。 RunStateOf と workerCountOf は、runState と workerCount を取得するためにコード内で繰り返し使用されます。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池状态 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING: 新しいタスクを受信でき、待機キュー内のタスクは実行できます。
SHUTDOWN: 新しいタスクを受信できず、待機キュー内のタスクは実行できます。
STOP: 新しいタスクを受信できません。待機キュー内のタスクは実行できません。実行中のタスクをすべて終了してください
TIDYING: すべてのタスクが終了しました。terminated() を実行します
TERMINATED:terminated() の実行が完了しました
スレッドプールのステータスが開始されます。デフォルトでは RUNNING から TERMINATED ステータスで終了します。各状態を中間で通過する必要はありませんが、状態を戻すことはできません。状態変更の可能なパスと条件は次のとおりです。
図 1 スレッド プールの状態変更パス
スレッド プールは、Worker クラスによるタスクの実行を担当します。 Java の同時実行につながります。 フレームワークの核となるのは AQS です。
// ctl操作 private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
AQS についてはここでは説明しません。ワーカーがスレッドをラップし、タスクを実行することだけを知っておく必要があります。
execute を呼び出すと、スレッド プールの状況に基づいてワーカーが作成されます。次の 4 つの状況を要約できます:
図 2 スレッド プール内のワーカーの 4 つの可能性
AbstractQueuedSynchronizer,简称AQS,是Java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。
マーク 1 は最初の状況に対応します。この状況では、core に渡す addWorker に注意してください。core=true は corePoolSize、core=false は MaximumPoolSize です。追加するときは、workerCount が許容最大値を超えているかどうかを確認する必要があります。
マーク 2 は 2 番目の状況に対応し、スレッド プールが実行されているかどうかを確認し、タスクを待機キューに追加します。マーク 3 は、スレッド プールのステータスを再度チェックします。スレッド プールが突然非実行状態になった場合は、待機キューに追加されたばかりのタスクを削除し、処理のために RejectedExecutionHandler に渡します。マーク 4 はワーカーが存在しないことを検出したため、最初に空のタスクを持つワーカーを追加します。
マーク 5 は 3 番目の状況に対応しており、待機キューにこれ以上タスクを追加することはできません。
マーク 6 は、addWorker のコアが false で渡され、戻り呼び出しが失敗します。これは、workerCount が MaximumPoolSize を超えていることを意味するため、処理のために RejectedExecutionHandler に渡されます。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //2 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //3 reject(command); else if (workerCountOf(recheck) == 0) //4 addWorker(null, false); } //5 else if (!addWorker(command, false)) //6 reject(command); }
1 とマークされたコードの最初の部分には、workerCount に 1 を追加するという単純な目的があります。なぜコードの作成にこれほど時間がかかったのかというと、スレッド プールのステータスが常に変化しており、並行環境では変数の同期を確保する必要があるためです。外側のループはスレッド プールのステータスを決定し、タスクは空ではなく、キューも空ではありません。内側のループは CAS メカニズムを使用して、workerCount が正しくインクリメントされるようにします。 CAS を理解していない場合は、その後の workerCount の増減に CAS が使用されるノンブロッキング同期メカニズムについて学ぶことができます。
2 とマークされた 2 番目のコードは比較的単純です。新しい Worker オブジェクトを作成し、Worker をワーカーに追加します (コレクションを設定)。追加が成功したら、ワーカーでスレッドを開始します。最後に、スレッドの起動が成功したかどうかを判断し、失敗した場合は、addWorkerFailed を直接呼び出します。
private boolean addWorker(Runnable firstTask, boolean core) { //1 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //2 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
addWorkerFailed は、すでにインクリメントされている workerCount を減らし、tryTerminate を呼び出してスレッド プールを終了します。
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
Workerはコンストラクタ内でThreadFactoryを使ってThreadを作成し、runメソッド内でrunWorkerを呼び出し、実際にタスクが実行される場所と思われます。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }
マーク 1 はループに入り、null が返されるまで getTask から実行するタスクを取得します。ここではスレッドの再利用の効果が得られ、スレッドが複数のタスクを処理できるようになります。
マーク 2 はより複雑な判断で、STOP 状態ではスレッド プールが中断され、非 STOP 状態ではスレッドが中断されないことを保証します。 Java の割り込みメカニズムがわからない場合は、Java スレッドを正しく終了する方法についてのこの記事をお読みください。
Mark 3 は run メソッドを呼び出し、実際にタスクを実行します。 beforeExecute と afterExecute の 2 つのメソッドが実行前と実行後に提供され、サブクラスによって実装されます。
マーク 4 の completedTasks は、ワーカーが実行したタスクの数をカウントし、最終的に completedTaskCount 変数に蓄積されます。対応するメソッドを呼び出して統計情報を返すことができます。
変数 completedAbruptly は、ワーカーが異常終了したかどうかを示します。これは、後続のメソッドがこの変数を必要とすることを意味します。
Mark 6 は processWorkerExit を呼び出して終了します。これは後で分析されます。
次に、待機キューからタスクを取得するワーカーの getTask メソッドを見てみましょう:
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); //1 // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //2 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } //3 try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
标记1检查线程池的状态,这里就体现出SHUTDOWN和STOP的区别。如果线程池是SHUTDOWN状态,还会先处理完等待队列的任务;如果是STOP状态,就不再处理等待队列里的任务了。
标记2先看allowCoreThreadTimeOut这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepAliveTime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumPoolSize,出现这种可能是在运行中调用setMaximumPoolSize,还有wc>1,在等待队列非空时,至少保留一个worker。
标记3是从等待队列取任务的逻辑,根据timed分为等待keepAliveTime或者阻塞直到有任务。
最后来看结束worker需要执行的操作:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //1 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); //2 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //3 tryTerminate(); int c = ctl.get(); //4 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
正常情况下,在getTask里就会将workerCount减一。标记1处用变量completedAbruptly判断worker是否异常退出,如果是,需要补充对workerCount的减一。
标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。
标记3尝试终止线程池,后续会研究。
标记4处理线程池还是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker。如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
总结一下worker:线程池启动后,worker在池内创建,包装了提交的Runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。
线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。ThreadPoolExecutor提供两种方法关闭线程池:
shutdown:不能再提交任务,已经提交的任务可继续运行;
shutdownNow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //1 安全策略机制 advanceRunState(SHUTDOWN); //2 interruptIdleWorkers(); //3 onShutdown(); //4 空方法,子类实现 } finally { mainLock.unlock(); } tryTerminate(); //5 }
shutdown将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); //1 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow和shutdown类似,将线程池切换为STOP状态,中断目标是所有worker。drainQueue会将等待队列里未执行的任务返回。
interruptIdleWorkers和interruptWorkers实现原理都是遍历workers集合,中断条件符合的worker。
上面的代码多次出现调用tryTerminate,这是一个尝试将线程池切换到TERMINATED状态的方法。
final void tryTerminate() { for (;;) { int c = ctl.get(); //1 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //2 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } //3 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。
RUNNING(还在运行,不能停)
TIDYING或TERMINATED(已经没有在运行的worker)
SHUTDOWN并且等待队列非空(执行完才能停)
标记2在worker非空的情况下又调用了interruptIdleWorkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptIdleWorkers。每次只中断一个是因为processWorkerExit时,还会执行tryTerminate,自动中断下一个空闲的worker。
标记3是最终的状态切换。线程池会先进入TIDYING状态,再进入TERMINATED状态,中间提供了terminated这个空方法供子类实现。
调用关闭线程池方法后,需要等待线程池切换到TERMINATED状态。awaitTermination检查限定时间内线程池是否进入TERMINATED状态,代码如下:
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }
以上就是Java 线程池执行原理分析 的内容,更多相关内容请关注PHP中文网(m.sbmmt.com)!