-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathSBQuickCNN.java
151 lines (132 loc) · 4.26 KB
/
SBQuickCNN.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package skylinebreaker;
import java.util.LinkedList;
import java.util.Queue;
import preference.csv.optimize.LevelManager;
import preference.exception.PreferenceException;
import skylinebreaker.LSDAbstractTree.LSDAbstractTreeFactory;
import skylinebreaker.SBBase.NearestNeighborOfZero;
import flatlc.inputrelations.FlatLCResultSetWrapper;
import flatlc.levels.FlatLevelCombination;
/**
 * Multi-threaded driver: several worker threads each fill their own
 * {@link LSDAbstractTree} from a shared input, compute a local result via
 * {@code SBSingleCNN.computeSkyline()}, and the local results are then
 * pairwise combined with {@code SBBase.combineLocalSkyline} until a single
 * array remains.
 */
public final class SBQuickCNN {

    /**
     * Distributes the elements of {@code anti} over {@code processes} worker
     * threads, lets every worker compute a local result and merges all local
     * results into one array.
     *
     * <p>Note: one element is taken from {@code anti} per worker up front (to
     * seed the worker's tree) without a {@code hasNext()} check, so the input
     * is assumed to contain at least {@code processes} elements -- TODO confirm
     * how {@code FlatLCResultSetWrapper.next()} behaves when exhausted.
     *
     * <p>A worker that fails with a {@link PreferenceException} or
     * {@link InterruptedException} prints the stack trace and contributes no
     * local result; the remaining results are still merged.
     *
     * @param processes   number of tree-building worker threads; also passed to
     *                    {@code SBSingleCNN} and used to size the merge queue
     * @param treeFactory factory used by every worker to create its tree
     * @param anti        shared input; access is serialised by synchronising on it
     * @param manager     level manager handed to the tree factory
     * @return the single merged array, or {@code null} if no worker produced a result
     * @throws PreferenceException  if constructing a worker fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static FlatLevelCombination[] evaluate(final int processes, final LSDAbstractTreeFactory treeFactory, final FlatLCResultSetWrapper anti, final LevelManager manager) throws PreferenceException, InterruptedException
    {
        SBSingleCNN.nnzero = new NearestNeighborOfZero();

        // Workers post their index here when they are done; guarded by its own monitor.
        final Queue<Integer> notification = new LinkedList<Integer>();

        /** Builds one tree from the shared input and computes its local result. */
        final class LSDWorker extends Thread
        {
            /** Local result; stays {@code null} if the worker failed. Read by the caller after join(). */
            FlatLevelCombination[] data = null;
            private final FlatLevelCombination init_value;
            final int number;

            public LSDWorker(final int number, final FlatLevelCombination init_value) throws PreferenceException
            {
                this.number = number;
                this.init_value = init_value;
            }

            @Override
            public void run()
            {
                try
                {
                    final LSDAbstractTree tree = treeFactory.get(manager, init_value);
                    while(true)
                    {
                        FlatLevelCombination element;
                        // hasNext()/next() must be atomic with respect to the other workers.
                        synchronized(anti)
                        {
                            if(!anti.hasNext()) break;
                            element = (FlatLevelCombination) anti.next();
                        }
                        tree.add(element);
                    }
                    data = new SBSingleCNN(tree, processes).computeSkyline();
                }
                catch (PreferenceException | InterruptedException e1)
                {
                    e1.printStackTrace();
                }
                finally
                {
                    // Always report completion, even on an unchecked exception;
                    // otherwise the caller would wait on 'notification' forever.
                    synchronized(notification)
                    {
                        notification.add(number);
                        notification.notifyAll();
                    }
                }
            }
        }

        final LSDWorker[] threads = new LSDWorker[processes];
        for(int i = 0; i < processes; ++i) threads[i] = new LSDWorker(i, (FlatLevelCombination) anti.next());
        for(int i = 0; i < processes; ++i) threads[i].start();

