diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java index 77e85332c7..0ec4eefff5 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/DatasetItemDAO.java @@ -6,7 +6,7 @@ import com.comet.opik.api.ExperimentItem; import com.comet.opik.api.FeedbackScore; import com.comet.opik.api.ScoreSource; -import com.comet.opik.infrastructure.BulkConfig; +import com.comet.opik.infrastructure.BulkOperationsConfig; import com.comet.opik.infrastructure.db.TransactionTemplate; import com.comet.opik.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; @@ -127,7 +127,7 @@ INSERT INTO dataset_items ( :expectedOutput AS expected_output, :metadata AS metadata, now64(9) AS created_at, - :workspace_id AS workspace_id, + :workspace_id AS workspace_id, :createdBy AS created_by, :lastUpdatedBy AS last_updated_by @@ -139,6 +139,14 @@ LEFT JOIN ( SELECT * FROM dataset_items + WHERE id IN ( + + + , + + }> + ) ORDER BY last_updated_at DESC LIMIT 1 BY id ) AS old @@ -379,7 +387,7 @@ LEFT JOIN ( """; private final @NonNull TransactionTemplate asyncTemplate; - private final @NonNull @Config("bulkOperations") BulkConfig bulkConfig; + private final @NonNull @Config("bulkOperations") BulkOperationsConfig bulkConfig; @Override public Mono save(@NonNull UUID datasetId, @NonNull List items) { @@ -388,10 +396,10 @@ public Mono save(@NonNull UUID datasetId, @NonNull List items return Mono.empty(); } - return inset(datasetId, items); + return insert(datasetId, items); } - private Mono inset(UUID datasetId, List items) { + private Mono insert(UUID datasetId, List items) { List> batches = Lists.partition(items, bulkConfig.getSize()); return Flux.fromIterable(batches) @@ -401,7 +409,7 @@ private Mono inset(UUID datasetId, List items) { private Mono mapAndInsert(UUID datasetId, List items, Connection connection) { - List queryItems = getQueryItemPlaceHolder(items); + List queryItems = getQueryItemPlaceHolder(items.size()); var template = new ST(INSERT_DATASET_ITEM) .add("items", queryItems); @@ -412,6 +420,8 @@ private Mono mapAndInsert(UUID datasetId, List items, Connect return makeMonoContextAware((userName, workspaceName, workspaceId) -> { + statement.bind("workspace_id", workspaceId); + int i = 0; for (DatasetItem item : items) { statement.bind("id" + i, item.id()); @@ -422,7 +432,6 @@ private Mono mapAndInsert(UUID datasetId, List items, Connect statement.bind("input" + i, getOrDefault(item.input())); statement.bind("expectedOutput" + i, getOrDefault(item.expectedOutput())); statement.bind("metadata" + i, getOrDefault(item.metadata())); - statement.bind("workspace_id" + i, workspaceId); statement.bind("createdBy" + i,userName); statement.bind("lastUpdatedBy" + i, userName); i++; diff --git a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java index 942cd448a4..80441c0a26 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/domain/ExperimentItemDAO.java @@ -1,7 +1,7 @@ package com.comet.opik.domain; import com.comet.opik.api.ExperimentItem; -import com.comet.opik.infrastructure.BulkConfig; +import com.comet.opik.infrastructure.BulkOperationsConfig; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import io.r2dbc.spi.Connection; @@ -79,7 +79,7 @@ INSERT INTO experiment_items ( :experiment_id AS experiment_id, :dataset_item_id AS dataset_item_id, :trace_id AS trace_id, - :workspace_id AS workspace_id, + :workspace_id AS workspace_id, :created_by AS created_by, :last_updated_by AS last_updated_by @@ -91,6 +91,14 @@ LEFT JOIN ( SELECT id, workspace_id FROM experiment_items + WHERE id IN ( + + + , + + }> + ) ORDER BY last_updated_at DESC LIMIT 1 BY id ) AS old @@ -131,7 +139,7 @@ LEFT JOIN ( """; private final @NonNull ConnectionFactory connectionFactory; - private final @NonNull @Config("bulkOperations") BulkConfig bulkConfig; + private final @NonNull @Config("bulkOperations") BulkOperationsConfig bulkConfig; public Flux findExperimentSummaryByDatasetIds(Collection datasetIds) { @@ -169,7 +177,7 @@ public Mono insert(@NonNull Set experimentItems) { private Mono insert(Collection experimentItems, Connection connection) { - List queryItems = getQueryItemPlaceHolder(experimentItems); + List queryItems = getQueryItemPlaceHolder(experimentItems.size()); var template = new ST(INSERT) .add("items", queryItems); @@ -180,13 +188,14 @@ private Mono insert(Collection experimentItems, Connection return makeMonoContextAware((userName, workspaceName, workspaceId) -> { + statement.bind("workspace_id", workspaceId); + int index = 0; for (ExperimentItem item : experimentItems) { statement.bind("id" + index, item.id()); statement.bind("experiment_id" + index, item.experimentId()); statement.bind("dataset_item_id" + index, item.datasetItemId()); statement.bind("trace_id" + index, item.traceId()); - statement.bind("workspace_id" + index, workspaceId); statement.bind("created_by" + index, userName); statement.bind("last_updated_by" + index, userName); index++; diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkConfig.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkOperationsConfig.java similarity index 61% rename from apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkConfig.java rename to apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkOperationsConfig.java index a86f194a08..8856f6df45 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkConfig.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/BulkOperationsConfig.java @@ -3,13 +3,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; import lombok.Data; @Data -public class BulkConfig { +public class BulkOperationsConfig { @Valid @JsonProperty - private Integer size; - + @NotNull + private int size; } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java index 4a52bcdb17..2c9a9b70c7 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/OpikConfiguration.java @@ -36,5 +36,5 @@ public class OpikConfiguration extends Configuration { @Valid @NotNull @JsonProperty - private BulkConfig bulkOperations = new BulkConfig(); + private BulkOperationsConfig bulkOperations = new BulkOperationsConfig(); } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java index ce164a384f..2fed687d73 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/infrastructure/db/DatabaseAnalyticsModule.java @@ -1,5 +1,6 @@ package com.comet.opik.infrastructure.db; +import com.comet.opik.infrastructure.BulkOperationsConfig; import com.comet.opik.infrastructure.DatabaseAnalyticsFactory; import com.comet.opik.infrastructure.OpikConfiguration; import com.google.inject.Provides; @@ -7,6 +8,7 @@ import jakarta.inject.Named; import jakarta.inject.Singleton; import ru.vyarus.dropwizard.guice.module.support.DropwizardAwareModule; +import ru.vyarus.dropwizard.guice.module.yaml.bind.Config; public class DatabaseAnalyticsModule extends DropwizardAwareModule { @@ -38,4 +40,10 @@ public TransactionTemplate getTransactionTemplate(ConnectionFactory connectionFa return new TransactionTemplateImpl(connectionFactory); } + @Provides + @Singleton + public BulkOperationsConfig bulkOperation(@Config("bulkOperations") BulkOperationsConfig bulkConfig) { + return bulkConfig; + } + } diff --git a/apps/opik-backend/src/main/java/com/comet/opik/utils/TemplateUtils.java b/apps/opik-backend/src/main/java/com/comet/opik/utils/TemplateUtils.java index c76f684b30..0e9235f489 100644 --- a/apps/opik-backend/src/main/java/com/comet/opik/utils/TemplateUtils.java +++ b/apps/opik-backend/src/main/java/com/comet/opik/utils/TemplateUtils.java @@ -1,29 +1,26 @@ package com.comet.opik.utils; -import java.util.Collection; +import lombok.RequiredArgsConstructor; + import java.util.List; import java.util.stream.IntStream; public class TemplateUtils { + @RequiredArgsConstructor public static class QueryItem { public final int index; public final boolean hasNext; - - public QueryItem(int index, boolean hasNext) { - this.index = index; - this.hasNext = hasNext; - } } - public static List getQueryItemPlaceHolder(Collection items) { + public static List getQueryItemPlaceHolder(int size) { - if (items == null || items.isEmpty()) { + if (size == 0) { return List.of(); } - return IntStream.range(0, items.size()) - .mapToObj(i -> new QueryItem(i, i < items.size() - 1)) + return IntStream.range(0, size) + .mapToObj(i -> new QueryItem(i, i < size - 1)) .toList(); } }