diff --git a/extra/nouveau/build.gradle b/extra/nouveau/build.gradle index c23ef8fc20c..495b0489325 100644 --- a/extra/nouveau/build.gradle +++ b/extra/nouveau/build.gradle @@ -63,6 +63,7 @@ spotless { java { importOrder() removeUnusedImports() + forbidWildcardImports() cleanthat() palantirJavaFormat() } diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/BulkUpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/BulkUpdateRequest.java new file mode 100644 index 00000000000..3077ea8b3d5 --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/BulkUpdateRequest.java @@ -0,0 +1,19 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +public record BulkUpdateRequest(@JsonProperty List<DocumentUpdate> updates) {} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java index 82e9b716aa5..36f70bf788d 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentDeleteRequest.java @@ -17,7 +17,7 @@ import jakarta.validation.constraints.Positive; import jakarta.validation.constraints.PositiveOrZero; -public final class DocumentDeleteRequest { +public final class DocumentDeleteRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java new file mode 100644 index 00000000000..7dc8aa26954 --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentRequest.java @@ -0,0 +1,24 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@type") +@JsonSubTypes({ + @JsonSubTypes.Type(value = DocumentDeleteRequest.class, name = "delete"), + @JsonSubTypes.Type(value = DocumentUpdateRequest.class, name = "update"), +}) +public abstract class DocumentRequest {} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdate.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdate.java new file mode 100644 index 00000000000..141860f3a98 --- /dev/null +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdate.java @@ -0,0 +1,18 @@ +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package org.apache.couchdb.nouveau.api; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public record DocumentUpdate(@JsonProperty("doc_id") String docId, @JsonProperty("update") DocumentRequest request) {} diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java index 82c1966022d..9cbed3ac059 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/api/DocumentUpdateRequest.java @@ -20,7 +20,7 @@ import jakarta.validation.constraints.PositiveOrZero; import java.util.Collection; -public final class DocumentUpdateRequest { +public final class DocumentUpdateRequest extends DocumentRequest { @PositiveOrZero private final long matchSeq; diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java index 7e5facb2e26..0a70b2b6caa 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/health/IndexHealthCheck.java @@ -16,6 +16,9 @@ import com.codahale.metrics.health.HealthCheck; import java.io.IOException; import java.util.Collections; +import java.util.List; +import org.apache.couchdb.nouveau.api.BulkUpdateRequest; +import org.apache.couchdb.nouveau.api.DocumentUpdate; import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; import org.apache.couchdb.nouveau.api.IndexDefinition; import org.apache.couchdb.nouveau.api.SearchRequest; @@ -41,10 +44,10 @@ protected Result check() throws Exception { indexResource.createIndex(name, new IndexDefinition(IndexDefinition.LATEST_LUCENE_VERSION, "standard", null)); try { - final DocumentUpdateRequest documentUpdateRequest = - new DocumentUpdateRequest(0, 1, null, 
Collections.emptyList()); - indexResource.updateDoc(name, "foo", documentUpdateRequest); - + indexResource.update( + name, + new BulkUpdateRequest(List.of(new DocumentUpdate( + "foo", new DocumentUpdateRequest(0, 1, null, Collections.emptyList()))))); final SearchRequest searchRequest = new SearchRequest(); searchRequest.setQuery("_id:foo"); searchRequest.setMinUpdateSeq(1); diff --git a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java index 9ba3821090e..ef78ab20ea9 100644 --- a/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java +++ b/extra/nouveau/src/main/java/org/apache/couchdb/nouveau/resources/IndexResource.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +import org.apache.couchdb.nouveau.api.BulkUpdateRequest; import org.apache.couchdb.nouveau.api.DocumentDeleteRequest; import org.apache.couchdb.nouveau.api.DocumentUpdateRequest; import org.apache.couchdb.nouveau.api.IndexDefinition; @@ -67,6 +68,7 @@ public Ok createIndex(@PathParam("name") String name, @NotNull @Valid IndexDefin return Ok.INSTANCE; } + @Deprecated(since = "3.5.2", forRemoval = true) @DELETE @Path("/doc/{docId}") public Ok deleteDoc( @@ -120,6 +122,7 @@ public SearchResults searchIndex(@PathParam("name") String name, @NotNull @Valid }); } + @Deprecated(since = "3.5.2", forRemoval = true) @PUT @Path("/doc/{docId}") public Ok updateDoc( @@ -132,4 +135,20 @@ public Ok updateDoc( return Ok.INSTANCE; }); } + + @POST + @Path("/update") + public Ok update(@PathParam("name") String name, @NotNull @Valid BulkUpdateRequest request) throws Exception { + return indexManager.with(name, (index) -> { + for (var update : request.updates()) { + if (update.request() instanceof DocumentUpdateRequest) { + index.update(update.docId(), (DocumentUpdateRequest) update.request()); + } + if (update.request() 
instanceof DocumentDeleteRequest) { + index.delete(update.docId(), (DocumentDeleteRequest) update.request()); + } + } + return Ok.INSTANCE; + }); + } } diff --git a/mise.toml b/mise.toml index 733c7c6ea56..4da8b2db017 100644 --- a/mise.toml +++ b/mise.toml @@ -3,7 +3,7 @@ NODE_ENV = 'production' [tools] elixir = "1.19.5-otp-26" -erlang = '26' +erlang = "28.3.3" java = '21' nodejs = '24' python = '3' diff --git a/src/nouveau/src/nouveau_api.erl b/src/nouveau/src/nouveau_api.erl index 2d140e5806a..8c85ec910d1 100644 --- a/src/nouveau/src/nouveau_api.erl +++ b/src/nouveau/src/nouveau_api.erl @@ -33,8 +33,22 @@ jaxrs_error/2 ]). +%% batch api functions +-export([ + update/2, + make_update/5, + make_delete/3, + make_purge/3 +]). + -define(JSON_CONTENT_TYPE, {"Content-Type", "application/json"}). +-deprecated([ + {purge_doc, 4, "replaced by update/2"}, + {update_doc, 6, "replaced by update/2"}, + {delete_doc, 4, "replaced by update/2"} +]). + analyze(Text, Analyzer) when is_binary(Text), is_binary(Analyzer) -> @@ -172,6 +186,78 @@ update_doc(#index{} = Index, DocId, MatchSeq, UpdateSeq, Partition, Fields) when send_error(Reason) end. +update(#index{} = Index, Updates) when is_list(Updates) -> + Resp = send_if_enabled( + update_path(Index), + [?JSON_CONTENT_TYPE], + <<"POST">>, + jiffy:encode(#{updates => Updates}) + ), + case Resp of + {ok, 200, _, _} -> + ok; + {ok, StatusCode, _, RespBody} -> + {error, jaxrs_error(StatusCode, RespBody)}; + {error, Reason} -> + send_error(Reason) + end. + +make_delete(DocId, MatchSeq, UpdateSeq) when + is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, + is_integer(UpdateSeq), + UpdateSeq > 0 +-> + #{ + doc_id => DocId, + update => #{ + '@type' => delete, + doc_id => DocId, + match_seq => MatchSeq, + seq => UpdateSeq, + delete => true + } + }. 
+ +make_purge(DocId, MatchSeq, PurgeSeq) when + is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, + is_integer(PurgeSeq), + PurgeSeq > 0 +-> + #{ + doc_id => DocId, + update => #{ + '@type' => delete, + doc_id => DocId, + match_seq => MatchSeq, + seq => PurgeSeq, + purge => true + } + }. + +make_update(DocId, MatchSeq, UpdateSeq, Partition, Fields) when + is_binary(DocId), + is_integer(MatchSeq), + MatchSeq >= 0, + is_integer(UpdateSeq), + UpdateSeq > 0, + (is_binary(Partition) orelse Partition == null), + is_list(Fields) +-> + #{ + doc_id => DocId, + update => #{ + '@type' => update, + match_seq => MatchSeq, + seq => UpdateSeq, + partition => Partition, + fields => Fields + } + }. + search(#index{} = Index, QueryArgs) -> Resp = send_if_enabled( search_path(Index), [?JSON_CONTENT_TYPE], <<"POST">>, jiffy:encode(QueryArgs) @@ -245,6 +331,9 @@ doc_path(#index{} = Index, DocId) -> search_path(#index{} = Index) -> [index_path(Index), <<"/search">>]. +update_path(#index{} = Index) -> + [index_path(Index), <<"/update">>]. + jaxrs_error(400, Body) -> {bad_request, message(Body)}; jaxrs_error(404, Body) -> diff --git a/src/nouveau/src/nouveau_index_updater.erl b/src/nouveau/src/nouveau_index_updater.erl index 4bfea753a15..10d4144f0a3 100644 --- a/src/nouveau/src/nouveau_index_updater.erl +++ b/src/nouveau/src/nouveau_index_updater.erl @@ -33,13 +33,18 @@ changes_done, total_changes, exclude_idrevs, - update_seq + update_seq, + batch_size, + batch }). -record(purge_acc, { + index, exclude_list = [], index_update_seq, - index_purge_seq + index_purge_seq, + batch_size, + batch }). 
outdated(#index{} = Index) -> @@ -77,8 +82,11 @@ update(#index{} = Index) -> couch_task_status:set_update_frequency(500), PurgeAcc0 = #purge_acc{ + index = Index, index_update_seq = IndexUpdateSeq, - index_purge_seq = IndexPurgeSeq + index_purge_seq = IndexPurgeSeq, + batch_size = config:get_integer("nouveau", "batch_size", 20), + batch = [] }, {ok, PurgeAcc1} = purge_index(Db, Index, PurgeAcc0), @@ -94,12 +102,19 @@ update(#index{} = Index) -> changes_done = 0, total_changes = TotalChanges, exclude_idrevs = PurgeAcc1#purge_acc.exclude_list, - update_seq = PurgeAcc1#purge_acc.index_update_seq + update_seq = PurgeAcc1#purge_acc.index_update_seq, + batch_size = config:get_integer("nouveau", "batch_size", 20), + batch = [] }, {ok, Acc1} = couch_db:fold_changes( Db, Acc0#acc.update_seq, fun load_docs/2, Acc0, [] ), - exit(nouveau_api:set_update_seq(Index, Acc1#acc.update_seq, NewCurSeq)) + case flush_batch(Acc1) of + {ok, Acc2} -> + exit(nouveau_api:set_update_seq(Index, Acc2#acc.update_seq, NewCurSeq)); + {error, Reason} -> + exit({error, Reason}) + end after ret_os_process(Proc) end @@ -123,30 +138,26 @@ load_docs(FDI, #acc{} = Acc1) -> true -> Acc1; false -> - case - update_or_delete_index( - Acc1#acc.db, - Acc1#acc.index, - Acc1#acc.update_seq, - DI, - Acc1#acc.proc - ) - of - ok -> - Acc1#acc{ - update_seq = DI#doc_info.high_seq - }; - {error, Reason} -> - exit({error, Reason}) - end + Item = update_or_delete_index( + Acc1#acc.db, Acc1#acc.update_seq, DI, Acc1#acc.proc + ), + Acc1#acc{ + batch = [Item | Acc1#acc.batch], + update_seq = DI#doc_info.high_seq + } end, - {ok, Acc2#acc{changes_done = Acc2#acc.changes_done + 1}}. + case maybe_flush_batch(Acc2) of + {ok, Acc3} -> + {ok, Acc3}; + {error, Reason} -> + exit({error, Reason}) + end. 
-update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) -> +update_or_delete_index(Db, MatchSeq, #doc_info{} = DI, Proc) -> #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:make_delete(Id, MatchSeq, Seq); false -> {ok, Doc} = couch_db:open_doc(Db, DI, []), Json = couch_doc:to_json_obj(Doc, []), @@ -160,14 +171,44 @@ update_or_delete_index(Db, #index{} = Index, MatchSeq, #doc_info{} = DI, Proc) - end, case Fields of [] -> - nouveau_api:delete_doc(Index, Id, MatchSeq, Seq); + nouveau_api:make_delete(Id, MatchSeq, Seq); _ -> - nouveau_api:update_doc( - Index, Id, MatchSeq, Seq, Partition, Fields - ) + nouveau_api:make_update(Id, MatchSeq, Seq, Partition, Fields) end end. +maybe_flush_batch(#acc{} = Acc) when length(Acc#acc.batch) >= Acc#acc.batch_size -> + flush_batch(Acc); +maybe_flush_batch(#purge_acc{} = Acc) when + length(Acc#purge_acc.batch) >= Acc#purge_acc.batch_size +-> + flush_batch(Acc); +maybe_flush_batch(#acc{} = Acc) -> + {ok, Acc}; +maybe_flush_batch(#purge_acc{} = Acc) -> + {ok, Acc}. + +flush_batch(#acc{batch = []} = Acc) -> + {ok, Acc}; +flush_batch(#purge_acc{batch = []} = Acc) -> + {ok, Acc}; +flush_batch(#acc{} = Acc) -> + #acc{batch = Batch} = Acc, + case nouveau_api:update(Acc#acc.index, lists:reverse(Batch)) of + ok -> + {ok, Acc#acc{batch = [], changes_done = Acc#acc.changes_done + length(Batch)}}; + {error, Reason} -> + {error, Reason} + end; +flush_batch(#purge_acc{} = Acc) -> + #purge_acc{batch = Batch} = Acc, + case nouveau_api:update(Acc#purge_acc.index, lists:reverse(Batch)) of + ok -> + {ok, Acc#purge_acc{batch = []}}; + {error, Reason} -> + {error, Reason} + end. 
+ open_or_create_index(#index{} = Index) -> case nouveau_api:index_info(Index) of {ok, #{} = Info} -> @@ -214,13 +255,14 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def, <<"nouveau">>]), FoldFun = fun({PurgeSeq, _UUID, Id, _Revs}, #purge_acc{} = PurgeAcc1) -> - PurgeAcc2 = + PurgeAcc3 = case couch_db:get_full_doc_info(Db, Id) of not_found -> - ok = nouveau_api:purge_doc( - Index, Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq + Item = nouveau_api:make_purge( + Id, PurgeAcc1#purge_acc.index_purge_seq, PurgeSeq ), - PurgeAcc1#purge_acc{index_purge_seq = PurgeSeq}; + PurgeAcc2 = PurgeAcc1#purge_acc{batch = [Item | PurgeAcc1#purge_acc.batch]}, + PurgeAcc2#purge_acc{index_purge_seq = PurgeSeq}; FDI -> DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI, @@ -228,32 +270,30 @@ purge_index(Db, Index, #purge_acc{} = PurgeAcc0) -> true -> PurgeAcc1; false -> - update_or_delete_index( - Db, - Index, - PurgeAcc1#purge_acc.index_update_seq, - DI, - Proc + Item = update_or_delete_index( + Db, PurgeAcc1#purge_acc.index_update_seq, DI, Proc ), PurgeAcc1#purge_acc{ + batch = [Item | PurgeAcc1#purge_acc.batch], exclude_list = [{Id, Rev} | PurgeAcc1#purge_acc.exclude_list], index_update_seq = Seq } end end, update_task(1), - {ok, PurgeAcc2} + maybe_flush_batch(PurgeAcc3) end, {ok, #purge_acc{} = PurgeAcc3} = couch_db:fold_purge_infos( Db, PurgeAcc0#purge_acc.index_purge_seq, FoldFun, PurgeAcc0, [] ), + {ok, PurgeAcc4} = flush_batch(PurgeAcc3), DbPurgeSeq = couch_db:get_purge_seq(Db), ok = nouveau_api:set_purge_seq( - Index, PurgeAcc3#purge_acc.index_purge_seq, DbPurgeSeq + Index, PurgeAcc4#purge_acc.index_purge_seq, DbPurgeSeq ), update_local_doc(Db, Index, DbPurgeSeq), - {ok, PurgeAcc3} + {ok, PurgeAcc4} after ret_os_process(Proc) end.