Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.folio.inventoryupdate.importing.moduledata.database.Tables;
import org.folio.inventoryupdate.importing.moduledata.database.Util;
import org.folio.inventoryupdate.importing.service.delivery.fileimport.FileListeners;
import org.folio.inventoryupdate.importing.service.delivery.fileimport.FileQueue;
import org.folio.tlib.postgres.TenantPgPool;

public class Channel extends Entity {
Expand All @@ -28,6 +29,8 @@ public class Channel extends Entity {
// virtual (non-db) property
public static final String PROPERTY_COMMISSIONED = "commissioned";
private static final Map<String, Field> CHANNEL_FIELDS = new HashMap<>();
ChannelRecord theRecord;
private FileQueue fileQueue;

static {
CHANNEL_FIELDS.put(ID,
Expand All @@ -46,8 +49,6 @@ public class Channel extends Entity {
new Field("listening", "listening", PgColumn.Type.BOOLEAN, false, true));
}

ChannelRecord theRecord;

public Channel() {
}

Expand Down Expand Up @@ -75,6 +76,11 @@ public String entityName() {
return "Channel";
}

public Channel withFileQueue(FileQueue fileQueue) {
this.fileQueue = fileQueue;
return this;
}

public Channel fromJson(JsonObject channelJson) {
return new Channel(
getUuidOrGenerate(channelJson.getString(jsonPropertyName(ID))),
Expand Down Expand Up @@ -130,6 +136,10 @@ public JsonObject asJson() {
json.put(jsonPropertyName(ENABLED), theRecord.enabled());
json.put(PROPERTY_COMMISSIONED, isCommissioned());
json.put(jsonPropertyName(LISTENING), theRecord.listening());
if (fileQueue != null) {
json.put("queuedFiles", fileQueue.size());
json.put("fileInProcess", fileQueue.fileInProcess());
}
putMetadata(json);
return json;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,6 @@ public TupleMapper<Entity> toTemplateParameters() {
});
}


public record TransformationRecord(UUID id, String name, String description) {
}

public Future<Transformation> fetchTransformationSteps(TenantPgPool pool) {
return new TransformationStep().fetchTransformationStepsByTransformationId(pool, this.getId())
.map(steps -> {
Expand All @@ -150,4 +146,7 @@ public Future<Transformation> fetchTransformationSteps(TenantPgPool pool) {
return this;
});
}

public record TransformationRecord(UUID id, String name, String description) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ public class TransformationStep extends Entity {
public static final String TRANSFORMATION_ID = "TRANSFORMATION_ID";
public static final String STEP_ID = "STEP_ID";
public static final String POSITION = "POSITION";
private static final Map<String, Field> FIELDS = new HashMap<>();
public static final String STEP_NAME = "STEP_NAME";
private static final Map<String, Field> FIELDS = new HashMap<>();
TransformationStepRecord theRecord;
private String stepName = "";

static {
Expand All @@ -40,8 +41,6 @@ public class TransformationStep extends Entity {
new Field("stepName", "step_name", PgColumn.Type.TEXT, true, false).isVirtual());
}

TransformationStepRecord theRecord;

private Integer positionOfLastStepOfTransformation = null;
private Integer positionOfTheExistingStep = null;
private Integer newPosition = null;
Expand Down Expand Up @@ -189,7 +188,8 @@ public Future<Integer> findPositionOfLastStepOfTransformation(TenantPgPool tenan
.map(rows -> rows.iterator().next().getInteger("last_position"));
}

public Future<List<JsonObject>> fetchTransformationStepsByTransformationId(TenantPgPool tenantPgPool, UUID transformationId) {
public Future<List<JsonObject>> fetchTransformationStepsByTransformationId(TenantPgPool tenantPgPool,
UUID transformationId) {
List<JsonObject> steps = new ArrayList<>();
return SqlTemplate.forQuery(tenantPgPool.getPool(),
"SELECT transformation_step.*, step.name AS step_name "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,23 @@ public boolean hasNextFile() {
return fs.readDirBlocking(jobPath).stream().map(File::new).anyMatch(File::isFile);
}

public int size() {
if (fs.existsBlocking(jobPath)) {
return fs.readDirBlocking(jobPath).stream().map(File::new).filter(File::isFile).toList().size();
} else {
return -1;
}
}

public String fileInProcess() {
if (fs.existsBlocking(jobProcessingSlot)) {
return fs.readDirBlocking(jobProcessingSlot).stream().map(File::new)
.filter(File::isFile).findFirst().map(File::getName).orElse("no file in process");
} else {
return "no file in process";
}
}

/**
* Promotes the next file in the staging directory to the processing directory
* and returns true if a staged file was found (and the processing directory was free), otherwise returns false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private Future<Void> persistBatch() {
if (upsert.statusCode() == 207 && upsert.hasErrorObjects()) {
batch.setResponse(upsert);
fileProcessor.reporting.reportErrors(batch)
.onFailure(err -> logger.error("Error logging upsert results for batch #{}, {}",
.onFailure(err -> logger.error("Error logging upsert results for batch #{}, {}",
batch.getBatchNumber(), err.getMessage()));
}
fileProcessor.reporting.incrementInventoryMetrics(new InventoryMetrics(upsert.getMetrics()));
Expand All @@ -165,7 +165,6 @@ private Future<Void> persistBatch() {
* Persists the deletion, complete the promise when done.
*
* @param batch The batch of records containing a deletion record
* @param promise The promise of persistBatch
*/
private Future<Void> persistDeletion(BatchOfRecords batch) {
long deletionStarted = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,16 @@ public static Future<Void> getChannels(ServiceRequest request) {
}

public static Future<Void> getChannelById(ServiceRequest request) {
return getEntityAndRespond(request, new Channel());
UUID id = UUID.fromString(request.requestParam("id"));
Channel entity = new Channel();
return request.entityStorage().getEntity(id, entity).onSuccess(instance -> {
if (instance == null) {
responseText(request.routingContext(), 404).end(entity.entityName() + " " + id + " not found.");
} else {
Channel channel = ((Channel) instance).withFileQueue(new FileQueue(request, id.toString()));
responseJson(request.routingContext(), 200).end(channel.asJson().encodePrettily());
}
}).mapEmpty();
}

public static Future<Void> putChannel(ServiceRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ public static Future<Void> getTransformationById(ServiceRequest request) {
if (transformation == null) {
return responseText(request.routingContext(), 404).end("Transformation with ID " + id + " not found.");
} else {
return ((Transformation)transformation).fetchTransformationSteps(request.entityStorage().getTenantPool())
return ((Transformation) transformation).fetchTransformationSteps(request.entityStorage().getTenantPool())
.compose(done ->
responseJson(request.routingContext(),200).end(transformation.asJson().encodePrettily()));
responseJson(request.routingContext(), 200).end(transformation.asJson().encodePrettily()));
}
});
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/openapi/inventory-import-1.0.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,14 @@ components:
type: string
format: uuid
description: UUID referencing the transformation pipeline to use.
queuedFiles:
type: integer
description: Count of source files in the queue.
readOnly: true
fileInProcess:
type: string
description: Name of most recent file to be put in process and yet to be completed.
readOnly: true
metadata:
$ref: "#/components/schemas/metadata"
required:
Expand Down