Contents

基于优先级队列的无界队列

1
2
3
4
/**
* Creates a new {@code DelayQueue} that is initially empty.
*/
public DelayQueue() {}

构造器为空

1
2
3
4
5
6
7
8
9
10
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;

/**
* Condition signalled when a newer element becomes available
* at the head of the queue or a new thread may need to
* become leader.
*/
private final Condition available = lock.newCondition();

一个内部的锁,一个存储用的优先级队列,存储对象E,要继承Delayed,一个leader用于当前优先级最高要执行的线程。available用于等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Inserts the specified element into this delay queue.
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

主要的插入的方法是offer,使用lock锁住,存取使用同一个锁,将新元素添加之前的优先级队列中,如果添加的新元素根据优先级排列在第一位,就要清空原来要执行leader线程,不然你没法作为优先级最好的去执行,然后随便唤醒一个线程就可以执行优先级最高的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

Take是获取到时间的元素,不断自旋,进来先取出优先级队列的第一个,如果优先级队列是空,就在available变量里等待。
如果队列里有元素,就计算纳秒的延迟时间。如果这个时间小于0,就弹出元素,然后返回,这个是推出自旋的出口。
如果还没到延迟时间,就看leader是不是有值,代表只不是有其他正要执行的线程,如果有就等待,如果没有,就将当前线程作为执行线程,同时等待延迟时间,到时间后唤醒,重新自旋,判断是否退出。
这个主要问题就是插入优先级高于正要执行的当前线程,就需要重置leader重新判断哪个实现执行。
最后没有leader线程再执行,就再唤醒下一个available的等待线程
总结:基于优先级队列,需要自己继承Delay,实现比较、计算延迟时间的方法。通过等待与唤醒来按时间执行。

Contents