        final SkylineMergeQueue skylineMergeQueue = new SkylineMergeQueue(processes);
        boolean completed = false;
        try
        {
            int finishedThreads = 0;
            while(finishedThreads != threads.length)
            {
                int threadNum;
                synchronized(notification)
                {
                    while(notification.isEmpty()) notification.wait();
                    threadNum = notification.poll();
                }
                ++finishedThreads;

                final LSDWorker thread = threads[threadNum];
                // join() makes the worker's write to 'data' visible to this thread.
                thread.join();
                final FlatLevelCombination[] localSkyline = thread.data;
                if(localSkyline != null) skylineMergeQueue.add(localSkyline);
            }

            // No more input will arrive. The merge workers drain the queue and
            // terminate on their own; interrupting them here could abort a
            // pending merge and lose results.
            skylineMergeQueue.stop();
            for(SkylineMergeQueue.SkylineMergeWorker worker : skylineMergeQueue.threads)
            {
                worker.join();
            }
            completed = true;
        }
        finally
        {
            if(!completed)
            {
                // Abnormal exit (e.g. this thread was interrupted): release the merge workers.
                skylineMergeQueue.stop();
                for(SkylineMergeQueue.SkylineMergeWorker worker : skylineMergeQueue.threads)
                {
                    worker.interrupt();
                }
            }
        }

        assert skylineMergeQueue.computedLocalSkylines.size() == 1;
        return skylineMergeQueue.computedLocalSkylines.poll();
    }

    /**
     * Queue of local results with a set of worker threads that repeatedly take
     * two queued arrays, combine them with {@code SBBase.combineLocalSkyline}
     * and put the combined array back.
     *
     * <p>All mutable state ({@link #computedLocalSkylines}, {@code running},
     * {@code mergesInFlight}) is guarded by the monitor of
     * {@link #computedLocalSkylines}. Callers must invoke {@link #stop()} only
     * after the last {@link #add(FlatLevelCombination[])}; the workers then
     * finish all remaining merges and terminate, leaving at most one array in
     * the queue.
     */
    final static class SkylineMergeQueue
    {
        final Queue<FlatLevelCombination[]> computedLocalSkylines = new LinkedList<FlatLevelCombination[]>();
        final SkylineMergeWorker[] threads;
        /** {@code true} until {@link #stop()} is called. */
        private boolean running = true;
        /** Number of workers currently combining two arrays outside the lock. */
        private int mergesInFlight = 0;

        /**
         * Creates and starts {@code max(maxThreads / 2, 1)} merge workers.
         *
         * @param maxThreads upper bound the worker count is derived from
         */
        public SkylineMergeQueue(final int maxThreads)
        {
            this.threads = new SkylineMergeWorker[Math.max(maxThreads/2, 1)];
            for(int i = 0; i < threads.length; ++i)
            {
                threads[i] = new SkylineMergeWorker();
                threads[i].start();
            }
        }

        /**
         * Signals that no further arrays will be added from outside. Workers
         * exit once no merge is in progress and fewer than two arrays remain.
         */
        public void stop()
        {
            synchronized(computedLocalSkylines)
            {
                running = false;
                // Wake idle workers so they can re-evaluate their exit condition.
                computedLocalSkylines.notifyAll();
            }
        }

        /**
         * Enqueues an array for merging and wakes the waiting workers.
         *
         * @param node array to enqueue
         */
        public void add(final FlatLevelCombination[] node)
        {
            synchronized(computedLocalSkylines)
            {
                computedLocalSkylines.add(node);
                computedLocalSkylines.notifyAll();
            }
        }

        /** Worker thread that merges queued arrays pairwise. */
        class SkylineMergeWorker extends Thread
        {
            @Override
            public void run()
            {
                try
                {
                    while(true)
                    {
                        final FlatLevelCombination[] a;
                        final FlatLevelCombination[] b;
                        synchronized(computedLocalSkylines)
                        {
                            // Keep waiting while a pair may still become available:
                            // either the producer is not done, or another worker's
                            // in-flight merge will put its result back.
                            while(computedLocalSkylines.size() < 2 && (running || mergesInFlight > 0))
                                computedLocalSkylines.wait();
                            if(computedLocalSkylines.size() < 2) return;
                            a = computedLocalSkylines.poll();
                            b = computedLocalSkylines.poll();
                            ++mergesInFlight;
                        }

                        FlatLevelCombination[] c = null;
                        boolean merged = false;
                        try
                        {
                            c = SBBase.combineLocalSkyline(a, b);
                            merged = true;
                        }
                        finally
                        {
                            synchronized(computedLocalSkylines)
                            {
                                // Always release the in-flight slot, even on failure,
                                // so the other workers are not left waiting forever.
                                --mergesInFlight;
                                if(merged) computedLocalSkylines.add(c);
                                computedLocalSkylines.notifyAll();
                            }
                        }
                    }
                }
                catch (InterruptedException ignored)
                {
                    // Interruption is treated as a request to terminate this worker.
                }
            }
        }
    }
}