diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/Channel.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/Channel.java index 954a9e4..08c0b6d 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/Channel.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/Channel.java @@ -14,6 +14,7 @@ import org.folio.inventoryupdate.importing.moduledata.database.Tables; import org.folio.inventoryupdate.importing.moduledata.database.Util; import org.folio.inventoryupdate.importing.service.delivery.fileimport.FileListeners; +import org.folio.inventoryupdate.importing.service.delivery.fileimport.FileQueue; import org.folio.tlib.postgres.TenantPgPool; public class Channel extends Entity { @@ -28,6 +29,8 @@ public class Channel extends Entity { // virtual (non-db) property public static final String PROPERTY_COMMISSIONED = "commissioned"; private static final Map CHANNEL_FIELDS = new HashMap<>(); + ChannelRecord theRecord; + private FileQueue fileQueue; static { CHANNEL_FIELDS.put(ID, @@ -46,8 +49,6 @@ public class Channel extends Entity { new Field("listening", "listening", PgColumn.Type.BOOLEAN, false, true)); } - ChannelRecord theRecord; - public Channel() { } @@ -75,6 +76,11 @@ public String entityName() { return "Channel"; } + public Channel withFileQueue(FileQueue fileQueue) { + this.fileQueue = fileQueue; + return this; + } + public Channel fromJson(JsonObject channelJson) { return new Channel( getUuidOrGenerate(channelJson.getString(jsonPropertyName(ID))), @@ -130,6 +136,10 @@ public JsonObject asJson() { json.put(jsonPropertyName(ENABLED), theRecord.enabled()); json.put(PROPERTY_COMMISSIONED, isCommissioned()); json.put(jsonPropertyName(LISTENING), theRecord.listening()); + if (fileQueue != null) { + json.put("queuedFiles", fileQueue.size()); + json.put("fileInProcess", fileQueue.fileInProcess()); + } putMetadata(json); return json; } diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/Transformation.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/Transformation.java index c49051d..163ec25 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/Transformation.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/Transformation.java @@ -137,10 +137,6 @@ public TupleMapper toTemplateParameters() { }); } - - public record TransformationRecord(UUID id, String name, String description) { - } - public Future fetchTransformationSteps(TenantPgPool pool) { return new TransformationStep().fetchTransformationStepsByTransformationId(pool, this.getId()) .map(steps -> { @@ -150,4 +146,7 @@ public Future fetchTransformationSteps(TenantPgPool pool) { return this; }); } + + public record TransformationRecord(UUID id, String name, String description) { + } } diff --git a/src/main/java/org/folio/inventoryupdate/importing/moduledata/TransformationStep.java b/src/main/java/org/folio/inventoryupdate/importing/moduledata/TransformationStep.java index bd215e3..b75c394 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/moduledata/TransformationStep.java +++ b/src/main/java/org/folio/inventoryupdate/importing/moduledata/TransformationStep.java @@ -23,8 +23,9 @@ public class TransformationStep extends Entity { public static final String TRANSFORMATION_ID = "TRANSFORMATION_ID"; public static final String STEP_ID = "STEP_ID"; public static final String POSITION = "POSITION"; - private static final Map FIELDS = new HashMap<>(); public static final String STEP_NAME = "STEP_NAME"; + private static final Map FIELDS = new HashMap<>(); + TransformationStepRecord theRecord; private String stepName = ""; static { @@ -40,8 +41,6 @@ public class TransformationStep extends Entity { new Field("stepName", "step_name", PgColumn.Type.TEXT, true, false).isVirtual()); } - TransformationStepRecord theRecord; - private Integer positionOfLastStepOfTransformation = null; private Integer positionOfTheExistingStep = null; private Integer newPosition = null; @@ -189,7 +188,8 @@ public Future findPositionOfLastStepOfTransformation(TenantPgPool tenan .map(rows -> rows.iterator().next().getInteger("last_position")); } - public Future> fetchTransformationStepsByTransformationId(TenantPgPool tenantPgPool, UUID transformationId) { + public Future> fetchTransformationStepsByTransformationId(TenantPgPool tenantPgPool, + UUID transformationId) { List steps = new ArrayList<>(); return SqlTemplate.forQuery(tenantPgPool.getPool(), "SELECT transformation_step.*, step.name AS step_name " diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/FileQueue.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/FileQueue.java index ba3ae5e..25cca23 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/FileQueue.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/FileQueue.java @@ -109,6 +109,23 @@ public boolean hasNextFile() { return fs.readDirBlocking(jobPath).stream().map(File::new).anyMatch(File::isFile); } + public int size() { + if (fs.existsBlocking(jobPath)) { + return fs.readDirBlocking(jobPath).stream().map(File::new).filter(File::isFile).toList().size(); + } else { + return -1; + } + } + + public String fileInProcess() { + if (fs.existsBlocking(jobProcessingSlot)) { + return fs.readDirBlocking(jobProcessingSlot).stream().map(File::new) + .filter(File::isFile).findFirst().map(File::getName).orElse("no file in process"); + } else { + return "no file in process"; + } + } + /** * Promotes the next file in the staging directory to the processing directory * and returns true if a staged file was found (and the processing directory was free), otherwise returns false. diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/InventoryBatchUpdater.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/InventoryBatchUpdater.java index 2a1a32e..27396cd 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/InventoryBatchUpdater.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/fileimport/InventoryBatchUpdater.java @@ -147,7 +147,7 @@ private Future persistBatch() { if (upsert.statusCode() == 207 && upsert.hasErrorObjects()) { batch.setResponse(upsert); fileProcessor.reporting.reportErrors(batch) - .onFailure(err -> logger.error("Error logging upsert results for batch #{}, {}", + .onFailure(err -> logger.error("Error logging upsert results for batch #{}, {}", batch.getBatchNumber(), err.getMessage())); } fileProcessor.reporting.incrementInventoryMetrics(new InventoryMetrics(upsert.getMetrics())); @@ -165,7 +165,6 @@ private Future persistBatch() { * Persists the deletion, complete the promise when done. * * @param batch The batch of records containing a deletion record - * @param promise The promise of persistBatch */ private Future persistDeletion(BatchOfRecords batch) { long deletionStarted = System.nanoTime(); diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java index d4d3e75..b11c498 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Channels.java @@ -48,7 +48,16 @@ public static Future getChannels(ServiceRequest request) { } public static Future getChannelById(ServiceRequest request) { - return getEntityAndRespond(request, new Channel()); + UUID id = UUID.fromString(request.requestParam("id")); + Channel entity = new Channel(); + return request.entityStorage().getEntity(id, entity).onSuccess(instance -> { + if (instance == null) { + responseText(request.routingContext(), 404).end(entity.entityName() + " " + id + " not found."); + } else { + Channel channel = ((Channel) instance).withFileQueue(new FileQueue(request, id.toString())); + responseJson(request.routingContext(), 200).end(channel.asJson().encodePrettily()); + } + }).mapEmpty(); } public static Future putChannel(ServiceRequest request) { diff --git a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Transformations.java b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Transformations.java index 7598f17..306ac73 100644 --- a/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Transformations.java +++ b/src/main/java/org/folio/inventoryupdate/importing/service/delivery/respond/Transformations.java @@ -111,9 +111,9 @@ public static Future getTransformationById(ServiceRequest request) { if (transformation == null) { return responseText(request.routingContext(), 404).end("Transformation with ID " + id + " not found."); } else { - return ((Transformation)transformation).fetchTransformationSteps(request.entityStorage().getTenantPool()) + return ((Transformation) transformation).fetchTransformationSteps(request.entityStorage().getTenantPool()) .compose(done -> - responseJson(request.routingContext(),200).end(transformation.asJson().encodePrettily())); + responseJson(request.routingContext(), 200).end(transformation.asJson().encodePrettily())); } }); } diff --git a/src/main/resources/openapi/inventory-import-1.0.yaml b/src/main/resources/openapi/inventory-import-1.0.yaml index d6b55aa..f376929 100644 --- a/src/main/resources/openapi/inventory-import-1.0.yaml +++ b/src/main/resources/openapi/inventory-import-1.0.yaml @@ -1323,6 +1323,14 @@ components: type: string format: uuid description: UUID referencing the transformation pipeline to use. + queuedFiles: + type: integer + description: Count of source files in the queue. + readOnly: true + fileInProcess: + type: string + description: Name of most recent file to be put in process and yet to be completed. + readOnly: true metadata: $ref: "#/components/schemas/metadata" required: