> Java > java지도 시간 > 본문

Java 차단 큐 BlockingQueue 인스턴스 분석

王林
풀어 주다: 2023-04-25 15:13:15
앞으로
1143명이 탐색했습니다.

큐 유형

  • 무제한 큐용량 제한 없음, 저장소에 따라만 변경됨

  • 제한된 큐최대 용량 정의

무한 큐에 요소 추가 모든 작업이 차단되지 않습니다(스레드도 마찬가지). -안전함) 매우 큰 용량으로 확장될 수 있습니다. 무한 차단 대기열 사용 BlockingQueue 생산자-소비자 모델을 설계할 때 가장 중요한 점은 생산자가 대기열에 메시지를 추가하는 만큼 빠르게 소비자가 메시지를 소비할 수 있어야 한다는 것입니다. 그렇지 않으면 메모리가 부족하여 OutOfMemory 예외가 발생할 수 있습니다.

데이터 구조

  • 1. 일반적으로 연결 목록 또는 배열을 사용하여 구현됩니다.

  • 2 일반적으로 FIFO(선입선출) 특성을 가지며 이중 종료 대기열로 설계될 수도 있습니다

  • 3. 큐의 주요 작업 : Entering and dequeuing

Blocking Queue BlockingQueue

정의: 스레드 통신에서는 동시성이 아무리 높아도 언제든지 단일 JVM에서는 하나의 스레드만 항상 대기열 또는 대기열 제거 작업과 동시에 대기열에 들어갈 수 있습니다. BlockingQueue는 명시적인 동기화 없이 스레드 간에 공유될 수 있습니다. 차단 대기열 유형:

Wait

공통 차단 대기열

Java 차단 큐 BlockingQueue 인스턴스 분석

ArrayBlockingQueue:

배열에서 지원하는 경계 대기열

    응용 프로그램 시나리오가 많습니다. 및 스레드 풀의 생산자-소비자 모델
  • 작동 원리:
  • 스레드 안전을 보장하기 위해 ReentrantLock을 기반으로 하며 조건
    • LinkedBlockingQueue:

      무제한 대기열에 따라 대기열이 가득 찼을 때 차단을 구현합니다. 연결된 목록 기반(이론적으로 제한됨)

    Priority BlockingQueue:
  • 우선순위 힙에서 지원되는 무제한 우선순위 대기열

  • DelayQueue:
우선순위 힙에서 지원되는 시간 기반 스케줄링 대기열, 제한되지 않은 대기열을 기반으로 내부적으로 구현됨 PriorityQueue 및 배열 확장 구현을 기반으로 하는 무제한 대기열
  • Usage:
  • 대기열에 추가된 개체는 Delayed 인터페이스를 구현해야 하며 Delayed는 Comparable 인터페이스에서 통합됩니다.
  • 응용 시나리오:
  • 영화 티켓 판매
    • 작동 원리:

      대기열은 시간에 따라 내부적으로 처리됩니다. 우선 순위에 따라 정렬됩니다. 클래스 스레드 풀 주기 실행을 지연합니다.
    • 모두 put() 및 take()

      와 같은 메서드를 사용하여 BlockingQueue 인터페이스를 구현합니다. 생성 방법은 다음과 같습니다.
    • BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
      로그인 후 복사

      BlockingQueue API

      요소 추가:

method

의미

add()

삽입에 성공하면 true를 반환하고, 그렇지 않으면 IllegalStateException 예외가 발생합니다.put()큐에 지정된 요소를 삽입합니다. 가득 차면 삽입할 공간이 있을 때까지 차단됩니다offer()삽입에 성공하면 true를 반환하고, 그렇지 않으면 false를 반환합니다offer(E e, 긴 시간 초과, TimeUnit 단위)다음을 시도해 보세요. 대기열에 요소를 삽입하면 공간 삽입이 있을 때까지 차단되며 차단에는 시간 제어가 있습니다 검색 요소: 방법 의미

take( )

큐의 헤드 요소를 가져와서 큐가 비어 있으면 삭제합니다. 그런 다음 해당 요소가 사용 가능해질 때까지 차단하고 기다립니다.poll(긴 시간 제한, TimeUnit 단위)큐의 헤드를 검색하고 제거합니다. 큐, 필요한 경우 요소를 사용할 수 있을 때까지 지정된 대기 시간을 기다리고 시간이 초과되면 null을 반환합니다.

ArrayBlockingQueue 源码简解

实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列

线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。

