From c32afaadfe14be0b03c4853f6089b23f707ee6e5 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sat, 8 Nov 2025 17:52:58 -0800 Subject: [PATCH 1/2] Keeping the logic basic for now Signed-off-by: Ankit Jain --- .../aggregations/BitSetDocIdStream.java | 61 +++++++++++++++++++ .../aggregations/LeafBucketCollector.java | 16 +++++ .../rangecollector/SubAggRangeCollector.java | 12 +--- .../AutoDateHistogramAggregator.java | 16 +++++ .../histogram/DateHistogramAggregator.java | 10 +++ 5 files changed, 106 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java diff --git a/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java b/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java new file mode 100644 index 0000000000000..b5a2861d3476f --- /dev/null +++ b/server/src/main/java/org/opensearch/search/aggregations/BitSetDocIdStream.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.aggregations; + +import org.apache.lucene.search.CheckedIntConsumer; +import org.apache.lucene.search.DocIdStream; +import org.apache.lucene.util.FixedBitSet; +import org.apache.lucene.util.MathUtil; + +import java.io.IOException; + +/** + * DocIdStream implementation using FixedBitSet. This is a duplicate of the implementation in Lucene + * and should ideally eventually be removed. 
+ * + * @opensearch.internal + */ +public final class BitSetDocIdStream extends DocIdStream { + private final FixedBitSet bitSet; + private final int offset, max; + private int upTo; + + public BitSetDocIdStream(FixedBitSet bitSet, int offset) { + this.bitSet = bitSet; + this.offset = offset; + upTo = offset; + max = MathUtil.unsignedMin(Integer.MAX_VALUE, offset + bitSet.length()); + } + + @Override + public boolean mayHaveRemaining() { + return upTo < max; + } + + @Override + public void forEach(int upTo, CheckedIntConsumer consumer) throws IOException { + if (upTo > this.upTo) { + upTo = Math.min(upTo, max); + bitSet.forEach(this.upTo - offset, upTo - offset, offset, consumer); + this.upTo = upTo; + } + } + + @Override + public int count(int upTo) throws IOException { + if (upTo > this.upTo) { + upTo = Math.min(upTo, max); + int count = bitSet.cardinality(this.upTo - offset, upTo - offset); + this.upTo = upTo; + return count; + } else { + return 0; + } + } +} diff --git a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java index 0b34ffc78853a..7ce30decebace 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/LeafBucketCollector.java @@ -32,6 +32,7 @@ package org.opensearch.search.aggregations; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.Scorable; import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds; @@ -123,6 +124,21 @@ public void collect(int doc) throws IOException { collect(doc, 0); } + @Override + public void collect(DocIdStream stream) throws IOException { + collect(stream, 0); + } + + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + // for (docCount = stream.intoArray(docBuffer); docCount != 0; docCount = 
stream.intoArray(docBuffer)) { + // if (docCount == docBuffer.length) { + // collect(docBuffer, owningBucketOrd); + // } + // } + // collectRemaining(); + stream.forEach((doc) -> collect(doc, owningBucketOrd)); + } + @Override public void setScorer(Scorable scorer) throws IOException { // no-op by default diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index e23af22a698d1..f89879f633bde 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -15,6 +15,7 @@ import org.apache.lucene.util.BitDocIdSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; +import org.opensearch.search.aggregations.BitSetDocIdStream; import org.opensearch.search.aggregations.BucketCollector; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext; @@ -40,7 +41,6 @@ public class SubAggRangeCollector extends SimpleRangeCollector { private final Bits liveDocs; private final FixedBitSet bitSet; - private final BitDocIdSet bitDocIdSet; public SubAggRangeCollector( Ranges ranges, @@ -56,9 +56,7 @@ public SubAggRangeCollector( this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators(); this.leafCtx = subAggCollectorParam.leafCtx(); this.liveDocs = leafCtx.reader().getLiveDocs(); - int numDocs = leafCtx.reader().maxDoc(); - bitSet = new FixedBitSet(numDocs); - bitDocIdSet = new BitDocIdSet(bitSet); + bitSet = new FixedBitSet(leafCtx.reader().maxDoc()); } @Override @@ -109,13 +107,9 @@ public void finalizePreviousRange() { // trigger the sub agg 
collection for this range try { - DocIdSetIterator iterator = bitDocIdSet.iterator(); // build a new leaf collector for each bucket LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx); - while (iterator.nextDoc() != NO_MORE_DOCS) { - int currentDoc = iterator.docID(); - sub.collect(currentDoc, bucketOrd); - } + sub.collect(new BitSetDocIdStream(bitSet, 0), bucketOrd); logger.trace("collected sub aggregation for bucket {}", bucketOrd); } catch (IOException e) { throw new RuntimeException(e); diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 8fa9c61821fd8..dfd0b7b3b99ba 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -33,6 +33,7 @@ import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.search.DocIdStream; import org.apache.lucene.search.ScoreMode; import org.apache.lucene.util.CollectionUtil; import org.opensearch.common.Rounding; @@ -267,6 +268,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc public void collect(int doc, long owningBucketOrd) throws IOException { iteratingCollector.collect(doc, owningBucketOrd); } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } @@ -414,6 +420,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } + private void collectValue(int doc, long rounded) throws IOException { 
long bucketOrd = bucketOrds.add(0, rounded); if (bucketOrd < 0) { // already seen @@ -663,6 +674,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } + private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException { long bucketOrd = bucketOrds.add(owningBucketOrd, rounded); if (bucketOrd < 0) { // already seen diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java index 7ba939f64dbbf..9d691f72d10fe 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java @@ -251,6 +251,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { collectValue(sub, doc, owningBucketOrd, preparedRounding.round(value)); } } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } @@ -274,6 +279,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException { } } } + + @Override + public void collect(DocIdStream stream, long owningBucketOrd) throws IOException { + super.collect(stream, owningBucketOrd); + } }; } From c76ffc0b0527fc13e2ff786379d698933512d677 Mon Sep 17 00:00:00 2001 From: Ankit Jain Date: Sat, 8 Nov 2025 17:55:13 -0800 Subject: [PATCH 2/2] Apply spotless Signed-off-by: Ankit Jain --- .../filterrewrite/rangecollector/SubAggRangeCollector.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java 
b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java index f89879f633bde..ff7fb3dba8e07 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filterrewrite/rangecollector/SubAggRangeCollector.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.DocIdSetIterator; -import org.apache.lucene.util.BitDocIdSet; import org.apache.lucene.util.Bits; import org.apache.lucene.util.FixedBitSet; import org.opensearch.search.aggregations.BitSetDocIdStream; @@ -25,8 +24,6 @@ import java.util.function.BiConsumer; import java.util.function.Function; -import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; - /** * Range collector implementation that supports sub-aggregations by collecting doc IDs. */