Skip to content

Commit

Permalink
Disable intra-merge parallelism for all structures but kNN vectors (#…
Browse files Browse the repository at this point in the history
…13799)

After adjusting tests that truly exercise intra-merge parallelism, more issues have arisen. See: #13798

To be risk averse, and due to the soon-to-be released/frozen Lucene 10 & 9.12, I am reverting all intra-merge parallelism, except for the parallelism when merging HNSW graphs.

Merging other structures was never really enabled in a release (we disabled it in a bugfix for Lucene 9.11). While this is frustrating, as it seems like we are leaving lots of perf on the floor, I am erring on the side of safety here.

In Lucene 10, we can work on incrementally re-enabling intra-merge parallelism.

closes: #13798
  • Loading branch information
benwtrent committed Sep 18, 2024
1 parent 71ca6b4 commit b467a2b
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 75 deletions.
2 changes: 2 additions & 0 deletions lucene/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ Bug Fixes
`IndexWriter.forceMerge` or
`IndexWriter.addIndexes(CodecReader...)`, or reindexing entirely.

* GITHUB#13799: Disable intra-merge parallelism for all structures but kNN vectors. (Ben Trent)

Build
---------------------

Expand Down
53 changes: 1 addition & 52 deletions lucene/core/src/java/org/apache/lucene/index/MergeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
*
* @lucene.experimental
*/
public class MergeState implements Cloneable {
public class MergeState {

/** Maps document IDs from old segments to document IDs in the new segment */
public final DocMap[] docMaps;
Expand Down Expand Up @@ -302,55 +302,4 @@ public MergeState(
this.intraMergeTaskExecutor = intraMergeTaskExecutor;
this.needsIndexSort = needsIndexSort;
}

@Override
// NOTE(review): this method is the code being REMOVED by this commit — it existed so
// that each merging thread could obtain its own thread-private merge instances of the
// per-segment readers/producers when intra-merge parallelism was enabled.
public MergeState clone() {
// Shallow-copy each reader array so the entries can be swapped for merge instances
// below without mutating the arrays held by the original MergeState.
StoredFieldsReader[] storedFieldsReaders = this.storedFieldsReaders.clone();
TermVectorsReader[] termVectorsReaders = this.termVectorsReaders.clone();
NormsProducer[] normsProducers = this.normsProducers.clone();
DocValuesProducer[] docValuesProducers = this.docValuesProducers.clone();
FieldsProducer[] fieldsProducers = this.fieldsProducers.clone();
PointsReader[] pointsReaders = this.pointsReaders.clone();
KnnVectorsReader[] knnVectorsReaders = this.knnVectorsReaders.clone();
// All arrays are parallel (one slot per input segment); null slots mean the segment
// has no data for that structure, so each entry is checked before replacement.
for (int i = 0; i < storedFieldsReaders.length; ++i) {
if (storedFieldsReaders[i] != null) {
storedFieldsReaders[i] = storedFieldsReaders[i].getMergeInstance();
}
if (termVectorsReaders[i] != null) {
termVectorsReaders[i] = termVectorsReaders[i].getMergeInstance();
}
if (normsProducers[i] != null) {
normsProducers[i] = normsProducers[i].getMergeInstance();
}
if (docValuesProducers[i] != null) {
docValuesProducers[i] = docValuesProducers[i].getMergeInstance();
}
if (fieldsProducers[i] != null) {
fieldsProducers[i] = fieldsProducers[i].getMergeInstance();
}
if (pointsReaders[i] != null) {
pointsReaders[i] = pointsReaders[i].getMergeInstance();
}
if (knnVectorsReaders[i] != null) {
knnVectorsReaders[i] = knnVectorsReaders[i].getMergeInstance();
}
}
// Rebuild a MergeState around the merge-instance readers; the remaining fields
// (docMaps, segmentInfo, liveDocs, executor, ...) are shared with the original,
// which is safe because they are read-only during the merge.
return new MergeState(
docMaps,
segmentInfo,
mergeFieldInfos,
storedFieldsReaders,
termVectorsReaders,
normsProducers,
docValuesProducers,
fieldInfos,
liveDocs,
fieldsProducers,
pointsReaders,
knnVectorsReaders,
maxDocs,
infoStream,
intraMergeTaskExecutor,
needsIndexSort);
}
}
26 changes: 3 additions & 23 deletions lucene/core/src/java/org/apache/lucene/index/SegmentMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package org.apache.lucene.index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.codecs.Codec;
Expand All @@ -31,7 +29,6 @@
import org.apache.lucene.codecs.PointsWriter;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.TermVectorsWriter;
import org.apache.lucene.search.TaskExecutor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.InfoStream;
Expand Down Expand Up @@ -102,12 +99,7 @@ boolean shouldMerge() {
}

private MergeState mergeState() {
MergeState mergeState = this.mergeState;
if (Thread.currentThread() != mergeStateCreationThread) {
// Most merges, e.g. small merges, run in the same thread, so save the cost of pulling a clone
// in that case.
mergeState = mergeState.clone();
}
assert Thread.currentThread() == mergeStateCreationThread;
return mergeState;
}

Expand Down Expand Up @@ -147,8 +139,6 @@ MergeState merge() throws IOException {
IOContext.READ,
segmentWriteState.segmentSuffix);

TaskExecutor taskExecutor = new TaskExecutor(mergeState.intraMergeTaskExecutor);
List<Callable<Void>> mergingTasks = new ArrayList<>();
if (mergeState.mergeFieldInfos.hasNorms()) {
mergeWithLogging(this::mergeNorms, segmentWriteState, segmentReadState, "norms", numMerged);
}
Expand All @@ -161,12 +151,7 @@ MergeState merge() throws IOException {
}

if (mergeState.mergeFieldInfos.hasPointValues()) {
mergingTasks.add(
() -> {
mergeWithLogging(
this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
return null;
});
mergeWithLogging(this::mergePoints, segmentWriteState, segmentReadState, "points", numMerged);
}

if (mergeState.mergeFieldInfos.hasVectorValues()) {
Expand All @@ -179,14 +164,9 @@ MergeState merge() throws IOException {
}

if (mergeState.mergeFieldInfos.hasVectors()) {
mergingTasks.add(
() -> {
mergeWithLogging(this::mergeTermVectors, "term vectors");
return null;
});
mergeWithLogging(this::mergeTermVectors, "term vectors");
}

taskExecutor.invokeAll(mergingTasks);
// write the merged infos
mergeWithLogging(
this::mergeFieldInfos, segmentWriteState, segmentReadState, "field infos", numMerged);
Expand Down

0 comments on commit b467a2b

Please sign in to comment.