/**
 * 构造方法
 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列
 * @param capacity 固定容量
 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 根据fair创建对应的锁
    // 条件对象,配合容器能满足业务
    notEmpty = lock.newCondition(); // 出队条件对象
    notFull =  lock.newCondition(); // 入队条件对象
}
/**
 * 入队方法
 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 检查put对象是否为空,空抛出异常
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文
    try {
        // 队列中元素的数量 等于 排队元素的长度
        while (count == items.length)
            notFull.await(); // 见下文
        enqueue(e); // 元素入队
    } finally {
        lock.unlock();
    }
}
/**
 * 出队方法
 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 见下文
    try {
        while (count == 0)
            notEmpty.await(); // 见下文
        return dequeue(); // 元素出队
    } finally {
        lock.unlock();
    }
}
로그인 후 복사

令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:

  • ①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;

  • ②其他线程调用这个条件的 signalAll 方法

  • ③其他线程中断当前线程,支持中断线程挂起;

  • ④一个“虚假的唤醒”发生了。

在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。

如果当前线程有以下两种情况之一:

  • ①在进入该方法时设置中断状态;

  • ②在等待时被中断,支持线程挂起的中断 抛出InterruptedException

生产者消费者模式

BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。

生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务

@Slf4j
@Slf4j
public class Producer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> blockingQueue;
    private final int stopTag;
    /**
     * 构造方法
     * @param blockingQueue
     * @param stopTag
     */
    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
        this.blockingQueue = blockingQueue;
        this.stopTag = stopTag;
    }
    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
   private void generateNumbers() throws InterruptedException {
        // 每个生产者都随机生产10种商品
        for (int i = 0; i < 10; i++) {
            int product = ThreadLocalRandom.current().nextInt(1000,1100);
            log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);
            blockingQueue.put(product);
        }
        // 生产终止标记
        blockingQueue.put(stopTag);
        log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());
    }
}
로그인 후 복사

消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务

@Slf4j
public class Consumer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> queue;
    private final int stopTage;
    public Consumer(BlockingQueue<Integer> queue, int stopTage) {
        this.queue = queue;
        this.stopTage = stopTage;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Integer product = queue.take();
                if (product.equals(stopTage)) {
                    log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());
                    return;
                }
                log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
로그인 후 복사

客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者

public class ProductConsumerTest {
    public static void main(String[] args) {
        // 阻塞队列容量
        int blockingQueueSize = 10;
        // 生产者数量
        int producerSize = 16;
        // 消费者数量 = 计算机线程核数 8
        int consumerSize = Runtime.getRuntime().availableProcessors();
        // 终止消费标记
        int stopTag = Integer.MAX_VALUE;
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
        // 创建16个生产者线程
        for (int i = 0; i < producerSize; i++) {
            new Thread(new Producer(blockingQueue, stopTag)).start();
        }
        // 创建8个消费者线程
        for (int j = 0; j < consumerSize; j++) {
            new Thread(new Consumer(blockingQueue, stopTag)).start();
        }
    }
}
로그인 후 복사

延迟队列 DelayQueue

定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

/**
 * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
    //延迟时间
    private final long delay;
    //到期时间
    private final long expire;
    //数据
    private final String msg;
    //创建时间
    private final long now;
    public long getDelay() {
        return delay;
    }
    public long getExpire() {
        return expire;
    }
    public String getMsg() {
        return msg;
    }
    public long getNow() {
        return now;
    }
    /**
     * @param msg 消息
     * @param delay 延期时间
     */
    public MovieTicket(String msg , long delay) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }
    /**
     * @param msg
     */
    public MovieTicket(String msg){
        this(msg,1000);
    }
    public MovieTicket(){
        this(null,1000);
    }
    /**
     * 获得延迟时间   用过期时间-当前时间,时间单位毫秒
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire
                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间
     * 越早过期的时间在队列中越靠前
     * @param delayed
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}
로그인 후 복사

测试类:

public static void main(String[] args) {
    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
    MovieTicket ticket = new MovieTicket("电影票1",10000);
    delayQueue.put(ticket);
    MovieTicket ticket1 = new MovieTicket("电影票2",5000);
    delayQueue.put(ticket1);
    MovieTicket ticket2 = new MovieTicket("电影票3",8000);
    delayQueue.put(ticket2);
    log.info("message:--->入队完毕");
    while( delayQueue.size() > 0 ){
        try {
            ticket = delayQueue.take();
            log.info("电影票出队:{}",ticket.getMsg());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
로그인 후 복사

从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同

위 내용은 Java 차단 큐 BlockingQueue 인스턴스 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:yisu.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!