首頁 > Java > java教程 > 主體

支援生產阻塞的Java執行緒池

高洛峰
發布: 2017-02-07 14:52:37
原創
1355 人瀏覽過

通常來說,生產任務的速度大於消費的速度。一個細節問題是,佇列長度,以及如何匹配生產和消費的速度。

一個典型的生產者-消費者模型如下:

支援生產阻塞的Java執行緒池

在並發環境下利用J.U.C提供的Queue實作可以很方便地確保生產和消費過程中的執行緒安全。這裡要注意的是,Queue必須設定初始容量,防止生產者生產過快導致佇列長度暴漲,最終觸發OutOfMemory。


對於一般的生產快於消費的情況。當隊列已滿時,我們並不希望有任何任務被忽略或無法執行,此時生產者可以等待片刻再提交任務,更好的做法是,把生產者阻塞在提交任務的方法上,待佇列未滿時繼續提交任務,這樣就沒有浪費的空轉時間了。阻塞這一點也很容易,BlockingQueue就是為此打造的,ArrayBlockingQueue和LinkedBlockingQueue在建構時都可以提供容量做限制,其中LinkedBlockingQueue是在實際操作佇列時在每次拿到鎖以後判斷容量。

更進一步,當隊列為空時,消費者拿不到任務,可以等一會兒再拿,更好的做法是,用BlockingQueue的take方法,阻塞等待,當有任務時便可以立即獲得執行,建議呼叫take的帶超時參數的重載方法,超時後執行緒退出。這樣當生產者事實上已經停止生產時,不至於讓消費者無限等待。

於是一個高效率的支援阻塞的生產消費模型就實現了。

等一下,既然J.U.C已經幫我們實作了執行緒池,為什麼還要採用這套東西?直接用ExecutorService不是比較方便?

我們來看一下ThreadPoolExecutor的基本結構:

支援生產阻塞的Java執行緒池

可以看到,在ThreadPoolExecutor中,BlockingQueue和Consumer部分已經幫我們實現好了,並且直接採用線程池的實現還有很多優勢,例如線程數的動態調整等。


但問題在於,即便你在構造ThreadPoolExecutor時手動指定了一個BlockingQueue作為隊列實現,事實上當隊列滿時,execute方法並不會阻塞,原因在於ThreadPoolExecutor調用的是BlockingQueue非阻塞的方法:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}
登入後複製

這時候就需要做一些事情來達成一個結果:當生產者提交任務,而隊列已滿時,能夠讓生產者阻塞住,等待任務被消費。

關鍵在於,在並發環境下,佇列滿不能由生產者去判斷,不能呼叫ThreadPoolExecutor.getQueue().size()來判斷佇列是否滿。

執行緒池的實作中,當佇列滿時會呼叫建構時傳入的RejectedExecutionHandler去拒絕任務的處理。預設的實作是AbortPolicy,直接拋出一個RejectedExecutionException。

幾種拒絕策略在這裡就不贅述了,這裡和我們的需求比較接近的是CallerRunsPolicy,這種策略會在隊列滿時,讓提交任務的線程去執行任務,相當於讓生產者臨時去乾了消費者幹的活兒,這樣生產者雖然沒有被阻塞,但提交任務也會被暫停。

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a <tt>CallerRunsPolicy</tt>.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller&#39;s thread, unless the executor
     * has been shut down, in which case the task is discarded.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
登入後複製

但這種策略也有隱患,當生產者較少時,生產者消費任務的時間裡,消費者可能已經把任務都消費完了,隊列處於空狀態,當生產者執行完任務後才能再繼續生產任務,這個過程中可能導致消費者執行緒的飢餓。

參考類似的思路,最簡單的做法,我們可以直接定義一個RejectedExecutionHandler,當隊列滿時改為調用BlockingQueue.put來實現生產者的阻塞:

new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (!executor.isShutdown()) {
                        try {
                                executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                                // should not be interrupted
                        }
                }
        }
};
登入後複製

這樣,我們就無需再關心Queue和Consumer的邏輯,只要把精力集中在生產者和消費者執行緒的實作邏輯上,只管往執行緒池提交任務就行了。

相比最初的設計,這種方式的程式碼量能減少不少,而且能避免並發環境的許多問題。當然,你也可以採取另外的手段,例如在提交時採用信號量做入口限制等,但是如果僅僅是要讓生產者阻塞,那就顯得複雜了。

更多支援生產阻塞的Java線程池相關文章請關注PHP中文網!

相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!