diff --git a/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java b/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java index 576aa84d51fc..3e4757513ed1 100644 --- a/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java +++ b/lucene/facet/src/java/org/apache/lucene/facet/DrillSideways.java @@ -18,6 +18,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,20 +31,25 @@ import org.apache.lucene.facet.sortedset.SortedSetDocValuesReaderState; import org.apache.lucene.facet.taxonomy.FastTaxonomyFacetCounts; import org.apache.lucene.facet.taxonomy.TaxonomyReader; +import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.LeafCollector; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MultiCollectorManager; import org.apache.lucene.search.Query; +import org.apache.lucene.search.Scorable; import org.apache.lucene.search.ScoreDoc; +import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Sort; import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.search.TopFieldCollectorManager; import org.apache.lucene.search.TopFieldDocs; import org.apache.lucene.search.TopScoreDocCollectorManager; +import org.apache.lucene.search.Weight; import org.apache.lucene.util.ThreadInterruptedException; /** @@ -85,6 +91,8 @@ public class DrillSideways { /** (optional) {@link ExecutorService} used for "concurrent" drill sideways if desired. */ private final ExecutorService executor; + private final boolean allowEarlyTermination; + /** Create a new {@code DrillSideways} instance. 
*/ public DrillSideways(IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) { this(searcher, config, taxoReader, null); @@ -108,7 +116,7 @@ public DrillSideways( FacetsConfig config, TaxonomyReader taxoReader, SortedSetDocValuesReaderState state) { - this(searcher, config, taxoReader, state, null); + this(searcher, config, taxoReader, state, null, false); } /** @@ -123,11 +131,28 @@ public DrillSideways( TaxonomyReader taxoReader, SortedSetDocValuesReaderState state, ExecutorService executor) { + this(searcher, config, taxoReader, state, executor, false); + } + + /** + * Create a new {@code DrillSideways} instance, where some dimensions were indexed with {@link + * SortedSetDocValuesFacetField} and others were indexed with {@link FacetField}. + * + *

Use this constructor to use the concurrent implementation and/or allow early termination + */ + public DrillSideways( + IndexSearcher searcher, + FacetsConfig config, + TaxonomyReader taxoReader, + SortedSetDocValuesReaderState state, + ExecutorService executor, + boolean allowEarlyTermination) { this.searcher = searcher; this.config = config; this.taxoReader = taxoReader; this.state = state; this.executor = executor; + this.allowEarlyTermination = allowEarlyTermination; } /** @@ -333,7 +358,10 @@ public ConcurrentDrillSidewaysResult search( if (drillDownFacetsCollectorManager != null) { // Make sure we populate a facet collector corresponding to the base query if desired: mainCollectorManager = - new MultiCollectorManager(drillDownFacetsCollectorManager, hitCollectorManager); + allowEarlyTermination + ? new EarlyTerminatingFacetsAndHitsCollectorManager<>( + drillDownFacetsCollectorManager, hitCollectorManager) + : new MultiCollectorManager(drillDownFacetsCollectorManager, hitCollectorManager); } else { mainCollectorManager = hitCollectorManager; } @@ -542,4 +570,81 @@ public static class ConcurrentDrillSidewaysResult extends DrillSidewaysResult this.collectorResult = collectorResult; } } + + static class EarlyTerminatingFacetsAndHitsCollectorManager + implements CollectorManager< + EarlyTerminatingFacetsAndHitsCollectorManager.FacetAndHitsCollector, Object[]> { + + private final FacetsCollectorManager facetsCollectorManager; + private final CollectorManager hitCollectorManager; + + public EarlyTerminatingFacetsAndHitsCollectorManager( + FacetsCollectorManager facetsCollectorManager, CollectorManager hitCollectorManager) { + this.facetsCollectorManager = facetsCollectorManager; + this.hitCollectorManager = hitCollectorManager; + } + + @Override + public FacetAndHitsCollector newCollector() throws IOException { + return new FacetAndHitsCollector<>( + facetsCollectorManager.newCollector(), hitCollectorManager.newCollector()); + } + + @Override + public Object[] reduce(Collection> collectors) 
throws IOException { + List hitCollectors = new ArrayList<>(collectors.size()); + List facetsCollectors = new ArrayList<>(collectors.size()); + for (FacetAndHitsCollector collector : collectors) { + hitCollectors.add(collector.hitCollector); + facetsCollectors.add(collector.facetsCollector); + } + FacetsCollector facetsCollectorResult = facetsCollectorManager.reduce(facetsCollectors); + T hitCollectorResult = hitCollectorManager.reduce(hitCollectors); + return new Object[] {facetsCollectorResult, hitCollectorResult}; + } + + record FacetAndHitsCollector( + FacetsCollector facetsCollector, C hitCollector) implements Collector { + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + final LeafCollector hitCollectorLeaf = hitCollector.getLeafCollector(context); + final LeafCollector facetCollectorLeaf = facetsCollector.getLeafCollector(context); + return new LeafCollector() { + @Override + public void setScorer(Scorable scorer) throws IOException { + hitCollectorLeaf.setScorer(scorer); + facetCollectorLeaf.setScorer(scorer); + } + + @Override + public void collect(int doc) throws IOException { + hitCollectorLeaf.collect( + doc); // Any exception in the hit collector will terminate facets collection too + facetCollectorLeaf.collect(doc); + } + }; + } + + @Override + public ScoreMode scoreMode() { + ScoreMode hitCollectorScoreMode = hitCollector.scoreMode(); + ScoreMode facetCollectorScoreMode = facetsCollector.scoreMode(); + if (hitCollectorScoreMode == facetCollectorScoreMode) { + return hitCollectorScoreMode; + } + if (hitCollectorScoreMode.needsScores() || facetCollectorScoreMode.needsScores()) { + return ScoreMode.COMPLETE; + } else { + return ScoreMode.COMPLETE_NO_SCORES; + } + } + + @Override + public void setWeight(Weight weight) { + hitCollector.setWeight(weight); + facetsCollector.setWeight(weight); + } + } + } } diff --git a/lucene/facet/src/test/org/apache/lucene/facet/TestDrillSideways.java 
b/lucene/facet/src/test/org/apache/lucene/facet/TestDrillSideways.java index 74205522451f..e599215c715b 100644 --- a/lucene/facet/src/test/org/apache/lucene/facet/TestDrillSideways.java +++ b/lucene/facet/src/test/org/apache/lucene/facet/TestDrillSideways.java @@ -29,6 +29,8 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -104,6 +106,11 @@ protected DrillSideways getNewDrillSideways( return new DrillSideways(searcher, config, taxoReader); } + protected DrillSideways getNewDrillSidewaysWithEarlyTermination( + IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) { + return new DrillSideways(searcher, config, taxoReader, null, null, true); + } + protected DrillSideways getNewDrillSidewaysScoreSubdocsAtOnce( IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) { return new DrillSideways(searcher, config, taxoReader) { @@ -980,6 +987,89 @@ public void testMultipleRequestsPerDim() throws Exception { IOUtils.close(searcher.getIndexReader(), taxoReader, taxoWriter, dir, taxoDir); } + public void testEarlyTermination() throws Exception { + Directory dir = newDirectory(); + Directory taxoDir = newDirectory(); + + // Writes facet ords to a separate directory from the + // main index: + DirectoryTaxonomyWriter taxoWriter = + new DirectoryTaxonomyWriter(taxoDir, IndexWriterConfig.OpenMode.CREATE); + + FacetsConfig config = new FacetsConfig(); + + RandomIndexWriter writer = new RandomIndexWriter(random(), dir); + + Document doc = new Document(); + doc.add(new FacetField("Author", "Bob")); + doc.add(new FacetField("Publisher", "foo")); + writer.addDocument(config.build(taxoWriter, doc)); + + for (int i = 0; i < 5; i++) { + doc = new 
Document(); + doc.add(new FacetField("Author", "Lisa")); + doc.add(new FacetField("Publisher", "foo")); + writer.addDocument(config.build(taxoWriter, doc)); + } + + // NRT open + IndexSearcher searcher = getNewSearcher(writer.getReader()); + + // NRT open + TaxonomyReader taxoReader = new DirectoryTaxonomyReader(taxoWriter); + + // Run a single query with a DrillSideways that allows early termination: + DrillSideways ds = getNewDrillSidewaysWithEarlyTermination(searcher, config, taxoReader); + DrillDownQuery ddq = new DrillDownQuery(config); + ddq.add("Publisher", "foo"); + AtomicInteger docsCollected = new AtomicInteger(0); + AtomicBoolean earlyTerminated = new AtomicBoolean(false); + int maxDocsForEarlyTermination = 3; + AtomicInteger canCollectDocs = new AtomicInteger(maxDocsForEarlyTermination); + DrillSidewaysResult result = + ds.search( + ddq, + new SimpleCollectorManager(2, Comparator.comparing(cr -> cr.docAndScore.doc)) { + @Override + public SimpleCollector newCollector() { + return new SimpleCollector(ScoreMode.COMPLETE_NO_SCORES) { + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) + throws IOException { + return new SimpleLeafCollector() { + @Override + public void setScorer(Scorable scorer) { + super.scorer = scorer; + } + + @Override + public void collect(int doc) throws IOException { + if (canCollectDocs.decrementAndGet() < 0) { + earlyTerminated.set(true); + throw new CollectionTerminatedException(); + } + docsCollected.incrementAndGet(); + } + }; + } + }; + } + }); + // sanity check that the hits collector early terminated at 3 + assertTrue("Expecting early termination", earlyTerminated.get()); + assertEquals( + "Expecting num docs collected to be 3", maxDocsForEarlyTermination, docsCollected.get()); + + // Facets should have early terminated + assertEquals( + "Early termination didn't stop facet collection", + maxDocsForEarlyTermination, + result.facets.getTopChildren(10, "Publisher").value); + + writer.close(); + 
IOUtils.close(searcher.getIndexReader(), taxoReader, taxoWriter, dir, taxoDir); + } + private static class Doc implements Comparable { String id; String contentToken; diff --git a/lucene/facet/src/test/org/apache/lucene/facet/TestParallelDrillSideways.java b/lucene/facet/src/test/org/apache/lucene/facet/TestParallelDrillSideways.java index 3a2ac651ef8f..3920c8ab5048 100644 --- a/lucene/facet/src/test/org/apache/lucene/facet/TestParallelDrillSideways.java +++ b/lucene/facet/src/test/org/apache/lucene/facet/TestParallelDrillSideways.java @@ -55,6 +55,11 @@ protected DrillSideways getNewDrillSideways( return new DrillSideways(searcher, config, taxoReader, null, executor); } + protected DrillSideways getNewDrillSidewaysWithEarlyTermination( + IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) { + return new DrillSideways(searcher, config, taxoReader, null, executor, true); + } + @Override protected DrillSideways getNewDrillSidewaysScoreSubdocsAtOnce( IndexSearcher searcher, FacetsConfig config, TaxonomyReader taxoReader) {