Warteschlangen gibt es überall im Leben. Man muss sich anstellen, um im Krankenhaus zu bezahlen, man muss anstehen, um Nukleinsäuretests durchzuführen, man muss an der Ampel anstehen, um Autos zu bekommen usw.
Die Warteschlange ist eine Datenstruktur, in der die Warteschlange „Wer zuerst kommt, mahlt zuerst“ vorne und die Warteschlange „Wer zuerst kommt, mahlt zuerst“ hinten platziert ist. Implementiert mithilfe von Arrays und verknüpften Listen. Wird normalerweise zur Koordinierung der Aufgabenausführung und des Datenaustauschs verwendet.
LinkedBlockingQueue ist eine optionale begrenzte Blockierungswarteschlange. Blockieren bedeutet, dass dieser Vorgang angehalten wird, wenn die Warteschlange voll ist und Sie weiterhin Elemente zur Warteschlange hinzufügen möchten Der Additionsvorgang wird erst fortgesetzt, wenn Platz in der Warteschlange vorhanden ist. Wenn die Warteschlange bereits leer ist und Sie ein Element aus der Warteschlange abrufen möchten, wird der Vorgang angehalten, bis Elemente in der Warteschlange vorhanden sind, um den Erfassungsvorgang fortzusetzen.
LinkedBlockingQueue verwendet intern eine verknüpfte Liste als Speicherstruktur von Elementen. Intern werden zwei Schlösser für Einlagerungs- und Auslagerungsvorgänge verwendet.
Beim Durchführen von Zugriffsvorgängen muss zuerst die Sperre erworben werden, bevor der Zugriffsvorgang ausgeführt werden kann, um sicherzustellen, dass LinkedBlockingQueue threadsicher ist.
LinkedBlockingQueue übergibt zwei Condition-Bedingungswarteschlangen, eine notFull-Bedingung und eine notEmpty-Bedingung. Wenn beim Einfügen von Elementen in die Warteschlange festgestellt wird, dass die aktuelle Warteschlange voll ist, wird der Thread durch die NotFull-Bedingung blockiert, bis andere Threads den Thread darüber informieren, dass die Warteschlange weiterhin Elemente einfügen kann. Wenn beim Entfernen von Elementen aus der Warteschlange festgestellt wird, dass die aktuelle Warteschlange leer ist, wird der Thread durch die notEmpty-Bedingung blockiert, bis andere Threads weiterhin Elemente über diesen Thread abrufen können.
Dadurch wird sichergestellt, dass es bei den Zugriffsvorgängen des Threads zu keinen Fehlern kommt. Vermeiden Sie das Verwerfen eingefügter Elemente, wenn die Warteschlange voll ist. Vermeiden Sie auch das Erhalten eines Nullwerts, wenn die Warteschlange leer ist.
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
Im Parameterlosen Konstruktor wird Integer.MAX_VALUE
standardmäßig als maximale Kapazität der Warteschlange verwendet.
Im parametrisierten Konstruktor können Sie die maximale Kapazität der Warteschlange selbst angeben und den Kopfknoten und den Endknoten erstellen. Dann verwendet LinkedBlockingQueue eine einseitig verknüpfte Liste mit Kopf.
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
Und wenn das Objekt initialisiert wird, werden zwei Sperren erstellt, die für Speichervorgänge bzw. Abrufvorgänge verwendet werden. Es werden zwei bedingte Warteschlangen erstellt, jeweils für die Bedingungen einer leeren und vollen Warteschlange.
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1. Ermitteln Sie die Sperre
2. Bestimmen Sie, ob die aktuelle Warteschlange voll ist. Rufen Sie die Methode „await()“ der bedingten Warteschlange „notFull“ auf, um den Thread zu blockieren und anzuhalten der Fadeneinführvorgang. Vermeiden Sie interne Überlaufprobleme.
Wenn es nicht voll ist, rufen Sie direkt die
enqueue-Funktion enqueue3. Überprüfen Sie, ob die Warteschlange zu diesem Zeitpunkt voll ist.
4. Entsperren
Überprüfen Sie, ob die Anzahl der Warteschlangenelemente vor dem Einfügen des Elements gleich 0 ist Die Warteschlange ist jetzt nicht leer und Sie können den blockierenden Thread aktivieren, um das Element abzurufen.
Warum müssen wir die signal()-Methode der notFull-Bedingungswarteschlange aufrufen?
Da die für Warteschlangenabrufvorgänge und Speichervorgänge verwendeten Sperren unterschiedlich sind, bedeutet dies, dass andere Threads Abrufvorgänge ausführen können, wenn ein Thread einen Einzahlungsvorgang ausführt. Schauen wir uns das folgende Beispiel an:Die Gesamtkapazität der Warteschlange beträgt 5 und die aktuelle Anzahl der Elemente beträgt 5. Thread A erhält die Sperre und möchte das Element einfügen. Da die Warteschlangenkapazität jedoch voll ist, wird die Sperre aufgehoben und zur Bedingungswarteschlange hinzugefügt, die darauf wartet, geweckt zu werden.
Thread B erhält die Sperre und möchte das Element einfügen. Da die Warteschlangenkapazität jedoch voll ist, wird die Sperre aufgehoben und zur Bedingungswarteschlange hinzugefügt, die darauf wartet, geweckt zu werden.
Thread C hat die Sperre erhalten und Element 1 entfernt. Und wecken Sie den blockierten Thread A in der Bedingungswarteschlange über die Signalmethode von notFull auf. Thread A tritt der Synchronisationswarteschlange bei, nachdem er aufgeweckt wurde, hat aber zu diesem Zeitpunkt noch nicht um die Sperre konkurriert.
Thread D hat die Sperre erhalten und Element 2 herausgenommen. Der Code, der den blockierten Thread aufweckt, wurde jedoch noch nicht ausgeführt.
Thread A konkurriert um die Sperre und beginnt mit dem Einfügen von Elementen. Nach dem Einfügen des Elements wird überprüft, ob die Anzahl der Warteschlangenelemente 4 beträgt, was weniger als die Gesamtkapazität der Warteschlange ist. Daher wird die Signalmethode von notFull ausgeführt, um Thread B aufzuwecken, der in der bedingten Warteschlange blockiert ist. Nachdem Thread B aktiviert wurde, tritt er der Synchronisationswarteschlange bei und beginnt, um die Sperre zu konkurrieren.
Thread B konkurriert um die Sperre und beginnt mit dem Einfügen von Elementen. Nach dem Einfügen des Elements wird überprüft, ob die Anzahl der Warteschlangenelemente gleich 5 ist, und es wird kein Weckvorgang durchgeführt.
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
Das obige ist der detaillierte Inhalt vonSo beherrschen Sie Java LinkedBlockingQueue. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!