登录

java - DelayQueue中take方法的相关疑问(内存泄漏)

DelayQueue 学习中遇到的疑问

环境:jdk1.8.0_73

在学习DelayQueuetake()方法时,关于源码有多处不理解,特求助。

先贴源码

public E take() throws InterruptedException {
    //获取锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lockInterruptibly();
    try {
        //无限尝试 
        for (;;) {
            //获取 priorityQueue 的第一个元素,不会移除
            E first = q.peek();
            //如果对应的 priorityQueue 中没有元素,也就是first为空,那么就等待(阻塞等待唤醒)
            if (first == null)
                available.await();
            else {
                //priorityQueue中有元素,获取first的过期时间
                long delay = first.getDelay(NANOSECONDS);
                //如果已经过期直接取出
                if (delay <= 0)
                    return q.poll();
                //1111111111111111111  此处疑问
                first = null; // don't retain ref while waiting
                //222222222222  此处疑问
                if (leader != null)
                    available.await();
                else {
                    //如果 leader 为空,也就是没有线程等待获取队列头元素,获取当前线程,并且将leader设置为当前线程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        //333333333333
                        //等待第一个元素过期
                        available.awaitNanos(delay);
                    } finally {
                        //如果leader是当前线程,leader置空
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //for 循环结束后 leader 为空 并且priorityQueue中有元素 唤醒等待队列。
        if (leader == null && q.peek() != null)
            available.signal();
        //释放锁
        lock.unlock();
    }
}

顺便贴上部分变量的说明

    /**
 * Thread designated to wait for the element at the head of
 * the queue.  This variant of the Leader-Follower pattern
 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
 * minimize unnecessary timed waiting.  When a thread becomes
 * the leader, it waits only for the next delay to elapse, but
 * other threads await indefinitely.  The leader thread must
 * signal some other thread before returning from take() or
 * poll(...), unless some other thread becomes leader in the
 * interim.  Whenever the head of the queue is replaced with
 * an element with an earlier expiration time, the leader
 * field is invalidated by being reset to null, and some
 * waiting thread, but not necessarily the current leader, is
 * signalled.  So waiting threads must be prepared to acquire
 * and lose leadership while waiting.
 */
private Thread leader = null;

  /**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();

首先说说我的理解:

首先leader是准备获取头元素的线程,但是不稳定有可能会被质空,如果leader不为空则说明有线程正在获取头元素,所以其他线程会阻塞,如果leader为空,那么就是无线程在尝试获取头元素,之所以不稳定是因为在添加元素的时候(add()),有可能导致头元素变动,所以如果添加的时候导致头元素变动,那么leader会被置空。(如果理解不正确希望指正)

我的问题

我有两个问题:

1、第一个问题

先说数字2处的问题,我不懂为什么这个地方需要判断leader是否为空,在我的理解里,这个地方leader是一定为空的。

理由如下:

我查找了一下所有leader变量被更改的地方总计有5处,offer()方法一处,也就是之前说的头元素变动会把leader置空,take()方法两处,poll(long timeout, TimeUnit unit)方法两处。但是仔细观察的话会发现不管是take()还是poll()方法都是先获得锁,再for循环,而且整个方法结束后leader变量会被置空,也就是说不管是take还是poll方法只要是执行过leader就会被置空,也许有人说万一在leader赋值后抛异常怎么办?从代码来看只能从代码3的地方抛异常,为什么呢,因为如果3不抛异常,那么leader立马就被置空。综上所述,既然take()方法先获取到了锁,那么为什么leader还会为非空!也就是为什么要在2的地方进行判断非空。

2、第二个问题

数字1处的问题,为什么此处first置空

我查询了相关资料,发现在jdk1.7中是没有这段代码的,资料表述的意思是防止内存泄漏,first变量被gc回收。

资料地址 http://www.jianshu.com/p/e0bcc9eae0ae

很抱歉没找到官方说明。

引用自资料中相关说明

take方法中为什么释放first元素
first = null; // don't retain ref while waiting
我们可以看到doug lea后面写的注释,那么这段代码有什么用呢?

想想假设现在延迟队列里面有三个对象。

从第二条我就看不懂了

线程B进来获取first,进入else的阻塞操作,然后无限期等待

在我理解,线程进来的时候,线程A已经释放锁了,因为ReentranceLock是排他锁,并非共享锁,所以之前的first变量指向的是线程A取走的那个元素,而且方法出栈后,相关变量应该是会被gc的。那么线程B获取first的时候,应该是指向不同的元素了吧,所以为啥线程B还会持有之前的first变量。

求大神详解!我是哪里理解的不对,还是有什么基础概念想错了,还是说忽略了某些部分。

# Java
阿神 阿神 2386 天前 579 次浏览

全部回复(4) 我要回复

  • 迷茫

    迷茫2017-04-18 10:25:27

    自问自答

    缺失的知识点是:

    Condition.await()会自行释放锁。

    参考资料

    https://segmentfault.com/q/1010000002390706
    http://stackoverflow.com/questions/27058828/why-await-of-condition-releases-the-lock-but-signal-does-not

    以下摘取部分内容

    jdk1.8 await方法的说明

    The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:

    • Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or

    • Some other thread invokes the signalAll() method for this Condition; or

    • Some other thread interrupts the current thread, and interruption of thread suspension is supported; or

    • A "spurious wakeup" occurs.

    In all cases, before this method can return the current thread must re-acquire the lock associated with this condition. When the thread returns it is guaranteed to hold this lock.

    评论

    the API would become confusing: there would be more than one method releasing the lock

    自行的测试代码

    public class ConditionReleaseTest {
    
    public static void main(String[] args) {
    
        ReentrantLock lock = new ReentrantLock();
    
        final Condition con1 = lock.newCondition();
    
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
    
                lock.lock();
                try {
                    System.out.println("线程1获取锁,condition wait 10s");
                    con1.await(10, TimeUnit.SECONDS);
                    System.out.println("线程1获取锁,condition wait 10s 结束 假定拥有锁?");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("线程1释放锁");
                    lock.unlock();
    
                }
            }
        });
    
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
    
                lock.lock();
                try {
                    Thread.sleep(100L);
                    System.out.println("线程2 获得锁 睡眠20秒 拥有锁");
                    Thread.sleep(20000L);
                } catch (Exception e) {
                } finally {
                    System.out.println("线程2释放锁");
                    lock.unlock();
                }
            }
        });
        thread1.start();
        thread2.start();
    }
    }
    

    运行结果:

    线程1获取锁,condition wait 10s
    线程2 获得锁 睡眠20秒 拥有锁
    线程2释放锁
    线程1获取锁,condition wait 10s 结束 假定拥有锁?
    线程1释放锁
    

    在有以上知识点的基础上,我所有的疑问都可以解释的通了。

    first变量会引起内存泄漏

    感谢所有回答的人。

    @iMouseWu @kevinz @scort

    ps: peek()只是查询第一个元素,不会从队列里取出。

    回复
    0
  • 天蓬老师

    天蓬老师2017-04-18 10:25:27

    1. 线程一执行了available.awaitNanos(delay);这个时候会释放锁,这个线程二执行take()方法,那么leader != null

    2. peek()方法是取出队列的第一条数据(注意和poll的区别)

    回复
    0
  • 迷茫

    迷茫2017-04-18 10:25:27

    1. 也许你是对的,我也测试了一下正常情况没发现leader不为null场景

    2. 这个我猜测可能是为了first能更早的被gc回收,因为在之后是有await在跳出作用域的时间并不确定

    回复
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-18 10:25:27

    看起来 @iMouseWu 的1 和 @kevinz 的2 就是你问题的答案啦。

    回复
    0
  • 取消 回复 发送