-
Notifications
You must be signed in to change notification settings - Fork 2
/
Node.java
213 lines (191 loc) · 6.21 KB
/
Node.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
import java.util.function.*;
// N-ary Node is the basic block of an N-ary Combining tree.
// It has N places for values, allowing N threads to put in
// their value. The first thread to put its value in the node
// becomes the "active thread", while all others become
// "passive threads". Once a node is full (or timeout), the
// active thread combines the node values and pushes the result
// to the parent node. The value it receives back from the parent
// node is then distributed among all threads, each getting its
// proper value. Passive threads just wait for these values to be
// distributed, and take the one for them.
class Node<T> {
    // FREE: indicates that values are still being inserted
    // PUSHING: active thread is combining values and pushing to parent
    // PULLING: pulled value from parent is being given to threads
    // TIMEOUT: max time (in ms) active thread waits for node to fill
    static final int FREE = 0;
    static final int PUSHING = 1;
    static final int PULLING = 2;
    static final long TIMEOUT = 100;

    // parent: parent node (null for the root node)
    // state: either FREE, PUSHING, or PULLING
    // count: number of values in node
    // value: storage place for values
    // size: max. no. of values allowed (arity of node, eg 2)
    Node<T> parent;
    int state;
    int count;
    T[] value;
    int size;

    // Creates a node of arity 2.
    public Node() {
        this(2);
    }

    // Creates a node of arity n.
    // n: max. no. of values the node can hold (must be >= 1)
    // Throws IllegalArgumentException if n < 1, since such a node
    // has no slot to hold even a single value.
    @SuppressWarnings("unchecked")
    public Node(int n) {
        if (n < 1) {
            throw new IllegalArgumentException(
                "node arity must be >= 1, got " + n);
        }
        // Generic array creation is not allowed; only T values are
        // ever stored in it, so the unchecked cast is safe.
        value = (T[]) new Object[n];
        parent = null;
        state = FREE;
        count = 0;
        size = n;
    }

    // Gets node value (only for root node).
    public synchronized T get() {
        return value[0];
    }

    // Sets node value (only for root node).
    public synchronized void set(T x) {
        value[0] = x;
        count = 1;
    }

    // Gets current value, and then updates it.
    // x: value to OP (accumulate), op: binary operator
    // 1. Root node: perform get & op directly. The root never
    //    leaves the FREE state and holds a single value, so it
    //    must not wait for "not full" (a root of arity 1 would
    //    otherwise block forever).
    // 2. Wait until node is free and has an empty slot.
    // 3. Perform get & op based on 2 possible cases.
    // 3a. Active thread (first to visit node)
    // 3b. Passive thread (visits later)
    public synchronized T getAndOp(T x,
            BinaryOperator<T> op)
            throws InterruptedException {
        if (parent == null) {                      // 1
            return getAndOpRoot(x, op);
        }
        while (state != FREE || count == size) {   // 2
            wait();
        }
        if (count == 0) {                          // 3a
            return getAndOpActive(x, op);
        }
        return getAndOpPassive(x, op);             // 3b
    }

    // Performs get & op for root node.
    // x: value to OP (accumulate), op: binary operator
    // 1. Get old value, by combining (a).
    // 2. Empty the node.
    // 3. Insert a OP x.
    // 4. Return old value.
    private synchronized T getAndOpRoot(T x,
            BinaryOperator<T> op)
            throws InterruptedException {
        T a = combine(op);           // 1
        count = 0;                   // 2
        insert(op.apply(a, x));      // 3
        return a;                    // 4
    }

    // Performs get & op for active thread.
    // x: value to OP (accumulate), op: binary operator
    // 1. Insert value.
    // 2. Wait until node is full, or timeout.
    // 3. We have the values, so start pushing.
    // 4. Combine values into one with OP.
    // 5. Push combined value to parent (this node's monitor
    //    stays held meanwhile, so no new thread can enter).
    // 6. Distribute received value for all threads.
    // 7. Start the pulling process.
    // 8. Decrement count (we have our pulled value).
    // 9. Return pulled value.
    // NOTE(review): if this thread is interrupted in step 2 or 5,
    // the inserted values and state are not rolled back, so
    // passive threads already in the node keep waiting.
    private synchronized T getAndOpActive(T x,
            BinaryOperator<T> op)
            throws InterruptedException {
        insert(x);                         // 1
        waitUntilFull(TIMEOUT);            // 2
        state = PUSHING;                   // 3
        T a = combine(op);                 // 4
        T r = parent.getAndOp(a, op);      // 5
        distribute(r, op);                 // 6
        state = PULLING;                   // 7
        notifyAll();                       // 7
        decrementCount();                  // 8
        return r;                          // 9
    }

