Contents

ArrayBlockingQueue是一个有界的阻塞队列,底层实现是一个数组+two-condition并发控制,队列实现使用的是循环数组,并发控制使用ReentrantLock加notEmpty,notFull的condition,如果队列满或空,阻塞线程放入等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;
/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;

item就是实际存储的数组,takeIndex 是弹出的索引,putIndex是压入的索引,count是队列的实际存储的数量。这3个变量配合,count用于判断队列是否空,是否满,takeIndex出队列,putIndex入队列。
lock、notEmpty、notFull并发控制,iters是遍历循环,暂时不讲。

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

初始化数组大小,初始化锁,可以传入公平锁还是非公平锁,创建不空、不满条件,给管程等待队列用。
入队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and throwing an
* {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return super.add(e);
}
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

Add、offer(不带超时)方法如果队列满,就会返回false,不会阻塞线程。put、offer(带超时)方法会阻塞线程。offer(带超时)通过中断实现超时,从而按时间返回。
enqueue是实际入队列的方法,通过putIndex索引,加入数组,如果putIndex大于数组的长度,从0位置再开始,是一个循环队列。如果入队列了代表,出队列因为数组空的notEmpty阻塞的线程可以重新去执行了,所以notEmpty signal唤醒。 循环队列的空与满,不是通过putIndex与takIndex的位置关系判断了,而是使用了一个新的变量count去判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

与入站的构造很类似,poll锁不可中断,take与poll(带超时)都可以中断。
总结:ArrayBlocking 是一个有界循环队列,并发控制使用ReentrantLock的two-condition,队列空阻塞出队列,队列满阻塞入队列。

Contents