From aae28f214bd81a0d500d7d4682242625c61d2331 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Tue, 2 Dec 2025 16:52:38 +0800
Subject: [PATCH 01/12] Avoid data loss in vanilla segment replication.

Signed-off-by: guojialiang
---
 .../SegmentReplicationPrimaryPromotionIT.java | 163 ++++++++++++++++++
 .../opensearch/index/shard/IndexShard.java    |   8 +
 .../indices/replication/common/CopyState.java |  13 +-
 3 files changed, 183 insertions(+), 1 deletion(-)
 create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
new file mode 100644
index 0000000000000..42b2058f61873
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -0,0 +1,163 @@
+package org.opensearch.indices.replication;
+
+import org.opensearch.action.search.SearchResponse;
+import org.opensearch.action.support.replication.TransportReplicationAction;
+import org.opensearch.cluster.routing.ShardRouting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+import org.opensearch.index.IndexSettings;
+import org.opensearch.index.engine.Engine;
+import org.opensearch.index.engine.EngineConfig;
+import org.opensearch.index.engine.EngineException;
+import org.opensearch.index.engine.EngineFactory;
+import org.opensearch.index.engine.InternalEngine;
+import org.opensearch.index.engine.NRTReplicationEngine;
+import org.opensearch.indices.replication.checkpoint.PublishCheckpointAction;
+import org.opensearch.plugins.EnginePlugin;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.test.InternalTestCluster;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.opensearch.test.transport.MockTransportService;
+import org.opensearch.transport.RemoteTransportException;
+import org.opensearch.transport.TransportService;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBaseIT {
+    public static boolean lockEnable = false;
+    public static CountDownLatch indexLuceneLatch = new CountDownLatch(1);
+    public static CountDownLatch flushLatch = new CountDownLatch(1);
+    public static CountDownLatch refreshLatch = new CountDownLatch(1);
+
+    @Before
+    private void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> getMockPlugins() {
+        List<Class<? extends Plugin>> plugins = super.getMockPlugins().stream()
+            .filter(plugin -> !plugin.getName().contains("MockEngineFactoryPlugin"))
+            .collect(java.util.stream.Collectors.toList());
+        plugins.add(MockEnginePlugin.class);
+        return plugins;
+    }
+
+    public static class MockEnginePlugin extends Plugin implements EnginePlugin {
+        @Override
+        public Optional<EngineFactory> getEngineFactory(final IndexSettings indexSettings) {
+            return Optional.of(new MockEngineFactory());
+        }
+    }
+
+    public static class MockEngineFactory implements EngineFactory {
+        @Override
+        public Engine newReadWriteEngine(EngineConfig config) {
+            return config.isReadOnlyReplica() ? new MockNRTReplicationEngine(config) : new MockInternalEngine(config);
+        }
+    }
+
+    public static class MockInternalEngine extends InternalEngine {
+        MockInternalEngine(EngineConfig config) throws EngineException {
+            super(config);
+        }
+
+        @Override
+        protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
+            long seqNo = super.generateSeqNoForOperationOnPrimary(operation);
+            try {
+                if (lockEnable) {
+                    flushLatch.countDown();
+                    indexLuceneLatch.await();
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            return seqNo;
+        }
+    }
+
+    public static class MockNRTReplicationEngine extends NRTReplicationEngine {
+        MockNRTReplicationEngine(EngineConfig config) throws EngineException {
+            super(config);
+        }
+
+        @Override
+        public IndexResult index(Index index) throws IOException {
+            IndexResult indexResult = super.index(index);
+            if (lockEnable) {
+                refreshLatch.countDown();
+            }
+            return indexResult;
+        }
+    }
+
+    // Used to test that primary promotion does not result in data loss.
+    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
+        final String primary = internalCluster().startDataOnlyNode();
+        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put("index.refresh_interval", -1).build());
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+        final String replica = internalCluster().startDataOnlyNode();
+        ensureGreen(INDEX_NAME);
+
+        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
+        lockEnable = true;
+        Thread writeThread = new Thread(() -> { client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get(); });
+        writeThread.start();
+        flushLatch.await();
+
+        flush(INDEX_NAME);
+
+        waitForSearchableDocs(1, replica);
+
+        // mock network exception
+        MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
+            TransportService.class,
+            replica
+        ));
+        AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
+        replicaTransportService.addRequestHandlingBehavior(
+            PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
+            (handler, request, channel, task) -> {
+                if (mockReplicaReceivePublishCheckpointException.get()) {
+                    logger.info("mock remote transport exception");
+                    throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
+                }
+                logger.info("replica receive publish checkpoint request");
+                handler.messageReceived(request, channel, task);
+            }
+        );
+
+        refresh(INDEX_NAME);
+        waitForSearchableDocs(1, primary);
+        indexLuceneLatch.countDown();
+        refreshLatch.await();
+        writeThread.join();
+
+        logger.info("refresh index");
+        refresh(INDEX_NAME);
+        flush(INDEX_NAME);
+        waitForSearchableDocs(2, primary);
+        waitForSearchableDocs(1, replica);
+
+        // stop the primary node - we only have one shard on here.
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
+        ensureYellowAndNoInitializingShards(INDEX_NAME);
+
+        final ShardRouting replicaShardRouting = getShardRoutingForNodeName(replica);
+        assertNotNull(replicaShardRouting);
+        assertTrue(replicaShardRouting + " should be promoted as a primary", replicaShardRouting.primary());
+
+        refresh(INDEX_NAME);
+        SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
+        assertTrue(response.getHits().getTotalHits().value() == 2);
+        replicaTransportService.clearAllRules();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index f3fe60d70d532..a1046c8e66916 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -135,6 +135,7 @@
 import org.opensearch.index.engine.EngineException;
 import org.opensearch.index.engine.EngineFactory;
 import org.opensearch.index.engine.IngestionEngine;
+import org.opensearch.index.engine.InternalEngine;
 import org.opensearch.index.engine.MergedSegmentWarmerFactory;
 import org.opensearch.index.engine.NRTReplicationEngine;
 import org.opensearch.index.engine.ReadOnlyEngine;
@@ -4024,6 +4025,13 @@ protected Engine getEngineOrNull() {
         return this.currentEngineReference.get();
     }
 
+    // Only used for initializing segment replication CopyState
+    public long getLastRefreshedCheckpoint() {
+        Engine engine = getEngine();
+        assert engine instanceof InternalEngine;
+        return ((InternalEngine) engine).lastRefreshedCheckpoint();
+    }
+
     public void startRecovery(
         RecoveryState recoveryState,
         PeerRecoveryTargetService recoveryTargetService,
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
index 7d3eb9083208b..6abb3de478642 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -13,6 +13,7 @@
 import org.apache.lucene.store.ByteBuffersIndexOutput;
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.concurrent.GatedCloseable;
+import org.opensearch.index.seqno.SequenceNumbers;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.store.StoreFileMetadata;
 import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -22,6 +23,8 @@
 import java.io.UncheckedIOException;
 import java.util.Map;
 
+import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
+
 /**
  * An Opensearch-specific version of Lucene's CopyState class that
  * holds incRef'd file level details for one point-in-time segment infos.
@@ -38,15 +41,23 @@ public class CopyState implements Closeable {
 
     public CopyState(IndexShard shard) throws IOException {
         this.shard = shard;
+        long lastRefreshedCheckpoint = shard.getLastRefreshedCheckpoint();
         final Tuple<GatedCloseable<SegmentInfos>, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard
             .getLatestSegmentInfosAndCheckpoint();
         this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1();
         this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2();
         SegmentInfos segmentInfos = this.segmentInfosRef.get();
+
+        SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
+        Map<String, String> userData = segmentInfosSnapshot.getUserData();
+        userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
+        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
+        segmentInfosSnapshot.setUserData(userData, false);
+
         ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
         // resource description and name are not used, but resource description cannot be null
         try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) {
-            segmentInfos.write(indexOutput);
+            segmentInfosSnapshot.write(indexOutput);
         }
         this.infosBytes = buffer.toArrayCopy();
     }

From 88ba97cb4f78d3739a5d028decdfe4f1cd8ade91 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Tue, 2 Dec 2025 21:54:52 +0800
Subject: [PATCH 02/12] update

Signed-off-by: guojialiang
---
 .../SegmentReplicationPrimaryPromotionIT.java | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
index 42b2058f61873..573c9fbd9198e 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -1,3 +1,11 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
 package org.opensearch.indices.replication;
 
 import org.opensearch.action.search.SearchResponse;
@@ -27,7 +35,6 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBaseIT {
@@ -122,16 +129,10 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
             TransportService.class,
             replica
         ));
-        AtomicBoolean mockReplicaReceivePublishCheckpointException = new AtomicBoolean(true);
         replicaTransportService.addRequestHandlingBehavior(
             PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
             (handler, request, channel, task) -> {
-                if (mockReplicaReceivePublishCheckpointException.get()) {
-                    logger.info("mock remote transport exception");
-                    throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
-                }
-                logger.info("replica receive publish checkpoint request");
-                handler.messageReceived(request, channel, task);
+                throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
             }
         );
 

From 1aa8d32a4d7bd1943ee4517e54e5395d2561a6f5 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Tue, 2 Dec 2025 22:05:03 +0800
Subject: [PATCH 03/12] update

Signed-off-by: guojialiang
---
 .../replication/SegmentReplicationPrimaryPromotionIT.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
index 573c9fbd9198e..470ccfe9f8370 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -35,6 +35,7 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBaseIT {
@@ -118,7 +119,7 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
         lockEnable = true;
         Thread writeThread = new Thread(() -> { client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get(); });
         writeThread.start();
-        flushLatch.await();
+        assertTrue("flushLatch timed out", flushLatch.await(30, TimeUnit.SECONDS));
 
         flush(INDEX_NAME);
 
@@ -139,7 +140,7 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
         refresh(INDEX_NAME);
         waitForSearchableDocs(1, primary);
         indexLuceneLatch.countDown();
-        refreshLatch.await();
+        assertTrue("refreshLatch timed out", refreshLatch.await(30, TimeUnit.SECONDS));
         writeThread.join();
 
         logger.info("refresh index");
@@ -158,7 +159,7 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
 
         refresh(INDEX_NAME);
         SearchResponse response = client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
-        assertTrue(response.getHits().getTotalHits().value() == 2);
+        assertEquals(2L, response.getHits().getTotalHits().value());
         replicaTransportService.clearAllRules();
     }
 }

From 14ce5db813e5ea1b736c5f65a52046b4f97485c4 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Tue, 2 Dec 2025 22:07:59 +0800
Subject: [PATCH 04/12] update

Signed-off-by: guojialiang
---
 .../replication/SegmentReplicationPrimaryPromotionIT.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
index 470ccfe9f8370..93a3f1c14abb7 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -45,7 +45,7 @@ public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBase
     public static CountDownLatch refreshLatch = new CountDownLatch(1);
 
     @Before
-    private void setup() {
+    public void setup() {
         internalCluster().startClusterManagerOnlyNode();
     }
 

From 810aa28ee973c53da143cf9c7ed2045ac060e06f Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Tue, 2 Dec 2025 22:20:49 +0800
Subject: [PATCH 05/12] update

Signed-off-by: guojialiang
---
 .../SegmentReplicationPrimaryPromotionIT.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
index 93a3f1c14abb7..63c00a8537997 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -39,13 +39,17 @@
 
 @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class SegmentReplicationPrimaryPromotionIT extends SegmentReplicationBaseIT {
-    public static boolean lockEnable = false;
-    public static CountDownLatch indexLuceneLatch = new CountDownLatch(1);
-    public static CountDownLatch flushLatch = new CountDownLatch(1);
-    public static CountDownLatch refreshLatch = new CountDownLatch(1);
+    private static boolean lockEnable;
+    private static CountDownLatch indexLuceneLatch;
+    private static CountDownLatch flushLatch;
+    private static CountDownLatch refreshLatch;
 
     @Before
     public void setup() {
+        lockEnable = false;
+        indexLuceneLatch = new CountDownLatch(1);
+        flushLatch = new CountDownLatch(1);
+        refreshLatch = new CountDownLatch(1);
         internalCluster().startClusterManagerOnlyNode();
     }
 

From 5b603a859c9c05f54a513fa8edcce8c8894f0ec1 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Wed, 3 Dec 2025 10:58:22 +0800
Subject: [PATCH 06/12] update

Signed-off-by: guojialiang
---
 .../replication/SegmentReplicationPrimaryPromotionIT.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
index 63c00a8537997..5b97d69e305f1 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java
@@ -87,7 +87,7 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
             try {
                 if (lockEnable) {
                     flushLatch.countDown();
-                    indexLuceneLatch.await();
+                    assertTrue("indexLuceneLatch timed out", indexLuceneLatch.await(30, TimeUnit.SECONDS));
                 }
             } catch (Exception e) {
                 throw new RuntimeException(e);
@@ -129,7 +129,8 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
 
         waitForSearchableDocs(1, replica);
 
-        // mock network exception
+        // The exception is thrown unconditionally to simulate persistent publish-checkpoint failures
+        // until the primary is stopped and the replica is promoted.
         MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
             TransportService.class,
             replica
@@ -145,7 +146,8 @@ public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
         waitForSearchableDocs(1, primary);
         indexLuceneLatch.countDown();
         assertTrue("refreshLatch timed out", refreshLatch.await(30, TimeUnit.SECONDS));
-        writeThread.join();
+        writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertFalse("writeThread did not complete in time", writeThread.isAlive());
 
         logger.info("refresh index");
         refresh(INDEX_NAME);

From 8afe73cf214cb2dbaacb4c70d3272e2f1a9e6b04 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Thu, 4 Dec 2025 16:27:45 +0800
Subject: [PATCH 07/12] If local segment replication is enabled, a force flush needs to be performed after translog recovery.

Signed-off-by: guojialiang
---
 .../main/java/org/opensearch/index/engine/InternalEngine.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index a583e41213d1c..d1c0ee69cc270 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -269,7 +269,8 @@ public void onAfterTranslogSync() {
 
             @Override
             public void onAfterTranslogRecovery() {
-                flush(false, true);
+                boolean isForceFlush = engineConfig.getIndexSettings().isSegRepLocalEnabled();
+                flush(isForceFlush, true);
                 translogManager.trimUnreferencedTranslogFiles();
             }
 

From 8ba8c98e2c1445a5d968ce975d21aabdc10f26b4 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Thu, 4 Dec 2025 21:17:46 +0800
Subject: [PATCH 08/12] Add comments

Signed-off-by: guojialiang
---
 .../main/java/org/opensearch/index/engine/InternalEngine.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index d1c0ee69cc270..e33566acd29c5 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -269,6 +269,8 @@ public void onAfterTranslogSync() {
 
             @Override
             public void onAfterTranslogRecovery() {
+                // In the scenario of vanilla segment replication, the force parameter is used to ensure that the
+                // SegmentInfos#version of the primary shard is greater than or equal to that of the replicas.
                 boolean isForceFlush = engineConfig.getIndexSettings().isSegRepLocalEnabled();
                 flush(isForceFlush, true);
                 translogManager.trimUnreferencedTranslogFiles();

From 740deb36603aa587acd4cc53e7da8a71977ce3ff Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Fri, 5 Dec 2025 00:23:16 +0800
Subject: [PATCH 09/12] put Math.min(maxSeqNo, lastRefreshedCheckpoint) in max_seq_no

Signed-off-by: guojialiang
---
 .../java/org/opensearch/index/engine/InternalEngine.java | 5 +----
 .../opensearch/indices/replication/common/CopyState.java | 6 ++----
 2 files changed, 3 insertions(+), 8 deletions(-)

diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
index e33566acd29c5..a583e41213d1c 100644
--- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java
@@ -269,10 +269,7 @@ public void onAfterTranslogSync() {
 
             @Override
             public void onAfterTranslogRecovery() {
-                // In the scenario of vanilla segment replication, the force parameter is used to ensure that the
-                // SegmentInfos#version of the primary shard is greater than or equal to that of the replicas.
-                boolean isForceFlush = engineConfig.getIndexSettings().isSegRepLocalEnabled();
-                flush(isForceFlush, true);
+                flush(false, true);
                 translogManager.trimUnreferencedTranslogFiles();
             }
 
diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
index 6abb3de478642..b13178c5c0ddc 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -23,8 +23,6 @@
 import java.io.UncheckedIOException;
 import java.util.Map;
 
-import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
-
 /**
  * An Opensearch-specific version of Lucene's CopyState class that
  * holds incRef'd file level details for one point-in-time segment infos.
@@ -50,8 +48,8 @@ public CopyState(IndexShard shard) throws IOException {
 
         SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
-        userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(lastRefreshedCheckpoint));
-        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(lastRefreshedCheckpoint));
+        long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
+        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
         segmentInfosSnapshot.setUserData(userData, false);
 
         ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();

From f3fec762a59eb2eea8c1368652cb07665a2c713c Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Fri, 5 Dec 2025 02:15:42 +0800
Subject: [PATCH 10/12] update

Signed-off-by: guojialiang
---
 .../org/opensearch/indices/replication/common/CopyState.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
index b13178c5c0ddc..163e1a9b8817e 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -48,7 +48,7 @@ public CopyState(IndexShard shard) throws IOException {
 
         SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
-        long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
+        long maxSeqNo = Long.parseLong(userData.getOrDefault(SequenceNumbers.MAX_SEQ_NO, "-1"));
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
         segmentInfosSnapshot.setUserData(userData, false);

From 9231a7f9e65718acae7e5e4be7b8c40727d49c0a Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Fri, 5 Dec 2025 03:04:32 +0800
Subject: [PATCH 11/12] Add comments

Signed-off-by: guojialiang
---
 .../org/opensearch/indices/replication/common/CopyState.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
index 163e1a9b8817e..2599b8a1942f2 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -49,6 +49,9 @@ public CopyState(IndexShard shard) throws IOException {
         SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
         long maxSeqNo = Long.parseLong(userData.getOrDefault(SequenceNumbers.MAX_SEQ_NO, "-1"));
+        // In the scenario of vanilla segment replication. We need to ensure that after the primary promotion,
+        // the SegmentInfos#version of the primary shard is always greater than or equal to that of the replicas.
+        // At the same time, it is also necessary to ensure that there is no risk of data loss.
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
         segmentInfosSnapshot.setUserData(userData, false);

From 4265d7110105ce1ec7c9b8028544af077e472863 Mon Sep 17 00:00:00 2001
From: guojialiang
Date: Fri, 5 Dec 2025 09:06:05 +0800
Subject: [PATCH 12/12] Add comments

Signed-off-by: guojialiang
---
 .../opensearch/indices/replication/common/CopyState.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
index 2599b8a1942f2..adfa960e257c3 100644
--- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
+++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java
@@ -49,9 +49,9 @@ public CopyState(IndexShard shard) throws IOException {
         SegmentInfos segmentInfosSnapshot = segmentInfos.clone();
         Map<String, String> userData = segmentInfosSnapshot.getUserData();
         long maxSeqNo = Long.parseLong(userData.getOrDefault(SequenceNumbers.MAX_SEQ_NO, "-1"));
-        // In the scenario of vanilla segment replication. We need to ensure that after the primary promotion,
-        // the SegmentInfos#version of the primary shard is always greater than or equal to that of the replicas.
-        // At the same time, it is also necessary to ensure that there is no risk of data loss.
+        // In the scenario of primary promotion, we need to ensure that the SegmentInfos#version of the new primary shard
+        // is greater than or equal to that of the replicas, and also need to ensure that the local_checkpoint of the new
+        // primary shard is less than or equal to the checkpoint of the Lucene commit.
         userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
         segmentInfosSnapshot.setUserData(userData, false);
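
The three CopyState patches converge on one small piece of logic, summarized by the sketch below. It is illustrative only: SegmentInfos, SequenceNumbers.MAX_SEQ_NO, clone(), getUserData() and setUserData() are the real Lucene/OpenSearch APIs used in the series, while the CapMaxSeqNo class, the capMaxSeqNo method and the defensive HashMap copy are hypothetical names introduced purely to show the capping step in isolation.

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.seqno.SequenceNumbers;

import java.util.HashMap;
import java.util.Map;

final class CapMaxSeqNo {
    // Clone the SegmentInfos that will be shipped to replicas and cap its max_seq_no
    // user-data entry at the primary's last refreshed checkpoint. A replica that is later
    // promoted to primary derives its checkpoints from this commit user data, so it must
    // not advertise operations beyond what the refreshed (and therefore replicated)
    // segments actually contain.
    static SegmentInfos capMaxSeqNo(SegmentInfos infos, long lastRefreshedCheckpoint) {
        SegmentInfos snapshot = infos.clone();
        Map<String, String> userData = new HashMap<>(snapshot.getUserData());
        long maxSeqNo = Long.parseLong(userData.getOrDefault(SequenceNumbers.MAX_SEQ_NO, "-1"));
        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
        snapshot.setUserData(userData, false);
        return snapshot;
    }
}

Math.min keeps the shipped commit conservative: if the in-memory max sequence number has run ahead of what has been refreshed, the copied commit advertises only the refreshed checkpoint, which is exactly what the replica can actually search after replication.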