Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 107 additions & 2 deletions lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -30,20 +31,25 @@
import org.apache.lucene.facet.sortedset.SortedSetDocValuesReaderState;
import org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts;
import org.apache.lucene.facet.taxonomy.TaxonomyReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MultiCollectorManager;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorable;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.search.TopFieldCollectorManager;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TopScoreDocCollectorManager;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.ThreadInterruptedException;

/**
Expand Down Expand Up @@ -85,6 +91,8 @@ public class DrillSideways {
/** (optional) {@link ExecutorService} used for "concurrent" drill sideways if desired. */
private final ExecutorService executor;

private final boolean allowEarlyTermination;

/** Create a new {@code DrillSideways} instance. */
public DrillSideways(IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {
this(searcher, config, taxoReader, null);
Expand All @@ -108,7 +116,7 @@ public DrillSideways(
FacetsConfig config,
TaxonomyReader taxoReader,
SortedSetDocValuesReaderState state) {
this(searcher, config, taxoReader, state, null);
this(searcher, config, taxoReader, state, null, false);
}

/**
Expand All @@ -123,11 +131,28 @@ public DrillSideways(
TaxonomyReader taxoReader,
SortedSetDocValuesReaderState state,
ExecutorService executor) {
this(searcher, config, taxoReader, state, executor, false);
}

/**
* Create a new {@code DrillSideways} instance, where some dimensions were indexed with {@link
* SortedSetDocValuesFacetField} and others were indexed with {@link FacetField}.
*
* <p>Use this constructor to use the concurrent implementation
*/
public DrillSideways(
IndexSearcher searcher,
FacetsConfig config,
TaxonomyReader taxoReader,
SortedSetDocValuesReaderState state,
ExecutorService executor,
boolean allowEarlyTermination) {
this.searcher = searcher;
this.config = config;
this.taxoReader = taxoReader;
this.state = state;
this.executor = executor;
this.allowEarlyTermination = allowEarlyTermination;
}

/**
Expand Down Expand Up @@ -333,7 +358,10 @@ public <R> ConcurrentDrillSidewaysResult<R> search(
if (drillDownFacetsCollectorManager != null) {
// Make sure we populate a facet collector corresponding to the base query if desired:
mainCollectorManager =
new MultiCollectorManager(drillDownFacetsCollectorManager, hitCollectorManager);
allowEarlyTermination
? new EarlyTerminatingFacetsAndHitsCollectorManager<>(
drillDownFacetsCollectorManager, hitCollectorManager)
: new MultiCollectorManager(drillDownFacetsCollectorManager, hitCollectorManager);
} else {
mainCollectorManager = hitCollectorManager;
}
Expand Down Expand Up @@ -542,4 +570,81 @@ public static class ConcurrentDrillSidewaysResult<R> extends DrillSidewaysResult
this.collectorResult = collectorResult;
}
}

static class EarlyTerminatingFacetsAndHitsCollectorManager<C extends Collector, T>
implements CollectorManager<
EarlyTerminatingFacetsAndHitsCollectorManager.FacetAndHitsCollector<C>, Object[]> {

private final FacetsCollectorManager facetsCollectorManager;
private final CollectorManager<C, T> hitCollectorManager;

public EarlyTerminatingFacetsAndHitsCollectorManager(
FacetsCollectorManager facetsCollectorManager, CollectorManager<C, T> hitCollectorManager) {
this.facetsCollectorManager = facetsCollectorManager;
this.hitCollectorManager = hitCollectorManager;
}

@Override
public FacetAndHitsCollector<C> newCollector() throws IOException {
return new FacetAndHitsCollector<>(
facetsCollectorManager.newCollector(), hitCollectorManager.newCollector());
}

@Override
public Object[] reduce(Collection<FacetAndHitsCollector<C>> collectors) throws IOException {
List<C> hitCollectors = new ArrayList<>(collectors.size());
List<FacetsCollector> facetsCollectors = new ArrayList<>(collectors.size());
for (FacetAndHitsCollector<C> collector : collectors) {
hitCollectors.add(collector.hitCollector);
facetsCollectors.add(collector.facetsCollector);
}
FacetsCollector facetsCollectorResult = facetsCollectorManager.reduce(facetsCollectors);
T hitCollectorResult = hitCollectorManager.reduce(hitCollectors);
return new Object[] {facetsCollectorResult, hitCollectorResult};
}

record FacetAndHitsCollector<C extends Collector>(
FacetsCollector facetsCollector, C hitCollector) implements Collector {

@Override
public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
final LeafCollector hitCollectorLeaf = hitCollector.getLeafCollector(context);
final LeafCollector facetCollectorLeaf = facetsCollector.getLeafCollector(context);
return new LeafCollector() {
@Override
public void setScorer(Scorable scorer) throws IOException {
hitCollectorLeaf.setScorer(scorer);
facetCollectorLeaf.setScorer(scorer);
}

@Override
public void collect(int doc) throws IOException {
hitCollectorLeaf.collect(
doc); // Any exception in the hit collector will terminate facets collection too
facetCollectorLeaf.collect(doc);
}
};
}

@Override
public ScoreMode scoreMode() {
ScoreMode hitCollectorScoreMode = hitCollector.scoreMode();
ScoreMode facetCollectorScoreMode = facetsCollector.scoreMode();
if (hitCollectorScoreMode == facetCollectorScoreMode) {
return hitCollectorScoreMode;
}
if (hitCollectorScoreMode.needsScores() || facetCollectorScoreMode.needsScores()) {
return ScoreMode.COMPLETE;
} else {
return ScoreMode.COMPLETE_NO_SCORES;
}
}

@Override
public void setWeight(Weight weight) {
hitCollector.setWeight(weight);
facetsCollector.setWeight(weight);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
Expand Down Expand Up @@ -104,6 +106,11 @@ protected DrillSideways getNewDrillSideways(
return new DrillSideways(searcher, config, taxoReader);
}

protected DrillSideways getNewDrillSidewaysWithEarlyTermination(
IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {
return new DrillSideways(searcher, config, taxoReader, null, null, true);
}

protected DrillSideways getNewDrillSidewaysScoreSubdocsAtOnce(
IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {
return new DrillSideways(searcher, config, taxoReader) {
Expand Down Expand Up @@ -980,6 +987,89 @@ public void testMultipleRequestsPerDim() throws Exception {
IOUtils.close(searcher.getIndexReader(), taxoReader, taxoWriter, dir, taxoDir);
}

public void testEarlyTermination() throws Exception {
Directory dir = newDirectory();
Directory taxoDir = newDirectory();

// Writes facet ords to a separate directory from the
// main index:
DirectoryTaxonomyWriter taxoWriter =
new DirectoryTaxonomyWriter(taxoDir, IndexWriterConfig.OpenMode.CREATE);

FacetsConfig config = new FacetsConfig();

RandomIndexWriter writer = new RandomIndexWriter(random(), dir);

Document doc = new Document();
doc.add(new FacetField("Author", "Bob"));
doc.add(new FacetField("Publisher", "foo"));
writer.addDocument(config.build(taxoWriter, doc));

for (int i = 0; i < 5; i++) {
doc = new Document();
doc.add(new FacetField("Author", "Lisa"));
doc.add(new FacetField("Publisher", "foo"));
writer.addDocument(config.build(taxoWriter, doc));
}

// NRT open
IndexSearcher searcher = getNewSearcher(writer.getReader());

// NRT open
TaxonomyReader taxoReader = new DirectoryTaxonomyReader(taxoWriter);

// Run all the basic test cases with a standard DrillSideways implementation:
DrillSideways ds = getNewDrillSidewaysWithEarlyTermination(searcher, config, taxoReader);
DrillDownQuery ddq = new DrillDownQuery(config);
ddq.add("Publisher", "foo");
AtomicInteger docsCollected = new AtomicInteger(0);
AtomicBoolean earlyTerminated = new AtomicBoolean(false);
int maxDocsForEarlyTermination = 3;
AtomicInteger canCollectDocs = new AtomicInteger(maxDocsForEarlyTermination);
DrillSidewaysResult result =
ds.search(
ddq,
new SimpleCollectorManager(2, Comparator.comparing(cr -> cr.docAndScore.doc)) {
@Override
public SimpleCollector newCollector() {
return new SimpleCollector(ScoreMode.COMPLETE_NO_SCORES) {
@Override
public LeafCollector getLeafCollector(LeafReaderContext context)
throws IOException {
return new SimpleLeafCollector() {
@Override
public void setScorer(Scorable scorer) {
super.scorer = scorer;
}

@Override
public void collect(int doc) throws IOException {
if (canCollectDocs.decrementAndGet() < 0) {
earlyTerminated.set(true);
throw new CollectionTerminatedException();
}
docsCollected.incrementAndGet();
}
};
}
};
}
});
// sanity check that the hits collector early terminated at 3
assertTrue("Expecting early termination", earlyTerminated.get());
assertEquals(
"Expecting num docs collected to be 3", maxDocsForEarlyTermination, docsCollected.get());

// Facets should have early terminated
assertEquals(
"Early termination didn't stop facet collection",
maxDocsForEarlyTermination,
result.facets.getTopChildren(10, "Publisher").value);

writer.close();
IOUtils.close(searcher.getIndexReader(), taxoReader, taxoWriter, dir, taxoDir);
}

private static class Doc implements Comparable<Doc> {
String id;
String contentToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ protected DrillSideways getNewDrillSideways(
return new DrillSideways(searcher, config, taxoReader, null, executor);
}

protected DrillSideways getNewDrillSidewaysWithEarlyTermination(
IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {
return new DrillSideways(searcher, config, taxoReader, null, executor, true);
}

@Override
protected DrillSideways getNewDrillSidewaysScoreSubdocsAtOnce(
IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {
Expand Down
Loading