    // Performs get & op for passive thread.
    // x: value to OP (accumulate), op: binary operator
    // 1. Insert value.
    // 2. Wait until active thread has pulled value.
    // 3. Decrement count, one pulled value processed.
    // 4. If count is 0, the node is free.
    // 5. Return value of this thread.
    private synchronized T getAndOpPassive(T x,
            BinaryOperator<T> op)
            throws InterruptedException {
        int i = insert(x);                 // 1
        while (state != PULLING) {         // 2
            wait();
        }
        decrementCount();                  // 3, 4
        return value[i];                   // 5
    }

    // Inserts a value in the node (for a thread).
    // x: value to insert
    // 1. Wait until node is free.
    // 2. Get index to place value in.
    // 3. Place the value.
    // 4. Increment number of values in node.
    // 5. If node is full, notify active thread.
    // 6. Return index where value was placed.
    public synchronized int insert(T x)
            throws InterruptedException {
        while (state != FREE) {            // 1
            wait();
        }
        int i = count;                     // 2
        value[i] = x;                      // 3
        incrementCount();                  // 4, 5
        return i;                          // 6
    }

    // Combine all values in the node (put by threads).
    // op: binary operator
    // 1. If OP is "+", combine will return sum.
    // Slot 0 is returned as-is when the node holds fewer than
    // two values (including when it holds none).
    public synchronized T combine(BinaryOperator<T> op) {
        T a = value[0];                    // 1
        for (int i = 1; i < count; i++) {  // 1
            a = op.apply(a, value[i]);     // 1
        }
        return a;                          // 1
    }

    // Distribute pulled value to all threads.
    // r: pulled value, op: binary operator
    // T0: active thread, T1...: passive threads
    // 1. T0 receives r
    // 2. T1 receives r OP v0.
    // 3. T2 receives r OP v0 OP v1 ...
    // Each slot is overwritten with the running prefix, so the
    // thread that inserted at index i later reads its result there.
    public synchronized void distribute(T r,
            BinaryOperator<T> op) {
        for (int i = 0; i < count; i++) {  // 1
            T x = value[i];                // 1
            value[i] = r;                  // 1
            r = op.apply(r, x);            // 2
        }                                  // 3
    }

    // Increment count once done with insertion.
    // 1. Increment count.
    // 2. If node is full, notify active thread.
    private synchronized void incrementCount() {
        if (++count < size) {              // 1
            return;
        }
        notifyAll();                       // 2
    }

    // Decrement count once done with distribution.
    // 1. Decrement count.
    // 2. If count is zero, node is free.
    private synchronized void decrementCount() {
        if (--count > 0) {                 // 1
            return;
        }
        state = FREE;                      // 2
        notifyAll();                       // 2
    }

    // Wait until node is full, or timeout.
    // w: max. time to wait, in milliseconds
    // 1. Compute the deadline once, on the monotonic clock.
    // 2. If node is full, exit.
    // 3. Compute the time remaining until the deadline.
    // 4. If timeout done, exit.
    // 5. Otherwise, wait for the remaining time (else retry 2).
    // A fixed deadline keeps the total wait bounded by w no matter
    // how many times the thread is woken up early; remaining time
    // is always > 0 when waiting, so wait() never blocks forever.
    private synchronized void waitUntilFull(long w)
            throws InterruptedException {
        final long nanosPerMilli = 1_000_000L;
        final long deadline = System.nanoTime() + w * nanosPerMilli; // 1
        while (count < size) {                                       // 2
            long remaining = deadline - System.nanoTime();           // 3
            if (remaining <= 0) {                                    // 4
                break;
            }
            wait(remaining / nanosPerMilli,                          // 5
                 (int) (remaining % nanosPerMilli));
        }
    }
}