Skip to content

Commit

Permalink
Added: Check export execution status
Browse files Browse the repository at this point in the history
  • Loading branch information
djuarezgf committed May 22, 2024
1 parent 37ce9c2 commit b58b9c1
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 35 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.0.1 - 2024-04-18]
## [0.0.1 - 2024-05-22]
### Added
- First version of the project
- Spring Application
Expand Down Expand Up @@ -133,3 +133,4 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Exporter Job
- Check Queries in Exporter Job
- Max time to wait focus task in minutes
- Check export execution status
16 changes: 13 additions & 3 deletions src/main/java/de/samply/exporter/ExporterJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public void checkExports() {
checkQueriesToSend(),
checkQueriesToSendAndExecute(),
checkQueriesAlreadySent(),
checkQueriesAlreadySentToBeExecuted()).block();
checkQueriesAlreadySentToBeExecuted(),
checkQueriesAlreadyExecutingStep1(),
checkQueriesAlreadyExecutingStep2()).block();

}
}
Expand All @@ -55,11 +57,19 @@ private Mono<Void> checkQueriesToSendAndExecute() {
}

private Mono<Void> checkQueriesAlreadySent() {
return checkQueries(QueryState.SENDING, QueryState.FINISHED, exporterService::checkIfQueryIsAlreadySent);
return checkQueries(QueryState.SENDING, QueryState.FINISHED, exporterService::checkIfQueryIsAlreadySentOrExecuted);
}

private Mono<Void> checkQueriesAlreadyExecutingStep1() {
return checkQueries(QueryState.EXPORT_RUNNING_1, QueryState.EXPORT_RUNNING_2, exporterService::checkExecutionStatus);
}

private Mono<Void> checkQueriesAlreadyExecutingStep2() {
return checkQueries(QueryState.EXPORT_RUNNING_2, QueryState.FINISHED, exporterService::checkIfQueryIsAlreadySentOrExecuted);
}

