LinkedBlockingQueue也是使用單向鍊錶實作的,其也有兩個Node,分別用來存放首、尾節點,並且還有一個初始值為0的原子變數count,用來記錄佇列元素個數。另外還有兩個ReentrantLock的實例,分別用來控制元素入隊和出隊的原子性,其中takeLock用來控制同時只有一個線程可以從隊列頭獲取元素,其他線程必須等待,putLock控制同時只能有一個執行緒可以取得鎖,在佇列尾部加入元素,其他執行緒必須等待。另外,notEmpty 和 notFull 是條件變量,它們內部都有一個條件隊列用來存放進隊和出隊時被阻塞的線程,其實這是生產者-消費者模型。如下是獨佔鎖的建立程式碼。
private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
當呼叫執行緒在LinkedBlockingQueue 實例上執行take、poll 等操作時需要取得到 takeLock 鎖,從而保證同時只有一個執行緒可以操作鍊錶頭節點。另外由於條件變數 notEmpty 內部的條件佇列的維護使用的是takeLock的鎖定狀態管理機制,所以在呼叫notEmpty的await 和signal方法前呼叫執行緒必須先取得到 takeLock鎖定,否則會拋出IllegalMonitorStateException 例外。 notEmpty內部則維護一個條件佇列,當執行緒取得到takeLock 鎖定後呼叫notEmpty的await 方法時,呼叫執行緒會被阻塞,然後該執行緒會被放到notEmpty內部的條件佇列進行等待,直到有執行緒呼叫了notEmpty的signal 方法。
在LinkedBlockingQueue實例上執行put、offer等作業時需要取得到putLock鎖定,從而保證同時只有一個執行緒可以操作鍊錶尾節點。同樣由於條件變數 notFull 內部的條件佇列的維護使用的是putLock的鎖定狀態管理機制,所以在呼叫 notFull 的 await 和 signal 方法前呼叫執行緒必須先取得到putLock鎖定,否則會拋出 IllegalMonitorStateException 例外。 notFull 內部則維護一個條件佇列,當執行緒取得到putLock 鎖定後呼叫notFull的await 方法時,呼叫執行緒會被阻塞,然後該執行緒會被放到notFull 內部的條件佇列進行等待,直到有執行緒呼叫了notFull的signal 方法。如下是LinkedBlockingQueue 的無參建構子的程式碼。
如下是LinkedBlockingQueue的無參構造程式碼
public static final int MAX_VALUE = 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalAgrumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
由這個程式碼可知,預設佇列容量為0x7fffffff,使用者也可以自己指定容量,所以從一定程度上可以說LinkedBlockingQueue是有界阻塞佇列。
offer運算
public boolean offer(E e) { //(1) if (e == null) throw new NullPointerException(); //(2) final AtomicInteger count = this.count; if (count.get() == capacity) return false; //(3) int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //(4) if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } } finally { //(6) putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); //(8) return c >= 0; }
程式碼(2)判斷如果目前佇列已滿則丟棄目前元素並傳回false
程式碼(3)取得到putLock 鎖,目前執行緒取得到該鎖後,則其他呼叫put和offer操的執行緒將會被阻塞(阻塞的執行緒被放到putLock鎖的AQS阻塞佇列)。
程式碼(4)這裡重新判斷目前佇列是否滿,這是因為在執行程式碼(2)和取得到 putLock 鎖定期間可能其他執行緒透過 put 或offer 操作向佇列裡面新增了新元素。重新判斯隊列確實不滿則新元素入隊,並遞增計數器。
程式碼(5)判斷如果新元素入隊後佇列還有空閒空間,則喚醒notFull的條件佇列裡面因為呼叫了notFull的await操作(例如執行put方法而佇列滿了的時候)而被阻塞的一個線程,因為隊列現在有空閒所以這裡可以提前喚醒一個入隊線程。
程式碼(6)則釋放所取得的putLock 鎖,這裡要注意,鎖的釋放一定要在finally裡面做因為即使try區塊拋出異常了,finally也是會被執行到。另外釋放鎖定後其他因為呼叫put 操作而被阻塞的執行緒將會有一個取得到該鎖定。
程式碼(7)中的c0說明在執行程式碼(6)釋放鎖定時佇列裡面至少有一個元素,佇列裡面有元素則執行signalNotEmpty操作.
以上是如何使用Java並發程式設計中的LinkedBlockingQueue佇列?的詳細內容。更多資訊請關注PHP中文網其他相關文章!