@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.CheckedIntConsumer;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.MathUtil;

import java.io.IOException;

/**
* DocIdStream implementation backed by a FixedBitSet. This is a duplicate of the implementation in
* Lucene and should eventually be removed.
*
* @opensearch.internal
*/
public final class BitSetDocIdStream extends DocIdStream {
private final FixedBitSet bitSet;
private final int offset, max;
private int upTo;

public BitSetDocIdStream(FixedBitSet bitSet, int offset) {
this.bitSet = bitSet;
this.offset = offset;
upTo = offset;
max = MathUtil.unsignedMin(Integer.MAX_VALUE, offset + bitSet.length());
}

@Override
public boolean mayHaveRemaining() {
return upTo < max;
}

@Override
public void forEach(int upTo, CheckedIntConsumer<IOException> consumer) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
bitSet.forEach(this.upTo - offset, upTo - offset, offset, consumer);
this.upTo = upTo;
}
}

@Override
public int count(int upTo) throws IOException {
if (upTo > this.upTo) {
upTo = Math.min(upTo, max);
int count = bitSet.cardinality(this.upTo - offset, upTo - offset);
this.upTo = upTo;
return count;
} else {
return 0;
}
}
}
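
For context, a minimal usage sketch of the class above (illustrative only, not part of this change; BitSetDocIdStreamDemo is a made-up name). The stream is consumed incrementally: count(upTo) and forEach(upTo, consumer) only visit documents that have not already been handed out, and mayHaveRemaining() reports whether anything is left.

import org.apache.lucene.util.FixedBitSet;
import org.opensearch.search.aggregations.BitSetDocIdStream;

import java.io.IOException;

// Illustrative only; not part of this change.
final class BitSetDocIdStreamDemo {
    public static void main(String[] args) throws IOException {
        int maxDoc = 32;
        FixedBitSet bits = new FixedBitSet(maxDoc);
        bits.set(3);
        bits.set(17);
        bits.set(29);

        BitSetDocIdStream stream = new BitSetDocIdStream(bits, 0);
        int below16 = stream.count(16);                                 // consumes doc ids below 16
        System.out.println("docs below 16: " + below16);                // prints 1 (doc 3)
        stream.forEach(maxDoc, doc -> System.out.println("doc " + doc)); // prints 17 and 29 only
        System.out.println(stream.mayHaveRemaining());                  // false: everything consumed
    }
}

This is the same pattern the SubAggRangeCollector change below relies on: the per-range bit set is wrapped in a BitSetDocIdStream and handed to the sub-collector in a single collect call.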
@@ -32,6 +32,7 @@

package org.opensearch.search.aggregations;

import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Scorable;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
@@ -123,6 +124,21 @@ public void collect(int doc) throws IOException {
collect(doc, 0);
}

@Override
public void collect(DocIdStream stream) throws IOException {
collect(stream, 0);
}

public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
// TODO: consider draining the stream into a doc-id buffer and collecting it in bulk,
// instead of the per-document fallback below.
stream.forEach((doc) -> collect(doc, owningBucketOrd));
}

@Override
public void setScorer(Scorable scorer) throws IOException {
// no-op by default
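The default stream implementation above simply falls back to per-document collection. A subclass that does not need to see individual doc ids could, in principle, override the stream variant and consume the stream in bulk via DocIdStream#count(); a hypothetical sketch (CountOnlyLeafCollector and its counting scheme are made up for illustration, not existing OpenSearch code):

import org.apache.lucene.search.DocIdStream;
import org.opensearch.search.aggregations.LeafBucketCollector;

import java.io.IOException;

// Hypothetical sketch: a collector that only tracks how many docs fall into each bucket.
final class CountOnlyLeafCollector extends LeafBucketCollector {
    private final long[] countsByOrd;

    CountOnlyLeafCollector(int maxOrds) {
        this.countsByOrd = new long[maxOrds];
    }

    @Override
    public void collect(int doc, long owningBucketOrd) throws IOException {
        countsByOrd[(int) owningBucketOrd]++;
    }

    @Override
    public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
        // Bulk path: DocIdStream#count() drains the remaining documents in one call
        // instead of invoking collect(doc, owningBucketOrd) once per document.
        countsByOrd[(int) owningBucketOrd] += stream.count();
    }
}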
@@ -12,9 +12,9 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.search.aggregations.BitSetDocIdStream;
import org.opensearch.search.aggregations.BucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
@@ -24,8 +24,6 @@
import java.util.function.BiConsumer;
import java.util.function.Function;

import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;

/**
* Range collector implementation that supports sub-aggregations by collecting doc IDs.
*/
@@ -40,7 +38,6 @@ public class SubAggRangeCollector extends SimpleRangeCollector {

private final Bits liveDocs;
private final FixedBitSet bitSet;
private final BitDocIdSet bitDocIdSet;

public SubAggRangeCollector(
Ranges ranges,
@@ -56,9 +53,7 @@ public SubAggRangeCollector(
this.collectableSubAggregators = subAggCollectorParam.collectableSubAggregators();
this.leafCtx = subAggCollectorParam.leafCtx();
this.liveDocs = leafCtx.reader().getLiveDocs();
int numDocs = leafCtx.reader().maxDoc();
bitSet = new FixedBitSet(numDocs);
bitDocIdSet = new BitDocIdSet(bitSet);
bitSet = new FixedBitSet(leafCtx.reader().maxDoc());
}

@Override
@@ -109,13 +104,9 @@ public void finalizePreviousRange() {

// trigger the sub agg collection for this range
try {
DocIdSetIterator iterator = bitDocIdSet.iterator();
// build a new leaf collector for each bucket
LeafBucketCollector sub = collectableSubAggregators.getLeafCollector(leafCtx);
while (iterator.nextDoc() != NO_MORE_DOCS) {
int currentDoc = iterator.docID();
sub.collect(currentDoc, bucketOrd);
}
sub.collect(new BitSetDocIdStream(bitSet, 0), bucketOrd);
logger.trace("collected sub aggregation for bucket {}", bucketOrd);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -33,6 +33,7 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdStream;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.CollectionUtil;
import org.opensearch.common.Rounding;
@@ -267,6 +268,11 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
public void collect(int doc, long owningBucketOrd) throws IOException {
iteratingCollector.collect(doc, owningBucketOrd);
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}
};
}

@@ -414,6 +420,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}

private void collectValue(int doc, long rounded) throws IOException {
long bucketOrd = bucketOrds.add(0, rounded);
if (bucketOrd < 0) { // already seen
@@ -663,6 +674,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}

private int collectValue(long owningBucketOrd, int roundingIdx, int doc, long rounded) throws IOException {
long bucketOrd = bucketOrds.add(owningBucketOrd, rounded);
if (bucketOrd < 0) { // already seen
@@ -251,6 +251,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
collectValue(sub, doc, owningBucketOrd, preparedRounding.round(value));
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}
};
}

@@ -274,6 +279,11 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
}
}
}

@Override
public void collect(DocIdStream stream, long owningBucketOrd) throws IOException {
super.collect(stream, owningBucketOrd);
}
};
}
