From 3304e6ffe069a6015e04de23a4cebfd7c2657e8f Mon Sep 17 00:00:00 2001 From: Franck LECUYER Date: Fri, 19 Feb 2021 08:49:47 +0100 Subject: [PATCH 1/5] Modifications after some code review remarks Signed-off-by: Franck LECUYER --- .../service/SecurityAnalysisService.java | 2 +- .../SecurityAnalysisWorkerService.java | 120 +++++++++--------- 2 files changed, 61 insertions(+), 61 deletions(-) diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisService.java index dd8621f7..d4e339e1 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisService.java @@ -74,6 +74,6 @@ public Mono setStatus(UUID resultUuid, String status) { public Mono stop(UUID resultUuid, String receiver) { return Mono.fromRunnable(() -> - cancelPublisherService.publish(new SecurityAnalysisCancelContext(resultUuid, receiver))).then(); + cancelPublisherService.publish(new SecurityAnalysisCancelContext(resultUuid, receiver))); } } diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index 0a8503b6..231acdae 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import java.util.logging.Level; import java.util.stream.Collectors; import static org.gridsuite.securityanalysis.server.service.SecurityAnalysisStoppedPublisherService.CANCEL_MESSAGE; @@ -141,79 +142,78 @@ private Mono run(SecurityAnalysisRunContext context, UUI return Mono.zip(network, contingencies) .flatMap(tuple -> { - SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); - CompletableFuture future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); - if (resultUuid != null) { - futures.put(resultUuid, future); - } if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { return Mono.empty(); } else { + SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); + CompletableFuture future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); + if (resultUuid != null) { + futures.put(resultUuid, future); + } return Mono.fromCompletionStage(future); } }); } @Bean - public Consumer> consumeRun() { - return message -> { - - SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); - runRequests.add(resultContext.getResultUuid()); - - run(resultContext.getRunContext(), resultContext.getResultUuid()) - .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) - .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) - .then(Mono.just(result))) - .doOnSuccess(result -> { - if (result != null) { // result available - resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); - LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); - } else { // result not available : stop computation request - if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { - stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); - } - } - }) - .onErrorResume(throwable -> { - if (!(throwable instanceof CancellationException)) { - LOGGER.error(FAIL_MESSAGE, throwable); - stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); - return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); - } - return Mono.empty(); - }) - .doFinally(s -> { - futures.remove(resultContext.getResultUuid()); - cancelComputationRequests.remove(resultContext.getResultUuid()); - runRequests.remove(resultContext.getResultUuid()); - }) - .subscribe(); - }; + public Consumer>> consumeRun() { + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) + .flatMap(message -> { + SecurityAnalysisResultContext resultContext = SecurityAnalysisResultContext.fromMessage(message, objectMapper); + runRequests.add(resultContext.getResultUuid()); + + return run(resultContext.getRunContext(), resultContext.getResultUuid()) + .flatMap(result -> resultRepository.insert(resultContext.getResultUuid(), result) + .then(resultRepository.insertStatus(resultContext.getResultUuid(), SecurityAnalysisStatus.COMPLETED.name())) + .then(Mono.just(result))) + .doOnSuccess(result -> { + if (result != null) { // result available + resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); + LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); + } + }) + .onErrorResume(throwable -> { + if (!(throwable instanceof CancellationException)) { + LOGGER.error(FAIL_MESSAGE, throwable); + stoppedPublisherService.publishFail(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver(), throwable.getMessage()); + return resultRepository.delete(resultContext.getResultUuid()).then(Mono.empty()); + } + return Mono.empty(); + }) + .doFinally(s -> { + futures.remove(resultContext.getResultUuid()); + cancelComputationRequests.remove(resultContext.getResultUuid()); + runRequests.remove(resultContext.getResultUuid()); + }); + }) + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeRun", t)) + .subscribe(); } @Bean - public Consumer> consumeCancel() { - return message -> { - SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); + public Consumer>> consumeCancel() { + return f -> f.log(CATEGORY_BROKER_INPUT, Level.FINE) + .flatMap(message -> { + SecurityAnalysisCancelContext cancelContext = SecurityAnalysisCancelContext.fromMessage(message); - if (runRequests.contains(cancelContext.getResultUuid())) { - cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); - } + if (runRequests.contains(cancelContext.getResultUuid())) { + cancelComputationRequests.put(cancelContext.getResultUuid(), cancelContext); + } - // find the completableFuture associated with result uuid - CompletableFuture future = futures.get(cancelContext.getResultUuid()); - if (future != null) { - future.cancel(true); // cancel computation in progress - - resultRepository.delete(cancelContext.getResultUuid()) - .doOnSuccess(unused -> { - stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); - LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); - }) - .doOnError(throwable -> LOGGER.error(throwable.toString(), throwable)) - .subscribe(); - } - }; + // find the completableFuture associated with result uuid + CompletableFuture future = futures.get(cancelContext.getResultUuid()); + if (future != null) { + future.cancel(true); // cancel computation in progress + + return resultRepository.delete(cancelContext.getResultUuid()) + .doOnSuccess(unused -> { + stoppedPublisherService.publishCancel(cancelContext.getResultUuid(), cancelContext.getReceiver()); + LOGGER.info(CANCEL_MESSAGE + " (resultUuid='{}')", cancelContext.getResultUuid()); + }); + } + return Mono.empty(); + }) + .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeCancel", t)) + .subscribe(); } } From 8f06d0a25bd77f7e5e9636a492af969f1a797669 Mon Sep 17 00:00:00 2001 From: Franck LECUYER Date: Fri, 19 Feb 2021 10:46:56 +0100 Subject: [PATCH 2/5] Modifications after some code review remarks Signed-off-by: Franck LECUYER --- .../server/service/SecurityAnalysisWorkerService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index 231acdae..c4b85caa 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -186,7 +186,11 @@ public Consumer>> consumeRun() { runRequests.remove(resultContext.getResultUuid()); }); }) - .onErrorContinue((t, r) -> LOGGER.error("Exception in consumeRun", t)) + .onErrorContinue((t, r) -> { + if (!(t instanceof CancellationException)) { + LOGGER.error("Exception in consumeRun", t); + } + }) .subscribe(); } From 04111fd819f44e8bb07ba8d62bcffdad384f3941 Mon Sep 17 00:00:00 2001 From: Franck LECUYER Date: Wed, 24 Feb 2021 11:35:37 +0100 Subject: [PATCH 3/5] Add some logs Add some tests for computation cancel requests Signed-off-by: Franck LECUYER --- .../SecurityAnalysisWorkerService.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index c4b85caa..84a03fc8 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -104,13 +104,19 @@ private Mono getNetwork(UUID networkUuid) { .subscribeOn(Schedulers.boundedElastic()); } - private Mono getNetwork(UUID networkUuid, List otherNetworkUuids) { + private Mono getNetwork(UUID networkUuid, List otherNetworkUuids, UUID resultUuid) { Mono network = getNetwork(networkUuid); if (otherNetworkUuids.isEmpty()) { return network; } else { Mono> otherNetworks = Flux.fromIterable(otherNetworkUuids) - .flatMap(this::getNetwork) + .flatMap(uuid -> { + if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { + return Mono.empty(); + } else { + return getNetwork(uuid); + } + }) .collectList(); return Mono.zip(network, otherNetworks) .map(t -> { @@ -134,7 +140,19 @@ private Mono run(SecurityAnalysisRunContext context, UUI LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList())); - Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids()); + if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { + return Mono.empty(); + } + + LOGGER.info("Loading networks"); + + Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids(), resultUuid); + + if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { + return Mono.empty(); + } + + LOGGER.info("Loading contingencies"); Mono> contingencies = Flux.fromIterable(context.getContingencyListNames()) .flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid())) @@ -150,6 +168,7 @@ private Mono run(SecurityAnalysisRunContext context, UUI if (resultUuid != null) { futures.put(resultUuid, future); } + LOGGER.info("Starting security analysis computation"); return Mono.fromCompletionStage(future); } }); @@ -170,6 +189,11 @@ public Consumer>> consumeRun() { if (result != null) { // result available resultPublisherService.publish(resultContext.getResultUuid(), resultContext.getRunContext().getReceiver()); LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); + } else { // result not available : stop computation request + if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { + stoppedPublisherService.publish(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); + LOGGER.info("Security analysis stopped (resultUuid='{}')", resultContext.getResultUuid()); + } } }) .onErrorResume(throwable -> { From 2ee713b01513d3c3602e36088a6f8ebf4de7d415 Mon Sep 17 00:00:00 2001 From: Franck LECUYER Date: Fri, 12 Mar 2021 13:23:51 +0100 Subject: [PATCH 4/5] Resolving conflict and adaptation to other merge PR 'bug fix running status remain when error occur' Signed-off-by: Franck LECUYER --- .../server/service/SecurityAnalysisWorkerService.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index 84a03fc8..5bd34d6a 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -191,7 +191,7 @@ public Consumer>> consumeRun() { LOGGER.info("Security analysis complete (resultUuid='{}')", resultContext.getResultUuid()); } else { // result not available : stop computation request if (cancelComputationRequests.get(resultContext.getResultUuid()) != null) { - stoppedPublisherService.publish(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); + stoppedPublisherService.publishCancel(resultContext.getResultUuid(), cancelComputationRequests.get(resultContext.getResultUuid()).getReceiver()); LOGGER.info("Security analysis stopped (resultUuid='{}')", resultContext.getResultUuid()); } } @@ -210,11 +210,6 @@ public Consumer>> consumeRun() { runRequests.remove(resultContext.getResultUuid()); }); }) - .onErrorContinue((t, r) -> { - if (!(t instanceof CancellationException)) { - LOGGER.error("Exception in consumeRun", t); - } - }) .subscribe(); } From c0ae9a9e1a33b75b7b2c12481632403d89ffa274 Mon Sep 17 00:00:00 2001 From: Franck LECUYER Date: Wed, 24 Mar 2021 09:19:11 +0100 Subject: [PATCH 5/5] Modifications after some code review remarks Signed-off-by: Franck LECUYER --- .../SecurityAnalysisWorkerService.java | 52 ++++++++----------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java index 5bd34d6a..2ebaf0a8 100644 --- a/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java +++ b/src/main/java/org/gridsuite/securityanalysis/server/service/SecurityAnalysisWorkerService.java @@ -104,30 +104,24 @@ private Mono getNetwork(UUID networkUuid) { .subscribeOn(Schedulers.boundedElastic()); } - private Mono getNetwork(UUID networkUuid, List otherNetworkUuids, UUID resultUuid) { + private Mono getNetwork(UUID networkUuid, List otherNetworkUuids) { Mono network = getNetwork(networkUuid); if (otherNetworkUuids.isEmpty()) { return network; } else { Mono> otherNetworks = Flux.fromIterable(otherNetworkUuids) - .flatMap(uuid -> { - if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { - return Mono.empty(); - } else { - return getNetwork(uuid); - } - }) - .collectList(); + .flatMap(this::getNetwork) + .collectList(); return Mono.zip(network, otherNetworks) - .map(t -> { - // creation of the merging view - List networks = new ArrayList<>(); - networks.add(t.getT1()); - networks.addAll(t.getT2()); - MergingView mergingView = MergingView.create("merge", "iidm"); - mergingView.merge(networks.toArray(new Network[0])); - return mergingView; - }); + .map(t -> { + // creation of the merging view + List networks = new ArrayList<>(); + networks.add(t.getT1()); + networks.addAll(t.getT2()); + MergingView mergingView = MergingView.create("merge", "iidm"); + mergingView.merge(networks.toArray(new Network[0])); + return mergingView; + }); } } @@ -138,24 +132,20 @@ public Mono run(SecurityAnalysisRunContext context) { private Mono run(SecurityAnalysisRunContext context, UUID resultUuid) { Objects.requireNonNull(context); - LOGGER.info("Run security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList())); - if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { return Mono.empty(); } - LOGGER.info("Loading networks"); - - Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids(), resultUuid); - - if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { - return Mono.empty(); - } - - LOGGER.info("Loading contingencies"); + Mono network = getNetwork(context.getNetworkUuid(), context.getOtherNetworkUuids()).map(uuid -> { + LOGGER.info("Loading network"); + return uuid; + }); Mono> contingencies = Flux.fromIterable(context.getContingencyListNames()) - .flatMap(contingencyListName -> actionsService.getContingencyList(contingencyListName, context.getNetworkUuid())) + .flatMap(contingencyListName -> { + LOGGER.info("Loading contingencies"); + return actionsService.getContingencyList(contingencyListName, context.getNetworkUuid()); + }) .collectList(); return Mono.zip(network, contingencies) @@ -163,12 +153,12 @@ private Mono run(SecurityAnalysisRunContext context, UUI if (resultUuid != null && cancelComputationRequests.get(resultUuid) != null) { return Mono.empty(); } else { + LOGGER.info("Starting security analysis on contingency lists: {}", context.getContingencyListNames().stream().map(SecurityAnalysisWorkerService::sanitizeParam).collect(Collectors.toList())); SecurityAnalysis securityAnalysis = configService.getSecurityAnalysisFactory().create(tuple.getT1(), LocalComputationManager.getDefault(), 0); CompletableFuture future = securityAnalysis.run(VariantManagerConstants.INITIAL_VARIANT_ID, context.getParameters(), n -> tuple.getT2()); if (resultUuid != null) { futures.put(resultUuid, future); } - LOGGER.info("Starting security analysis computation"); return Mono.fromCompletionStage(future); } });