隊列在生活中隨處可見,醫院繳費需要排隊、做核酸需要排隊、汽車等紅綠燈需要排隊等等。
佇列是一個按照先來到就排在前面,後來到排在後面的資料結構,並且出隊的時候也是按照先來到先出隊。使用數組和鍊錶進行實作。通常用於協調任務的執行和資料的交換。
LinkedBlockingQueue 是一個可選有界阻塞佇列,有界指的是佇列存在一個最大容量;阻塞指的是如果佇列已經滿了,想要繼續往佇列新增元素的話,那麼這個操作將會被暫停,直到佇列中有空位才會繼續完成新增操作。如果佇列已經為空,想要從佇列中取得元素,那麼這個操作將會被暫停,直接佇列中存在元素才會繼續完成取得操作。
LinkedBlockingQueue 內部使用鍊錶作為元素的儲存結構。內部使用了兩個鎖,分別使用於儲存操作和取操作。
執行存取操作時,都必須先取得鎖,才可以執行存取操作,並保證 LinkedBlockingQueue 是執行緒安全性。
LinkedBlockingQueue 透過兩個 Condition 條件佇列,一個 notFull 條件,一個 notEmpty 條件。在對佇列進行插入元素操作時,判斷目前佇列已經滿,則透過 notFull 條件將執行緒阻塞,直到其他執行緒通知該執行緒佇列可以繼續插入元素。在對佇列進行移除元素操作時,判斷目前佇列已經空,則透過 notEmpty 條件阻塞線程,直到其他執行緒通過該執行緒可以繼續取得元素。
這樣保證執行緒的存取操作不會出現錯誤。避免佇列在滿時,丟棄插入的元素;也避免在佇列空時取到一個 null 值。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
無參考建構子中,預設使用 Integer.MAX_VALUE
作為佇列的最大容量。
有參構造函數中,可以自行指定佇列的最大容量,並且建立了頭節點和尾節點。那麼 LinkedBlockingQueue 使用的是有頭單向鍊錶。
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
並且在物件初始化時,創建了兩個鎖,分別使用於儲存操作和取操作。創建了兩個條件隊列,分別用於隊列空和滿的情況。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1.取得鎖定
2.判斷目前佇列是否已經滿了
如果佇列已經滿了,呼叫notFull 條件佇列的await() 方法,將該執行緒阻塞,暫停該執行緒的插入操作。避免內部溢出的問題。
如果沒有滿,則直接呼叫入隊函數 enqueue 插入到佇列結尾。
3.檢查此時佇列是否已滿
如果未滿,則呼叫notFull 條件佇列的signal() 方法,喚醒被阻塞在notFull 條件佇列的線程。
4.解鎖
檢查插入元素前的佇列元素數量是否等於0
等於0,則呼叫notEmpty條件佇列的signal() 方法,通知其佇列現在不為空,可以喚醒阻塞執行緒取得元素。
為什麼需要呼叫 notFull 條件佇列的 signal() 方法? 因為佇列取操作和儲存操作所使用的鎖是不一樣的,那麼就說明,當一個執行緒執行存入操作時,其他執行緒是可以執行取出操作的。我們來看下面這個範例:
佇列總容量為 5,目前元素數量為5。線程 A 獲取了存鎖,想要插入了元素。但是因為佇列容量已滿,釋放鎖定,並且加入到條件佇列中,等待被喚醒。
線程 B 取得了存鎖,想要插入了元素。但是因為佇列容量已滿,釋放鎖定,並且加入到條件佇列中,等待被喚醒。
線程 C 取得了取鎖,取出了元素 1。並且透過 notFull 的 signal 方法喚醒條件佇列中被阻塞的執行緒 A。執行緒 A 被喚醒後加入到同步佇列中,但此時還沒有競爭到鎖定。
線程 D 取得了取鎖,取出了元素 2。但是還沒有執行到喚醒阻塞執行緒的程式碼。
執行緒 A 競爭到鎖定,開始執行插入元素操作。將元素插入之後,檢查到佇列元素數量為 4,小於佇列的總容量,因此執行 notFull 的 signal 方法喚醒條件佇列中被阻塞的執行緒 B。執行緒 B 被喚醒後加入到同步佇列中,開始競爭鎖定。
執行緒 B 競爭到鎖定,開始執行插入元素操作。將元素插入之後,檢查到佇列元素數量等於 5,不進行喚醒操作。
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
以上是怎麼掌握Java LinkedBlockingQueue的詳細內容。更多資訊請關注PHP中文網其他相關文章!