Unbounded queueNo capacity limit, only changes with storage
Bounded queueDefines the maximum capacity
All operations that add elements to the infinite queue will never block (also thread-safe), so It can grow to very large capacities. Using an infinite blocking queue BlockingQueue The most important thing when designing a producer-consumer model is that the consumer should be able to consume messages as fast as the producer adds messages to the queue. Otherwise, there may be insufficient memory and an OutOfMemory exception may be thrown.
1. Usually implemented using linked lists or arrays
2. Generally with FIFO (first in, first out) Feature, it can also be designed as a double-ended queue
3. The main operations of the queue:Entering and dequeuing
Definition:In thread communication, at any time, no matter how high the concurrency is, on a single JVM, only one thread can always queue or enqueue the queue at the same time. Dequeue operation. BlockingQueue can be shared between threads without any explicit synchronization
Types of blocking queues:
In JAVA Application scenarios:Thread pool, SpringCloud-Eureka three-level cache, Nacos, MQ, Netty, etc.
ArrayBlockingQueue:Bounded queue supported by array
Application scenario:There are many applications and producer-consumer models in the thread pool
Working principle:Based on ReentrantLock to ensure thread safety, and based on Condition to achieve blocking when the queue is full
LinkedBlockingQueue:Unbounded queue based on linked list (theoretically bounded)
##PriorityBlockingQueue:By Unbounded priority queue supported by the priority heap
DelayQueue:A time-based scheduling queue supported by the priority heap, internally implemented based on the unbounded queue PriorityQueue, and Array-based expansion implementation of unbounded queue
Instructions for use:The objects added to the queue must implement the Delayed interface, and Delayed is integrated from the Comparable interface
Application scenarios:Selling movie tickets, etc.
The queue will be prioritized according to time Sort. Delay class thread pool cycle execution.
BlockingQueueblockingQueue = new ArrayBlockingQueue<> (666);
add() | |
---|---|
put() | |
offer() | |
offer(E e, long timeout, TimeUnit unit) | |
实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列
线程获取锁的条件:在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。
/** * 构造方法 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列 * @param capacity 固定容量 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 根据fair创建对应的锁 // 条件对象,配合容器能满足业务 notEmpty = lock.newCondition(); // 出队条件对象 notFull = lock.newCondition(); // 入队条件对象 } /** * 入队方法 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用 */ public void put(E e) throws InterruptedException { checkNotNull(e); // 检查put对象是否为空,空抛出异常 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文 try { // 队列中元素的数量 等于 排队元素的长度 while (count == items.length) notFull.await(); // 见下文 enqueue(e); // 元素入队 } finally { lock.unlock(); } } /** * 出队方法 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 见下文 try { while (count == 0) notEmpty.await(); // 见下文 return dequeue(); // 元素出队 } finally { lock.unlock(); } }
令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:
①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;
②其他线程调用这个条件的 signalAll 方法
③其他线程中断当前线程,支持中断线程挂起;
④一个“虚假的唤醒”发生了。
在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。
如果当前线程有以下两种情况之一:
①在进入该方法时设置中断状态;
②在等待时被中断,支持线程挂起的中断 抛出InterruptedException
BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。
生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务
@Slf4j @Slf4j public class Producer implements Runnable{ // 作为参数的阻塞队列 private BlockingQueueblockingQueue; private final int stopTag; /** * 构造方法 * @param blockingQueue * @param stopTag */ public Producer(BlockingQueue blockingQueue,int stopTag) { this.blockingQueue = blockingQueue; this.stopTag = stopTag; } @Override public void run() { try { generateNumbers(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void generateNumbers() throws InterruptedException { // 每个生产者都随机生产10种商品 for (int i = 0; i < 10; i++) { int product = ThreadLocalRandom.current().nextInt(1000,1100); log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product); blockingQueue.put(product); } // 生产终止标记 blockingQueue.put(stopTag); log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId()); } }
消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务
@Slf4j public class Consumer implements Runnable{ // 作为参数的阻塞队列 private BlockingQueuequeue; private final int stopTage; public Consumer(BlockingQueue queue, int stopTage) { this.queue = queue; this.stopTage = stopTage; } @Override public void run() { try { while (true) { Integer product = queue.take(); if (product.equals(stopTage)) { log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId()); return; } log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
客户端类:创建与计算机 CPU 核数相同的线程数,与 16个生产者
public class ProductConsumerTest { public static void main(String[] args) { // 阻塞队列容量 int blockingQueueSize = 10; // 生产者数量 int producerSize = 16; // 消费者数量 = 计算机线程核数 8 int consumerSize = Runtime.getRuntime().availableProcessors(); // 终止消费标记 int stopTag = Integer.MAX_VALUE; BlockingQueueblockingQueue = new ArrayBlockingQueue<>(blockingQueueSize); // 创建16个生产者线程 for (int i = 0; i < producerSize; i++) { new Thread(new Producer(blockingQueue, stopTag)).start(); } // 创建8个消费者线程 for (int j = 0; j < consumerSize; j++) { new Thread(new Consumer(blockingQueue, stopTag)).start(); } } }
定义:Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。
/** * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法 */ public class MovieTicket implements Delayed { //延迟时间 private final long delay; //到期时间 private final long expire; //数据 private final String msg; //创建时间 private final long now; public long getDelay() { return delay; } public long getExpire() { return expire; } public String getMsg() { return msg; } public long getNow() { return now; } /** * @param msg 消息 * @param delay 延期时间 */ public MovieTicket(String msg , long delay) { this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间 now = System.currentTimeMillis(); } /** * @param msg */ public MovieTicket(String msg){ this(msg,1000); } public MovieTicket(){ this(null,1000); } /** * 获得延迟时间 用过期时间-当前时间,时间单位毫秒 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间 * 越早过期的时间在队列中越靠前 * @param delayed * @return */ @Override public int compareTo(Delayed delayed) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS)); } }
测试类:
public static void main(String[] args) { DelayQueuedelayQueue = new DelayQueue (); MovieTicket ticket = new MovieTicket("电影票1",10000); delayQueue.put(ticket); MovieTicket ticket1 = new MovieTicket("电影票2",5000); delayQueue.put(ticket1); MovieTicket ticket2 = new MovieTicket("电影票3",8000); delayQueue.put(ticket2); log.info("message:--->入队完毕"); while( delayQueue.size() > 0 ){ try { ticket = delayQueue.take(); log.info("电影票出队:{}",ticket.getMsg()); } catch (InterruptedException e) { e.printStackTrace(); } } }
从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同
The above is the detailed content of Java blocking queue BlockingQueue instance analysis. For more information, please follow other related articles on the PHP Chinese website!
take() | |
---|---|
poll (long timeout, TimeUnit unit) | |