private Mono<Void> checkQueriesAlreadySentToBeExecuted() {
return checkQueries(QueryState.SENDING_AND_EXECUTING, QueryState.FINISHED, exporterService::checkIfQueryIsAlreadySent, Optional.of(
return checkQueries(QueryState.SENDING_AND_EXECUTING, QueryState.EXPORT_RUNNING_1, exporterService::checkIfQueryIsAlreadySentOrExecuted, Optional.of(
exporterServiceResult ->
exporterService.fetchExporterExecutionIdFromExporterResponse(exporterServiceResult.result()).ifPresent(exportExecutionId ->
exporterServiceResult.projectBridgehead().setExporterExecutionId(exportExecutionId))));
Expand Down
113 changes: 89 additions & 24 deletions src/main/java/de/samply/exporter/ExporterService.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import de.samply.exporter.focus.FocusQuery;
import de.samply.exporter.focus.FocusService;
import de.samply.exporter.focus.FocusServiceException;
import de.samply.exporter.focus.TaskType;
import de.samply.notification.NotificationService;
import de.samply.notification.OperationType;
import de.samply.project.ProjectType;
Expand Down Expand Up @@ -90,25 +91,34 @@ public ExporterService(

public Mono<ExporterServiceResult> sendQueryToBridgehead(ProjectBridgehead projectBridgehead) throws ExporterServiceException {
log.info("Sending query of project " + projectBridgehead.getProject().getCode() + " to bridgehead " + projectBridgehead.getBridgehead() + " ...");
return postRequest(projectBridgehead, generateFocusBody(projectBridgehead, false), true);
TaskType taskType = TaskType.CREATE;
return postRequest(projectBridgehead, generateFocusBody(projectBridgehead, taskType), taskType);
}

public Mono<ExporterServiceResult> sendQueryToBridgeheadAndExecute(ProjectBridgehead projectBridgehead) throws ExporterServiceException {
log.info("Sending query of project " + projectBridgehead.getProject().getCode() + " to bridgehead " + projectBridgehead.getBridgehead() + " to be executed...");
return postRequest(projectBridgehead, generateFocusBody(projectBridgehead, true), true);
TaskType taskType = TaskType.EXECUTE;
return postRequest(projectBridgehead, generateFocusBody(projectBridgehead, taskType), taskType);
}

private Mono<ExporterServiceResult> postRequest(ProjectBridgehead projectBridgehead, FocusQuery focusQuery, boolean toBeExecuted) {
public Mono<ExporterServiceResult> checkExecutionStatus(ProjectBridgehead projectBridgehead) {
log.info("Checking export execution status of project " + projectBridgehead.getProject().getCode() + " in bridgehead " + projectBridgehead.getBridgehead());
TaskType taskType = TaskType.STATUS;
return postRequest(projectBridgehead, generateFocusBody(projectBridgehead, taskType), taskType);
}

private Mono<ExporterServiceResult> postRequest(ProjectBridgehead projectBridgehead, FocusQuery focusQuery, TaskType taskType) {
return webClient.post()
.uri(uriBuilder -> uriBuilder.path(ProjectManagerConst.FOCUS_TASK_PATH).build())
.header(HttpHeaders.AUTHORIZATION, fetchAuthorization())
.contentType(MediaType.APPLICATION_JSON)
.bodyValue(focusQuery)
.exchangeToMono(clientResponse -> {
if (clientResponse.statusCode().equals(HttpStatus.CREATED)) {
createBridgeheadNotification(HttpStatus.OK, null, projectBridgehead, projectBridgehead.getExporterUser(), fetchBridgeheadOperationType(toBeExecuted));
if (clientResponse.statusCode().equals(HttpStatus.OK) || clientResponse.statusCode().equals(HttpStatus.CREATED)) {
fetchBridgeheadOperationType(taskType).ifPresent(operationType ->
createBridgeheadNotification((HttpStatus) clientResponse.statusCode(), null, projectBridgehead, projectBridgehead.getExporterUser(), operationType));
resetProjectBridgeheadDataShield(projectBridgehead);
return Mono.just(new ExporterServiceResult(projectBridgehead, focusQuery.getId()));
return Mono.just(new ExporterServiceResult(projectBridgehead, focusService.serializeFocusQuery(focusQuery)));
} else {
log.error("Http Error " + clientResponse.statusCode() + " posting task " + focusQuery.getId() + " : " + focusQuery.getBody() +
" for project " + projectBridgehead.getProject().getCode() + " and bridgehead " + projectBridgehead.getBridgehead());
Expand All @@ -130,8 +140,12 @@ private void createBridgeheadNotification(
projectBridgehead.getProject().getCode(), projectBridgehead.getBridgehead(), email, operationType, null, error, status);
}

private OperationType fetchBridgeheadOperationType(boolean toBeExecuted) {
return (toBeExecuted) ? OperationType.SEND_QUERY_TO_BRIDGEHEAD_AND_EXECUTE : OperationType.SEND_QUERY_TO_BRIDGEHEAD;
private Optional<OperationType> fetchBridgeheadOperationType(TaskType taskType) {
return switch (taskType) {
case CREATE -> Optional.of(OperationType.SEND_QUERY_TO_BRIDGEHEAD);
case EXECUTE -> Optional.of(OperationType.SEND_QUERY_TO_BRIDGEHEAD_AND_EXECUTE);
default -> Optional.empty();
};
}

private String convertToBase64String(Object jsonObject) {
Expand Down Expand Up @@ -160,6 +174,13 @@ ProjectManagerConst.EXPORTER_PARAM_QUERY_CONTEXT, generateQueryContextForExporte
return convertToBase64String(result);
}

private String generateExportStatusInBase64ForExporterRequest(ProjectBridgehead projectBridgehead) {
Map<String, String> result = Map.of(
ProjectManagerConst.EXPORTER_PARAM_QUERY_EXECUTION_ID, projectBridgehead.getExporterExecutionId()
);
return convertToBase64String(result);
}

private String generateExporterQueryInBase64ForExporterCreateQuery(Project project)
throws ExporterServiceException {
Query query = project.getQuery();
Expand All @@ -177,21 +198,23 @@ ProjectManagerConst.EXPORTER_PARAM_QUERY_CONTEXT, generateQueryContextForExporte
return convertToBase64String(result);
}

private FocusQuery generateFocusBody(ProjectBridgehead projectBridgehead, boolean toBeExecuted) throws ExporterServiceException {
private FocusQuery generateFocusBody(ProjectBridgehead projectBridgehead, TaskType taskType) throws ExporterServiceException {
try {
return generateFocusQueryWithoutExceptionHandling(projectBridgehead, toBeExecuted);
return generateFocusQueryWithoutExceptionHandling(projectBridgehead, taskType);
} catch (FocusServiceException e) {
throw new ExporterServiceException(e);
}
}

private FocusQuery generateFocusQueryWithoutExceptionHandling(ProjectBridgehead projectBridgehead, boolean toBeExecuted) throws FocusServiceException {
String exporterQueryInBase64 = (toBeExecuted) ? generateExportQueryInBase64ForExporterRequest(projectBridgehead) :
generateExporterQueryInBase64ForExporterCreateQuery(projectBridgehead.getProject());
return focusService.generateFocusQuery(exporterQueryInBase64, toBeExecuted, projectBridgehead.getBridgehead());
private FocusQuery generateFocusQueryWithoutExceptionHandling(ProjectBridgehead projectBridgehead, TaskType taskType) throws FocusServiceException {
String exporterQueryInBase64 = switch (taskType) {
case CREATE -> generateExporterQueryInBase64ForExporterCreateQuery(projectBridgehead.getProject());
case EXECUTE -> generateExportQueryInBase64ForExporterRequest(projectBridgehead);
case STATUS -> generateExportStatusInBase64ForExporterRequest(projectBridgehead);
};
return focusService.generateFocusQuery(exporterQueryInBase64, taskType, projectBridgehead.getBridgehead());
}


private String convertToString(LocalDate date) {
return (date != null) ? date.format(DateTimeFormatter.ISO_DATE) : null;
}
Expand Down Expand Up @@ -226,25 +249,47 @@ private void resetProjectBridgeheadDataShield(ProjectBridgehead projectBridgehea
}
}

public Mono<ExporterServiceResult> checkIfQueryIsAlreadySent(ProjectBridgehead projectBridgehead) {
public Mono<ExporterServiceResult> checkIfQueryIsAlreadySentOrExecuted(ProjectBridgehead projectBridgehead) {
Optional<FocusQuery> focusQuery = extractFocusQuery(projectBridgehead);
if (focusQuery.isEmpty()) {
throw new RuntimeException("Focus Query not found for project " + projectBridgehead.getProject().getCode() + " and bridgehead " + projectBridgehead.getBridgehead());
}
return webClient.get()
.uri(uriBuilder -> uriBuilder
.path(ProjectManagerConst.FOCUS_TASK_PATH + "/" + projectBridgehead.getExporterResponse() + ProjectManagerConst.FOCUS_TASK_RESULTS_PATH)
.path(ProjectManagerConst.FOCUS_TASK_PATH + "/" + extractTaskId(focusQuery.get()) + ProjectManagerConst.FOCUS_TASK_RESULTS_PATH)
.queryParam(ProjectManagerConst.FOCUS_TASK_WAIT_TIME_PARAM, focusWaitTime)
.queryParam(ProjectManagerConst.FOCUS_TASK_WAIT_COUNT_PARAM, focusWaitCount).build())
.header(HttpHeaders.AUTHORIZATION, fetchAuthorization())
.exchangeToMono(clientResponse -> {
if (clientResponse.statusCode().equals(HttpStatus.OK) || clientResponse.statusCode().equals(HttpStatus.PARTIAL_CONTENT)) {
OperationType operationType = projectBridgehead.getQueryState() == QueryState.SENDING ? OperationType.CHECK_SEND_QUERY : OperationType.CHECK_SEND_AND_EXECUTE_QUERY;
createBridgeheadNotification(HttpStatus.OK, null, projectBridgehead, projectBridgehead.getExporterUser(), operationType);
return clientResponse.bodyToMono(String.class).flatMap(body -> Mono.just(new ExporterServiceResult(projectBridgehead, body)));
Optional<OperationType> operationType = switch (projectBridgehead.getQueryState()) {
case SENDING -> Optional.of(OperationType.CHECK_SEND_QUERY);
case SENDING_AND_EXECUTING -> Optional.of(OperationType.CHECK_SEND_AND_EXECUTE_QUERY);
default -> Optional.empty();
};
operationType.ifPresent(type -> createBridgeheadNotification(HttpStatus.OK, null, projectBridgehead, projectBridgehead.getExporterUser(), type));
return clientResponse.bodyToMono(FocusQuery[].class).filter(focusQueries -> focusQueries != null && focusQueries.length > 0).flatMap(newFocusQuery -> {
if (projectBridgehead.getQueryState() == QueryState.EXPORT_RUNNING_2) {
if (newFocusQuery[0].getBody() == null) {
return Mono.empty();
}
String decodedBody = Base64Utils.decode(newFocusQuery[0].getBody());
if (!decodedBody.contains("OK")) {
if (decodedBody.contains("ERROR")) {
modifyProjectBridgeheadState(projectBridgehead, QueryState.ERROR);
} else {
modifyProjectBridgeheadState(projectBridgehead, QueryState.EXPORT_RUNNING_1);
}
return Mono.empty();
}
}
return Mono.just(new ExporterServiceResult(projectBridgehead, focusService.serializeFocusQuery(newFocusQuery[0])));
});
} else {
log.error("Http Error " + clientResponse.statusCode() + " checking task " + projectBridgehead.getExporterResponse() +
log.error("Http Error " + clientResponse.statusCode() + " checking task " + extractTaskId(focusQuery.get()) +
" for project " + projectBridgehead.getProject().getCode() + " and bridgehead " + projectBridgehead.getBridgehead());
if (isQueryStateToBeChangedToError((HttpStatus) clientResponse.statusCode(), projectBridgehead)) {
projectBridgehead.setQueryState(QueryState.ERROR);
projectBridgehead.setModifiedAt(Instant.now());
projectBridgeheadRepository.save(projectBridgehead);
modifyProjectBridgeheadState(projectBridgehead, QueryState.ERROR);
}
return clientResponse.bodyToMono(String.class).flatMap(errorBody -> {
log.error("Error: {}", errorBody);
Expand All @@ -254,6 +299,26 @@ public Mono<ExporterServiceResult> checkIfQueryIsAlreadySent(ProjectBridgehead p
});
}

private void modifyProjectBridgeheadState(ProjectBridgehead projectBridgehead, QueryState newState) {
projectBridgehead.setQueryState(newState);
projectBridgehead.setModifiedAt(Instant.now());
projectBridgeheadRepository.save(projectBridgehead);
}

private String extractTaskId(FocusQuery focusQuery) {
return (focusQuery.getId() != null) ? focusQuery.getId() : focusQuery.getTask();
}

private Optional<FocusQuery> extractFocusQuery(ProjectBridgehead projectBridgehead) {
if (projectBridgehead.getExporterResponse() != null) {
FocusQuery[] focusQueries = focusService.deserializeFocusResponse(projectBridgehead.getExporterResponse());
if (focusQueries != null && focusQueries.length > 0) {
return Optional.of(focusQueries[0]);
}
}
return Optional.empty();
}

private boolean isQueryStateToBeChangedToError(HttpStatus httpStatus, ProjectBridgehead projectBridgehead) {
if (httpStatus == HttpStatus.NOT_FOUND) {
return Duration.between(projectBridgehead.getModifiedAt(), Instant.now()).toMinutes() > maxTimeToWaitFocusTaskInMinutes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class FocusQueryMetadata {

@JsonProperty("project")
private String project;
@JsonProperty("execute")
private boolean execute;
@JsonProperty("task_type")
private TaskType taskType;

}
17 changes: 13 additions & 4 deletions src/main/java/de/samply/exporter/focus/FocusService.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ public FocusService(
this.bridgeheadConfiguration = bridgeheadConfiguration;
}

public FocusQuery generateFocusQuery(String exporterQuery, boolean toBeExecuted, String bridgehead) throws FocusServiceException {
public FocusQuery generateFocusQuery(String exporterQuery, TaskType taskType, String bridgehead) throws FocusServiceException {
FocusQuery focusQuery = new FocusQuery();
focusQuery.setId(generateId());
focusQuery.setBody(exporterQuery);
focusQuery.setFrom(projectManagerId);
focusQuery.setTo(fetchExporterFocusIds(bridgehead));
focusQuery.setTtl(ttl);
focusQuery.setMetadata(createFocusQueryMetadata(toBeExecuted));
focusQuery.setMetadata(createFocusQueryMetadata(taskType));
focusQuery.setFailureStrategy(failureStrategy);
return focusQuery;
}
Expand All @@ -56,10 +56,10 @@ private String[] fetchExporterFocusIds(String bridgehead) throws FocusServiceExc
return new String[]{focusId};
}

private FocusQueryMetadata createFocusQueryMetadata(boolean toBeExecuted) {
private FocusQueryMetadata createFocusQueryMetadata(TaskType taskType) {
FocusQueryMetadata focusQueryMetadata = new FocusQueryMetadata();
focusQueryMetadata.setProject(ProjectManagerConst.FOCUS_METADATA_PROJECT);
focusQueryMetadata.setExecute(toBeExecuted);
focusQueryMetadata.setTaskType(taskType);
return focusQueryMetadata;
}

Expand All @@ -80,4 +80,13 @@ public FocusQuery[] deserializeFocusResponse(String focusResponse) throws FocusS
}
}

public String serializeFocusQuery(FocusQuery focusQuery) throws FocusServiceException {
try {
FocusQuery[] focusQueries = {focusQuery};
return objectMapper.writeValueAsString(focusQueries);
} catch (JsonProcessingException e) {
throw new FocusServiceException(e);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package de.samply.exporter.focus;

public class FocusServiceException extends Exception{
public class FocusServiceException extends RuntimeException {
public FocusServiceException(String message) {
super(message);
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/de/samply/exporter/focus/TaskType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package de.samply.exporter.focus;

public enum TaskType {
CREATE,
EXECUTE,
STATUS
}
2 changes: 2 additions & 0 deletions src/main/java/de/samply/query/QueryState.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ public enum QueryState {
TO_BE_SENT_AND_EXECUTED,
SENDING,
SENDING_AND_EXECUTING,
EXPORT_RUNNING_1,
EXPORT_RUNNING_2,
ERROR,
FINISHED
}

0 comments on commit b58b9c1

Please sign in to comment.