Queues can be seen everywhere in life. You need to queue up to pay at the hospital, you need to queue up to do nucleic acid tests, you need to queue up at traffic lights for cars, etc.
The queue is a data structure in which the first-come-first-come-first-served order is placed in the front, and the second-come-first-served order is placed in the back. When dequeuing, the queue is also ranked first come first served. Implemented using arrays and linked lists. Usually used to coordinate task execution and data exchange.
LinkedBlockingQueue is an optional bounded blocking queue. Bounded means that the queue has a maximum capacity; blocking means that if the queue is full, you want to continue adding to the queue. element, then this operation will be suspended, and the addition operation will not continue until there is space in the queue. If the queue is already empty and you want to get elements from the queue, the operation will be suspended until there are elements in the queue to continue the acquisition operation.
LinkedBlockingQueue internally uses a linked list as the storage structure of elements. Two locks are used internally, respectively for storage operations and retrieval operations.
When performing access operations, the lock must be acquired first before the access operation can be performed to ensure that LinkedBlockingQueue is thread-safe.
LinkedBlockingQueue passes two Condition condition queues, one notFull condition and one notEmpty condition. When inserting elements into the queue, if it is judged that the current queue is full, the thread will be blocked through the notFull condition until other threads notify the thread that the queue can continue to insert elements. When removing elements from the queue, if it is determined that the current queue is empty, the thread will be blocked through the notEmpty condition until other threads can continue to obtain elements through this thread.
This ensures that there will be no errors in the thread's access operations. Avoid discarding inserted elements when the queue is full; also avoid getting a null value when the queue is empty.
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
In the parameterless constructor, Integer.MAX_VALUE
is used by default as the maximum capacity of the queue.
In the parameterized constructor, you can specify the maximum capacity of the queue yourself, and create the head node and tail node. Then LinkedBlockingQueue uses headed one-way linked list.
private final int capacity; /** Current number of elements */ private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; // 取锁 private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); // 存锁 private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition();
And when the object is initialized, two locks are created, which are used for storage operations and fetch operations respectively. Two conditional queues are created, respectively for empty and full queue conditions.
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); final int c; final Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
1. Get the lock
2. Determine whether the current queue is full
If the queue is full , call the await() method of the notFull condition queue to block the thread and suspend the insertion operation of the thread. Avoid internal overflow problems.
If it is not full, call the enqueue function enqueue directly to insert it into the end of the queue.
3. Check whether the queue is full at this time
If it is not full, call the signal() method of the notFull condition queue to wake up the queue blocked in the notFull condition queue thread.
4.Unlock
Check whether the number of queue elements before inserting the element is equal to 0
If it is equal to 0, call notEmpty The signal() method of the conditional queue notifies its queue that it is not empty now, and can wake up the blocked thread to obtain elements.
Why do we need to call the signal() method of the notFull condition queue? Because the locks used for queue fetching operations and storage operations are different, it means that when one thread performs a deposit operation, other threads can perform fetch operations. Let's look at the following example:
The total capacity of the queue is 5, and the current number of elements is 5. Thread A acquires the lock and wants to insert the element. But because the queue capacity is full, the lock is released and added to the condition queue, waiting to be awakened.
Thread B acquires the lock and wants to insert an element. But because the queue capacity is full, the lock is released and added to the condition queue, waiting to be awakened.
Thread C acquired the lock and took out element 1. And wake up the blocked thread A in the condition queue through the signal method of notFull. Thread A joins the synchronization queue after being awakened, but has not yet competed for the lock at this time.
Thread D acquired the lock and took out element 2. But the code that wakes up the blocked thread has not yet been executed.
Thread A competes for the lock and starts inserting elements. After inserting the element, it is checked that the number of queue elements is 4, which is less than the total capacity of the queue, so the signal method of notFull is executed to wake up thread B that is blocked in the conditional queue. After thread B is awakened, it joins the synchronization queue and starts competing for the lock.
Thread B competes for the lock and starts inserting elements. After inserting the element, it is checked that the number of queue elements is equal to 5, and no wake-up operation is performed.
这样做的目的是尽快的唤醒阻塞线程,可以更快的完成插入元素操作。因为线程存和取的操作相互之间并不是互斥的,而是独立运行的,提高吞吐量。
public E take() throws InterruptedException { final E x; final int c; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
1.获得取锁
2.判断当前队列是否为空
如果队列没有元素,调用 notEmpty 条件队列的 await() 方法,将该线程阻塞,暂停该线程的获取操作。避免获取元素出错。
如果不为空,则直接调用出队函数 dequeue 移除队列第一个元素,并返回给客户端。
3.检查此时队列是否为空
如果不为空,则调用 notEmpty 条件队列的 signal() 方法,唤醒被阻塞在 notEmpty 条件队列的线程。
4.释放锁
5.检查获取元素前的队列元素数量是否等于最大容量
等于最大容量,因为此时已经取出一个元素,因此队列处于未满的状态,可以唤醒阻塞在 notFull 条件的线程,让线程继续插入元素。
步骤 3 的目的是尽快的唤醒阻塞线程,可以更快的完成取元素操作。提高吞吐量。可以尝试自己画出流程图。
private void enqueue(Node<E> node) { last = last.next = node; }
入队函数极其简单,只要将最后一个元素的 next 指针指向当前元素即完成了插入操作。
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
我们前面说 LinkedBlockingQueue 使用的是有头链表。头节点只是作为一个标志,实际上并不是一个真正的元素。当获取元素时,将头节点的下一个节点作为头节点,将原来的头节点取消引用,被垃圾回收即可。
LinkedBlockingQueue 和 ArrayBlockingQueue 一样适用于多个线程之间需要共享数据、协调任务执行的场景。因此可以总结出以下几个应用场景:
线程池:线程池是一个常见的并发编程模型,它通过线程池中的线程执行任务。并且可以重复使用这些线程。在线程池中,可以使用 LinkedBlockingQueue 来存储需要执行的任务,以此控制任务数量和执行顺序。当线程池中的线程执行完任务之后,可以从 LinkedBlockingQueue 中取出下一个任务执行。
生产者-消费者:在生产者-消费者模型中,生产者负责生产数据,消费者负责对数据进行处理。在这种模式下,LinkedBlockingQueue 可以作为生产者与消费者之间的数据通道,保证线程安全和数据正确。
Nacos: Nacos 是一个动态服务发现、配置和服务管理平台,它使用 LinkedBlockingQueue 来实现内部的任务队列。
Tomcat:从 Tomcat 7 开始,请求队列默认使用了 LinkedBlockingQueue 实现。
Hystrix: 一个流行的容错框架,其默认使用 LinkedBlockingQueue 作为请求队列。
The above is the detailed content of How to master Java LinkedBlockingQueue. For more information, please follow other related articles on the PHP Chinese website!