diff --git a/CHANGELOG.md b/CHANGELOG.md index 5375d13323d5c..d71d4fca1f915 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add pluggable gRPC interceptors with explicit ordering([#19005](https://github.com/opensearch-project/OpenSearch/pull/19005)) - Add BindableServices extension point to transport-grpc-spi ([#19304](https://github.com/opensearch-project/OpenSearch/pull/19304)) - Add metrics for the merged segment warmer feature ([#18929](https://github.com/opensearch-project/OpenSearch/pull/18929)) -- Handle deleted documents for filter rewrite subaggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) +- Handle deleted documents for filter rewrite sub-aggregation optimization ([#19643](https://github.com/opensearch-project/OpenSearch/pull/19643)) +- Add bulk collect API for filter rewrite sub-aggregation optimization ([#19933](https://github.com/opensearch-project/OpenSearch/pull/19933)) - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) diff --git a/server/src/main/java/org/apache/lucene/search/DocIdStreamHelper.java b/server/src/main/java/org/apache/lucene/search/DocIdStreamHelper.java new file mode 100644 index 0000000000000..4bcee99e9002c --- /dev/null +++ b/server/src/main/java/org/apache/lucene/search/DocIdStreamHelper.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed 
under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.apache.lucene.search; + +import org.apache.lucene.util.FixedBitSet; + +/* +This helper class is needed to instantiate BitSetDocIdStream because it is a +package-private class in Lucene + */ +public class DocIdStreamHelper { + public static DocIdStream getDocIdStream(FixedBitSet fixedBitSet) { + return new BitSetDocIdStream(fixedBitSet, 0); + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index 0b34ffc78853a..e54f24b393e33 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -32,8 +32,10 @@ package org.opensearch.search.aggregations; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; +import org.opensearch.common.annotation.ExperimentalApi; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; import java.io.IOException; @@ -123,6 +125,29 @@ public void collect(int doc) throws IOException { collect(doc, 0); } + @Override + public void collect(DocIdStream stream) throws IOException { + collect(stream, 0); + } + + /** + * Bulk-collect doc IDs within {@code owningBucketOrd}. + * + *

Note: The provided {@link DocIdStream} may be reused across calls and should be consumed immediately. + * + *

Note: The provided DocIdStream typically only holds a small subset of query matches. This method may be called multiple times per segment. + * Like collect(int), it is guaranteed that doc IDs get collected in order, i.e. doc IDs are collected in order within a DocIdStream, and if + * called twice, all doc IDs from the second DocIdStream will be greater than all doc IDs from the first DocIdStream. + * + *

It is legal for callers to mix calls to {@link #collect(DocIdStream, long)} and {@link #collect(int, long)}. + * + *

The default implementation calls {@code stream.forEach(doc -> collect(doc, owningBucketOrd))}. + */ + @ExperimentalApi + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + stream.forEach((doc) -> collect(doc, owningBucketOrd)); + } + @Override public void setScorer(Scorable scorer) throws IOException { // no-op by default diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index e23af22a698d1..c3df41ca7cd95 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -12,7 +12,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.BitDocIdSet; +import org.apache.lucene.search.DocIdStreamHelper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.opensearch.search.aggregations.BucketCollector; @@ -24,8 +24,6 @@ import java.util.function.BiConsumer; import java.util.function.Function; -import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; - /** * Range collector implementation that supports sub-aggregations by collecting doc IDs. 
*/ @@ -40,7 +38,6 @@ public class SubAggRangeCollector extends SimpleRangeCollector { private final Bits liveDocs; private final FixedBitSet bitSet; - private final BitDocIdSet bitDocIdSet; public SubAggRangeCollector( Ranges ranges, @@ -56,9 +53,7 @@ public SubAggRangeCollector( this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators(); this.leafCtx = subAggCollectorParam.leafCtx(); this.liveDocs = leafCtx.reader().getLiveDocs(); - int numDocs = leafCtx.reader().maxDoc(); - bitSet = new FixedBitSet(numDocs); - bitDocIdSet = new BitDocIdSet(bitSet); + bitSet = new FixedBitSet(leafCtx.reader().maxDoc()); } @Override @@ -109,13 +104,9 @@ public void finalizePreviousRange() { // trigger the sub agg collection for this range try { - DocIdSetIterator iterator = bitDocIdSet.iterator(); // build a new leaf collector for each bucket LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx); - while (iterator.nextDoc() != NO_MORE_DOCS) { - int currentDoc = iterator.docID(); - sub.collect(currentDoc, bucketOrd); - } + sub.collect(DocIdStreamHelper.getDocIdStream(bitSet), bucketOrd); logger.trace("collected sub aggregation for bucket {}", bucketOrd); } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 8fa9c61821fd8..dfd0b7b3b99ba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; import 
org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Rounding; @@ -267,6 +268,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc public void collect(int doc, long owningBucketOrd) throws IOException { iteratingCollector.collect(doc, owningBucketOrd); } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } @@ -414,6 +420,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } + private void collectValue(int doc, long rounded) throws IOException { long bucketOrd = bucketOrds.add(0, rounded); if (bucketOrd < 0) { // already seen @@ -663,6 +674,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } + private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException { long bucketOrd = bucketOrds.add(owningBucketOrd, rounded); if (bucketOrd < 0) { // already seen diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 7ba939f64dbbf..9d691f72d10fe 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -251,6 +251,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { collectValue(sub, doc, owningBucketOrd, preparedRounding.round(value)); } } + + @Override + public void collect(DocIdStream stream, 
long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } @@ -274,6 +279,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java index 5f99a9cc05558..3a4960246795b 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/AvgAggregator.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSetIterator; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.FixedBitSet; import org.apache.lucene.util.NumericUtils; @@ -156,6 +157,11 @@ public void collect(int doc, long bucket) throws IOException { compensations.set(bucket, kahanSummation.delta()); } } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java index fbba20d8a6d7d..298b976225586 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MaxAggregator.java @@ -35,6 +35,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; import 
org.apache.lucene.util.Bits; import org.apache.lucene.util.NumericUtils; @@ -172,6 +173,10 @@ public void collect(int doc, long bucket) throws IOException { } } + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java index 5c2ed2b240a09..7e2b9a7728ed2 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/MinAggregator.java @@ -35,6 +35,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.PointValues; import org.apache.lucene.search.CollectionTerminatedException; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.Bits; import org.apache.lucene.util.NumericUtils; @@ -171,6 +172,11 @@ public void collect(int doc, long bucket) throws IOException { mins.set(bucket, min); } } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; }