21
21
import gr .mmichaildis .amqprunner .util .PortExtractingLauncherListener ;
22
22
import io .vavr .collection .Stream ;
23
23
import io .vavr .control .Option ;
24
+ import io .vavr .control .Try ;
24
25
import lombok .extern .slf4j .Slf4j ;
25
26
import org .apache .qpid .server .SystemLauncher ;
26
27
import org .apache .qpid .server .SystemLauncherListener .DefaultSystemLauncherListener ;
27
28
28
29
import java .io .File ;
29
30
import java .net .URL ;
31
+ import java .util .Map ;
30
32
import java .util .*;
31
33
34
+ import static gr .mmichaildis .amqprunner .util .StreamHelpers .not ;
35
+ import static gr .mmichaildis .amqprunner .util .StreamHelpers .replaceWith ;
36
+ import static io .vavr .API .*;
32
37
import static java .lang .Thread .sleep ;
33
38
import static org .junit .Assert .assertNotNull ;
34
39
import static org .junit .Assert .fail ;
@@ -47,7 +52,11 @@ public class BrokerManager {
47
52
private static final Long SLEEP_STEP = 100L ;
48
53
private final String introduction ;
49
54
50
- private static ReferenceHolder refHolder ;
55
+ /**
56
+ * The path that contains the configuration file for qpid initialization.
57
+ */
58
+ private static final String INITIAL_CONFIG_PATH = "amqp.json" ;
59
+ private static final ReferenceHolder refHolder = new ReferenceHolder ();
51
60
52
61
private final String username ;
53
62
private final String password ;
@@ -82,8 +91,9 @@ public BrokerManager(final String username,
82
91
this .requestedAmqpPort = requestedAmqpPort ;
83
92
this .requestedWorkPath = requestedWorkPath ;
84
93
this .requestedLogPath = requestedLogPath ;
85
- refHolder = new ReferenceHolder ();
86
- refHolder .setCleanUpList (Collections .synchronizedList (new LinkedList <>()));
94
+
95
+ refHolder .setQueueCleanUpList (Collections .synchronizedList (new LinkedList <>()));
96
+ refHolder .setExchangeCleanUpList (Collections .synchronizedList (new LinkedList <>()));
87
97
88
98
introduction = "[BrokerManager" + (name .isEmpty () ? "" : "-" + name ) + "] " ;
89
99
// this.systemLauncher = new SystemLauncher();
@@ -97,11 +107,6 @@ public BrokerManager(final String username,
97
107
this .uuid = UUID .randomUUID ();
98
108
}
99
109
100
- /**
101
- * The path that contains the configuration file for qpid initialization.
102
- */
103
- private static final String INITIAL_CONFIG_PATH = "amqp.json" ;
104
- private static final String INITIAL_CONFIG_PATH_NETWORK = "amqpNetwork.json" ;
105
110
106
111
/**
107
112
* Start the broker with the properties that was initialized with.
@@ -160,15 +165,15 @@ public void stopBroker() {
160
165
systemLauncher .shutdown ();
161
166
log .info ("SystemLauncher shutdown complete. Cleaning up." );
162
167
163
- File db = new File (requestedWorkPath + uuid );
164
- File log = new File (requestedLogPath + uuid );
168
+ final File db = new File (requestedWorkPath + uuid );
169
+ final File log = new File (requestedLogPath + uuid );
165
170
166
171
Stream .of (db , log )
167
172
.filter (File ::exists )
168
173
.forEach (BrokerManager ::deleteFolder );
169
174
170
175
try {
171
- Thread . sleep (5000 );
176
+ sleep (5000 );
172
177
} catch (InterruptedException e ) {
173
178
e .printStackTrace ();
174
179
}
@@ -200,22 +205,27 @@ private static void deleteFolder(File folder) {
200
205
201
206
private static void deleteFile (File file , Integer retryStep ) {
202
207
try {
203
- Thread . sleep (2500 * retryStep );
208
+ sleep (2500 * retryStep );
204
209
} catch (InterruptedException e ) {
205
210
e .printStackTrace ();
206
211
}
207
- if (!file .delete () && retryStep < 3 ) {
208
- deleteFile (file , retryStep + 1 );
209
- } else {
210
- log .error ("File {} failed to be deleted after {} retries" , file , retryStep );
211
- }
212
+ Try .run (() -> sleep (2500 * retryStep ))
213
+ .filter (ignore -> retryStep < 3 )
214
+ .map (ignore -> file .delete ())
215
+ .filter (not (Boolean ::booleanValue ))
216
+ .map (replaceWith (retryStep < 3 ))
217
+ .forEach (hasMoreSteps -> Match (hasMoreSteps ).of (
218
+ Case ($ (true ), run (() -> deleteFile (file , retryStep + 1 ))),
219
+ Case ($ (false ), run (() -> log .error ("File {} failed to be deleted after {} retries" , file , retryStep )))
220
+ ));
212
221
}
213
222
214
223
/**
215
224
* Cleans up all the queues and exchanges in the broker.
216
225
*/
217
226
public void cleanUp () {
218
- refHolder .getCleanUpList ().forEach (r -> r .apply (null ));
227
+ refHolder .getQueueCleanUpList ().forEach (r -> r .apply (null ));
228
+ refHolder .getExchangeCleanUpList ().forEach (r -> r .apply (null ));
219
229
}
220
230
221
231
private Map <String , Object > createSystemConfig () {
0 commit comments