Contents

基于链表的有界阻塞队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Linked list node class
*/
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

基于单链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

capacity是用于记录容量 ,首尾两个指针,head和last。基于2个锁,一个takeLock锁,notEmpty条件变量,在空时候用。一个putLock锁,notFull条件变量,在满的时候用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

初始化可以赋值容量,新建哨兵头指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

如果队列容量等于最大容量,线程要在notFull 条件变量下等待。
如果没满,就调用enqueue,进行入站。
如果添加队列后小于容量,就在notFull里唤醒一个线程。
这里面使用putLock进行加锁,不影响出站。
由于是2个锁,只有在容量空的时候,才会去唤醒takeLock的notEmpty等待队列。
使用的模型与一个锁的不同,2个锁的模型,需要通过原子操作的count来判断是不是唤醒,小于容量就可以唤醒notFull等待队列,最后如果容量等于0,就去takeLock的锁去唤醒notEmpty等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

出队列,与入队列思路相同,使用takeLock锁,如果count为0,就进入notEmpty队列等待。
dequeue就是将头删除,原来的first元素,变成哨兵指针。
如果count 大于1还可以继续take,唤醒notEmpty队列的线程。最后如果修改之前count等于容量,唤醒notFull队列,还可以put。
offer和poll和put和take原理相同,只是他们不是阻塞的,直接返回布尔值。

总结:基于单链表的阻塞单端队列,入队出队2个独立的锁,通过原子操作count,来处理状态。

Contents