-
Notifications
You must be signed in to change notification settings - Fork 0
/
LockedQueue.java
154 lines (142 loc) · 4.18 KB
/
LockedQueue.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import java.util.*;
import java.util.concurrent.locks.*;
// Locked Queue uses locks and conditions to block
// when queue is empty, or it is full. Just as
// locks are inherently vulnerable to deadlock,
// Condition objects are inherently vulnerable to
// lost wakeups in which one or more threads wait
// forever without realizing that the condition
// for which they are waiting has become true.
//
// This queue signals "not empty" whenever an item
// is added to the queue, and "not full" whenever
// an item is removed from the queue. However,
// consider an optimization, where you only signal
// "not empty" if the queue was empty. Bang! Lost
// wakeup is suddenly possible.
//
// To see how that is possible, consider 2
// consumers A & B and 2 producers C & D. When
// queue is empty and both A & B have to remove(),
// they are blocked until C or D can add(). If C
// add()s, followed by D before any consumer runs,
// only C would signal "not empty" (the queue was
// no longer empty when D added), causing A to
// wake up, but not B.
//
// Hence, one needs to be careful when working with
// both locks and condition objects.
//
// The functionality of this queue is similar to
// BlockingQueue and does not suffer from the lost
// wakeup problem.
// Bounded FIFO queue backed by a circular array and
// guarded by a single lock with two conditions.
// add() and remove() block while the queue is full
// or empty respectively; offer() and poll() never
// block and report failure instead.
class LockedQueue<T> extends AbstractQueue<T> {
  // lock: central (coarse) lock for queue
  // notFull: condition indicating queue not full
  // notEmpty: condition indicating queue not empty
  // head: items are removed from here
  // tail: items are added to here
  // size: number of items in queue
  // items: space for items in queue
  final Lock lock = new ReentrantLock();
  final Condition notFull = lock.newCondition();
  final Condition notEmpty = lock.newCondition();
  int head, tail, size;
  final T[] items;

  // Creates an empty queue that can hold at most
  // `capacity` items. The generic array is created
  // as Object[] and never escapes this class, so the
  // unchecked cast is safe.
  @SuppressWarnings("unchecked")
  public LockedQueue(int capacity) {
    items = (T[]) new Object[capacity];
  }

  // 1. Acquire lock before any action.
  // 2. Wait for queue being not full.
  // 3. Add item to queue.
  // 4. Release the lock.
  // Returns true once the item is stored. If the
  // thread is interrupted while waiting, nothing is
  // stored, the interrupt flag is restored and false
  // is returned.
  @Override
  public boolean add(T x) {
    lock.lock(); // 1
    try {
      // Loop, not if: the condition must be re-checked
      // after every wakeup.
      while (size == items.length) notFull.await(); // 2
      addUnchecked(x); // 3
      return true;
    } catch (InterruptedException e) {
      // Keep the interruption visible to the caller
      // instead of silently dropping it.
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock(); // 4
    }
  }

  // 1. Acquire lock before any action.
  // 2. Wait for queue being not empty.
  // 3. Remove item from queue.
  // 4. Release the lock.
  // Returns the removed item. If the thread is
  // interrupted while waiting, nothing is removed,
  // the interrupt flag is restored and null is
  // returned.
  @Override
  public T remove() {
    lock.lock(); // 1
    try {
      while (size == 0) notEmpty.await(); // 2
      return removeUnchecked(); // 3
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      lock.unlock(); // 4
    }
  }

  // 1. Store item in queue, while locking.
  // 2. If no space available, return false.
  // The lock is released on every path, including
  // the "queue full" early return.
  @Override
  public boolean offer(T x) {
    lock.lock(); // 1
    try {
      if (size == items.length) return false; // 2
      addUnchecked(x); // 1
      return true;
    } finally {
      lock.unlock(); // 1
    }
  }

  // 1. Peek item in queue, without removing.
  // 2. If no item exists, return null.
  // Reads under the lock so that head and size are
  // observed consistently.
  @Override
  public T peek() {
    lock.lock();
    try {
      return size > 0 ? items[head] : null; // 1, 2
    } finally {
      lock.unlock();
    }
  }

  // 1. Remove item from queue, while locking.
  // 2. If no items exists, return null.
  // The lock is released on every path, including
  // the "queue empty" early return.
  @Override
  public T poll() {
    lock.lock(); // 1
    try {
      if (size == 0) return null; // 2
      return removeUnchecked(); // 1
    } finally {
      lock.unlock(); // 1
    }
  }

  // 1. Store item at the tail end.
  // 2. Move tail to the next free slot.
  // 3. Signal that queue is not empty.
  // Caller must hold the lock and must have checked
  // that the queue is not full.
  private void addUnchecked(T x) {
    items[tail] = x; // 1
    if (++tail == items.length) tail = 0; // 2
    ++size; // 2
    notEmpty.signal(); // 3
  }

  // 1. Fetch item at the head of queue.
  // 2. Clear the slot so the item can be collected.
  // 3. Move head to the next item.
  // 4. Signal that queue is not full.
  // Caller must hold the lock and must have checked
  // that the queue is not empty.
  private T removeUnchecked() {
    T x = items[head]; // 1
    items[head] = null; // 2
    if (++head == items.length) head = 0; // 3
    --size; // 3
    notFull.signal(); // 4
    return x;
  }

  // Returns an iterator over a snapshot of the queue,
  // in head-to-tail order, taken while locked. The
  // snapshot is a separate list, so removing through
  // the iterator does not change the queue.
  @Override
  public Iterator<T> iterator() {
    lock.lock();
    try {
      List<T> snapshot = new ArrayList<>(size);
      // Wrap around the array capacity, not the
      // current item count.
      for (int i = 0; i < size; i++)
        snapshot.add(items[(head + i) % items.length]);
      return snapshot.iterator();
    } finally {
      lock.unlock();
    }
  }

  // Returns the number of items currently in queue,
  // read under the lock.
  @Override
  public int size() {
    lock.lock();
    try {
      return size;
    } finally {
      lock.unlock();
    }
  }
}