首頁 > Java > java教程 > Java 執行緒池框架核心程式碼分析

Java 執行緒池框架核心程式碼分析

伊谢尔伦
發布: 2016-12-05 11:44:59
原創
1314 人瀏覽過

多執行緒程式設計中,為每個任務分配一個執行緒是不切實際的,執行緒所建立的開銷和資源消耗都是很高的。線程池應運而生,成為我們管理線程的利器。 Java 透過Executor接口,提供了一種標準的方法將任務的提交過程和執行過程解耦開來,並用Runnable表示任務。

下面,我們來分析一下 Java 執行緒池框架的實作ThreadPoolExecutor。

下面的分析是基於JDK1.7

生命週期

ThreadPoolExecutor中,使用CAPACITY的高3位來表示運行狀態,分別是:

RUNNING:接收新任務,並且處理任務隊列中的任務隊列中的任務不接收新任務,但處理任務佇列的任務 
STOP:不接收新任務,不出來任務佇列,同時中斷所有進行中的任務 
TIDYING:所有任務已經被終止,工作執行緒數量為0,到達該狀態會執行terminated() 
TERMINATED:terminated()執行完畢 

Java 執行緒池框架核心程式碼分析

ThreadPoolExecutor中用原子類別來表示狀態位元

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
登入後複製
,則值為0) 

maximumPoolSize:最大的執行緒數量,受限於CAPACITY 

keepAliveTime:對應執行緒的存活時間,時間單位由TimeUnit指定 

workQueue:工作佇列,儲存待執行的任務線程池滿後會觸發 

線程池的最大容量:CAPACITY中的前三位用作標誌位,也就是說工作線程的最大容量為(2^29)-1

四種型號

CachedThreadPool:一個可快取的線程池,如果線程池的當前規模超過了處理需求時,那麼將回收空閒的線程,當需求增加時,則可以添加新的線程,線程池的規模不存在任何的限制。 
FixedThreadPool:一個固定大小的執行緒池,提交一個任務時就會建立一個線程,直到達到執行緒池的最大數量,此時執行緒池的大小將不再改變。 
SingleThreadPool:一個單一執行緒的執行緒池,它只有一個工作執行緒來執行任務,可以確保按照任務在佇列中的順序來串列執行,如果這個執行緒異常結束會建立一個新的執行緒來執行任務。 
ScheduledThreadPool:固定大小的執行緒池,並且以延遲或定時的方式來執行任務,類似於Timer。當狀態是否處於RUNNING 

如果否,則拒絕該任務 

如果是,判斷目前執行緒數量是否為0,如果為0,就增加一個工作執行緒。

開啟普通執行緒執行任務addWorker(command, false),開啟失敗就拒絕該任務 

從上面的分析可以總結出執行緒池運作的四個階段:

poolSize < corePoolSize 且佇列為空,此時會新建執行緒來處理提交的任務 
poolSize == corePoolSize,此時提交的任務進入工作隊列,工作執行緒從佇列中取得任務執行,此時佇列不為空且未滿。
poolSize == corePoolSize,且佇列已滿,此時也會新建執行緒來處理提交的任務,但是poolSize < maxPoolSize 

poolSize == maxPoolSize,並且佇列已滿,此時會觸發拒絕策略 

拒絕策略

我們提到任務無法執行會被拒絕,RejectedExecutionHandler是處理被拒絕任務的介面。以下是四種拒絕策略。

AbortPolicy:預設策略,終止任務,拋出RejectedException 
CallerRunsPolicy:在呼叫者執行目前任務,不拋棄異常 
DiscardPolicy:丟棄策略,直接丟棄任務,不丟棄目前任務,不拋異常

執行緒池中的Worker

Worker繼承了AbstractQueuedSynchronizer和Runnable,前者給Worker提供鎖的功能,後者執行工作執行緒的主要方法runWorker(Workerker w)(從任務佇列任務執行)。 Worker 引用存在workers集合裡面,用mainLock守護。

private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
登入後複製

核心函數 runWorker

下面是簡化的邏輯,注意:每個工作執行緒的run都執行下面的函數

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;    while (task != null || (task = getTask()) != null) {
        w.lock();
        beforeExecute(wt, task);        
        task.run();
        afterExecute(task, thrown);
        w.unlock();
    }
    processWorkerExit(w, completedAbruptly);
}
登入後複製

从getTask()中获取任务
锁住 worker
执行beforeExecute(wt, task),这是ThreadPoolExecutor提供给子类的扩展方法
运行任务,如果该worker有配置了首次任务,则先执行首次任务且只执行一次。
执行afterExecute(task, thrown);
解锁 worker
如果获取到的任务为 null,关闭 worker
获取任务 getTask

线程池内部的任务队列是一个阻塞队列,具体实现在构造时传入。

private final BlockingQueue<Runnable> workQueue;
登入後複製

getTask()从任务队列中获取任务,支持阻塞和超时等待任务,四种情况会导致返回null,让worker关闭。

现有的线程数量超过最大线程数量
线程池处于STOP状态
线程池处于SHUTDOWN状态且工作队列为空
线程等待任务超时,且线程数量超过保留线程数量
核心逻辑:根据timed在阻塞队列上超时等待或者阻塞等待任务,等待任务超时会导致工作线程被关闭。

timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();
登入後複製

在以下两种情况下等待任务会超时:

允许核心线程等待超时,即allowCoreThreadTimeOut(true) 
当前线程是普通线程,此时wc > corePoolSize 
工作队列使用的是BlockingQueue,这里就不展开了,后面再写一篇详细的分析。

总结

ThreadPoolExecutor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。 
Executors提供了四种基于ThreadPoolExecutor构造线程池模型的方法,除此之外,我们还可以直接继承ThreadPoolExecutor,重写beforeExecute和afterExecute方法来定制线程池任务执行过程。 
使用有界队列还是无界队列需要根据具体情况考虑,工作队列的大小和线程的数量也是需要好好考虑的。 
拒绝策略推荐使用CallerRunsPolicy,该策略不会抛弃任务,也不会抛出异常,而是将任务回退到调用者线程中执行。


相關標籤:
來源:php.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板