Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
366 views
in Technique[技术] by (71.8m points)

延迟队列DelayQueue的getDelay()方法,调用频率怎么会越来越快?

问题描述

java延迟队列DelayQueue的getDelay()方法,调用频率怎么会越来越快?

代码如下,设为过期时间20秒,刚开始差不多没有2,3毫秒调用一次getDelay()方法,到最后每毫秒会调用数十次。


public class DelayedQueueTest {
    public static void main(String[] args) throws InterruptedException {
        Item item1 = new Item("item1", 20, TimeUnit.SECONDS);
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(item1);
        System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        Item take = queue.take();
        System.out.format("name:{%s}, time:{%s}
", take.name, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
    }
}

class Item implements Delayed {
    private long time;
    String name;

    public Item(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 从这个打印可以看出,随着时间靠近队列首位元素,频率越来越快。
        System.out.println(time - System.currentTimeMillis());
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Item item = (Item) o;
        long diff = this.time - item.time;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "Item{" +
                "time=" + time +
                ", name='" + name + ''' +
                '}';
    }

看了源码,是一个死循环(for (;;) )里面调用的,但是没看出来怎么控制调用频率的?

一时有些懵逼,求大佬指点。。。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Answer

0 votes
by (71.8m points)

available.awaitNanos(delay);

这行代码干的好事,继续往下走。

public final long awaitNanos(long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

其中的这两行代码:

if (nanosTimeout >= spinForTimeoutThreshold)
    LockSupport.parkNanos(this, nanosTimeout);

spinForTimeoutThreshold是个常量,值为1000,也就是说awaitNanos参数超过1ms时,会额外调用一个park方法,增加方法耗时,少于1ms时,不再调用,循环频率快很多。

源码位于java8.java.util.concurrent.locks.AbstractQueuedSynchronizer


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...