病院代を支払うために並ぶ、核酸検査をするために並ぶ、車の信号待ちに並ぶなど、行列は生活のどこにでも見られます。 、など。
キューとは、先着順が前、後順となるデータ構造です。キューから取り出すときも、キューは先着順にランク付けされます。配列とリンク リストを使用して実装されます。通常、タスクの実行とデータ交換を調整するために使用されます。
LinkedBlockingQueue は、オプションの境界付きブロッキング キューです。境界付きとは、キューの容量が最大であることを意味します。ブロックとは、キューがいっぱいの場合にキューへの追加を続行することを意味します。の場合、この操作は一時停止され、キューにスペースができるまで追加操作は続行されません。キューがすでに空で、キューから要素を取得したい場合、キューに要素ができるまで操作は一時停止され、取得操作が続行されます。
LinkedBlockingQueueは内部的に要素の記憶構造としてリンクリストを使用します。 2 つのロックが内部で、それぞれ保管操作と取得操作に使用されます。
アクセス操作を実行する場合、LinkedBlockingQueue がスレッドセーフであることを保証するために、アクセス操作を実行する前に、まずロックを取得する必要があります。
LinkedBlockingQueue は、notFull 条件と notEmpty 条件の 2 つの条件条件キューを渡します。キューに要素を挿入するときに、現在のキューがいっぱいであると判断された場合、他のスレッドがキューが要素の挿入を続行できることをスレッドに通知するまで、スレッドは notFull 条件によってブロックされます。キューから要素を削除するときに、現在のキューが空であると判断された場合、他のスレッドがこのスレッドを通じて要素を取得し続けることができるまで、スレッドは notEmpty 条件によってブロックされます。
これにより、スレッドのアクセス操作でエラーが発生しないことが保証されます。キューがいっぱいのときに挿入された要素を破棄しないでください。また、キューが空のときに null 値を取得することも避けてください。
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
パラメータなしのコンストラクターでは、Integer.MAX_VALUE
がキューの最大容量としてデフォルトで使用されます。
パラメータ化されたコンストラクターでは、キューの最大容量を自分で指定し、ヘッド ノードとテール ノードを作成できます。次に、LinkedBlockingQueue は 見出し付き一方向リンク リスト を使用します。
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
オブジェクトが初期化されると、2 つのロックが作成され、それぞれストレージ操作とフェッチ操作に使用されます。空のキュー条件と満杯のキュー条件にそれぞれ対応する 2 つの条件付きキューが作成されます。
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1. ロックを取得します
2. 現在のキューがいっぱいかどうかを確認します
キューが満杯かどうかを確認しますがいっぱいの場合は、notFull 条件キューの await() メソッドを呼び出してスレッドをブロックし、スレッドの挿入操作を一時停止します。内部オーバーフローの問題を回避します。
満杯でない場合は、enqueue 関数 enqueue を直接呼び出してキューの最後に挿入します。
3. 現時点でキューが満杯かどうかを確認します
満杯でない場合は、notFull 条件キューの signal() メソッドを呼び出して、キューをウェイクアップします。キューが notFull 条件キュー スレッドでブロックされました。
4.Unlock
要素を挿入する前にキュー要素の数が 0
に等しいかどうかを確認します。条件付きキューの signal() メソッドは、そのキューが現在空ではないことを通知し、ブロックされたスレッドを起動して要素を取得できます。
notFull 条件キューの signal() メソッドを呼び出す必要があるのはなぜですか? キューのフェッチ操作とストレージ操作に使用されるロックは異なるため、1 つのスレッドがデポジット操作を実行すると、他のスレッドがフェッチ操作を実行できることを意味します。次の例を見てみましょう:
キューの合計容量は 5 で、現在の要素数は 5 です。スレッド A はロックを取得し、要素を挿入しようとしています。しかし、キューの容量がいっぱいであるため、ロックは解放されて条件キューに追加され、起動されるのを待ちます。
スレッド B はロックを取得し、要素を挿入しようとしています。しかし、キューの容量がいっぱいであるため、ロックは解放されて条件キューに追加され、起動されるのを待ちます。
スレッド C がロックを取得し、要素 1 を取り出しました。そして、notFull のシグナルメソッドを通じて、条件キュー内のブロックされたスレッド A をウェイクアップします。スレッド A は目覚めた後に同期キューに参加しますが、この時点ではまだロックを競合していません。
スレッド D がロックを取得し、要素 2 を取り出しました。ただし、ブロックされたスレッドを起動するコードはまだ実行されていません。
スレッド A はロックを競合し、要素の挿入を開始します。要素の挿入後、キュー要素の数がキューの総容量より少ない 4 つであることが確認されるため、notFull のシグナル メソッドが実行され、条件付きキューでブロックされているスレッド B がウェイクアップされます。スレッド B が起動されると、同期キューに参加し、ロックの競合を開始します。
スレッド B はロックを競合し、要素の挿入を開始します。要素の挿入後、キュー要素の数が 5 であることが確認され、ウェイクアップ操作は実行されません。
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
以上がJava LinkedBlockingQueue をマスターする方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。