[Segment Replication] Avoid data loss in vanilla segment replication #20150
Conversation
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Walkthrough: Exposes `InternalEngine.lastRefreshedCheckpoint` via `IndexShard.getLastRefreshedCheckpoint`, incorporates that checkpoint into `CopyState`'s `SegmentInfos` snapshot (adjusting `MAX_SEQ_NO`), and adds an integration test (`SegmentReplicationPrimaryPromotionIT`) with mock engine/transport hooks to validate replica promotion without data loss.
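At its core, the new accessor is a thin delegation on `IndexShard`; a simplified sketch (mirroring the method the review below discusses, with `Engine`/`InternalEngine` from `org.opensearch.index.engine`):

```java
// Simplified sketch of the new accessor: IndexShard exposes the engine's last refreshed
// checkpoint so CopyState can clamp MAX_SEQ_NO to what is actually visible in segments.
public long getLastRefreshedCheckpoint() {
    Engine engine = getEngine();
    assert engine instanceof InternalEngine;
    return ((InternalEngine) engine).lastRefreshedCheckpoint();
}
```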
❌ Gradle check result for aae28f2: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Actionable comments posted: 2
🧹 Nitpick comments (3)
server/src/main/java/org/opensearch/index/shard/IndexShard.java (1)
138-139: Guard `getLastRefreshedCheckpoint()` against non-`InternalEngine` engines

`getLastRefreshedCheckpoint()` unconditionally casts `Engine` to `InternalEngine`. If a shard ever runs with a different `Engine` implementation (e.g., a custom `EngineFactory`, `ReadOnlyEngine`, or future engine type) and this method is called, it will fail with a `ClassCastException` (the assert only catches it in tests). Consider adding an explicit runtime check and a clearer failure mode, e.g.:

```diff
-    public long getLastRefreshedCheckpoint() {
-        Engine engine = getEngine();
-        assert engine instanceof InternalEngine;
-        return ((InternalEngine) engine).lastRefreshedCheckpoint();
-    }
+    public long getLastRefreshedCheckpoint() {
+        final Engine engine = getEngine();
+        if (engine instanceof InternalEngine internalEngine) {
+            return internalEngine.lastRefreshedCheckpoint();
+        }
+        throw new IllegalStateException(
+            "lastRefreshedCheckpoint is only available for InternalEngine but found [" + engine.getClass().getName() + "]"
+        );
+    }
```

This keeps the assumption explicit and avoids a silent hard `ClassCastException` if the invariant is ever violated.

Also applies to: 4028-4033
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)
34-38: Static test coordination state should be reset for future extensibility

The static coordination state:

```java
public static boolean lockEnable = false;
public static CountDownLatch indexLuceneLatch = new CountDownLatch(1);
public static CountDownLatch flushLatch = new CountDownLatch(1);
public static CountDownLatch refreshLatch = new CountDownLatch(1);
```

combined with the mocked engines is fine for a single test, but it will behave oddly if additional tests are ever added to this class (latches already at zero, `lockEnable` left `true`, etc.). Consider resetting all of these in a `@Before` (or `@After`) method, e.g. reassigning new `CountDownLatch(1)` instances and `lockEnable = false`, so each test starts from a known clean state; a sketch follows below.

Also applies to: 53-100
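A possible shape for that reset, assuming the field names shown above (the method name and placement are illustrative):

```java
// Illustrative reset of the shared coordination state so every test in the class starts clean.
// Requires org.junit.Before and java.util.concurrent.CountDownLatch.
@Before
public void resetCoordinationState() {
    lockEnable = false;
    indexLuceneLatch = new CountDownLatch(1);
    flushLatch = new CountDownLatch(1);
    refreshLatch = new CountDownLatch(1);
}
```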
39-42: Ensure the `@Before` setup method actually runs

`setup()` is declared `private`:

```java
@Before
private void setup() {
    internalCluster().startClusterManagerOnlyNode();
}
```

Depending on the JUnit/OpenSearch test harness configuration, private `@Before` methods may not be discovered. If that happens, this setup will silently be skipped and the test will rely on default cluster behaviour. To be safe and consistent with typical JUnit usage, make this method `public` (and optionally rename to `setUp`):

```diff
 @Before
-private void setup() {
+public void setup() {
     internalCluster().startClusterManagerOnlyNode();
 }
```
📒 Files selected for processing (3)
- server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
- server/src/main/java/org/opensearch/index/shard/IndexShard.java (2 hunks)
- server/src/main/java/org/opensearch/indices/replication/common/CopyState.java (3 hunks)
Actionable comments posted: 2
♻️ Duplicate comments (2)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)
109-163: Ensure test actually runs and cannot hang indefinitely

A few issues in the main test method impact its effectiveness and robustness:
- Missing `@Test` annotation

  The method `testPrimaryStopped_ReplicaPromoted_no_data_loss` is not annotated with `@Test`. Under JUnit 4 (as used by `OpenSearchIntegTestCase`), this will typically not be discovered as a test method. Add the annotation and import:

```diff
-import org.junit.Before;
+import org.junit.Before;
+import org.junit.Test;
 ...
-    // Used to test that primary promotion does not result in data loss.
-    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
+    // Used to test that primary promotion does not result in data loss.
+    @Test
+    public void testPrimaryStopped_ReplicaPromoted_no_data_loss() throws Exception {
```

  This is critical because this IT is the primary guard against the data-loss scenario.
- Unbounded waits on latches and `join()` can hang the suite (previously flagged)

  `flushLatch.await()`, `refreshLatch.await()`, and `writeThread.join()` are all unbounded. If the mocked engine or replication flow regresses, the test will hang indefinitely and stall the IT suite. You can mitigate this with bounded waits and assertions:

```diff
-        flushLatch.await();
+        assertTrue("flushLatch timed out", flushLatch.await(30, TimeUnit.SECONDS));
 ...
-        refreshLatch.await();
-        writeThread.join();
+        assertTrue("refreshLatch timed out", refreshLatch.await(30, TimeUnit.SECONDS));
+        writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertFalse("writeThread is still alive", writeThread.isAlive());
```

  (Requires `import java.util.concurrent.TimeUnit;` and using `assertTrue`/`assertFalse` from the base test class.)
- Mocked publish‑checkpoint exception is permanent; clarify intent (previously flagged in spirit)
  The replica's `PublishCheckpointAction` handler always throws a `RemoteTransportException`:

```java
replicaTransportService.addRequestHandlingBehavior(
    PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
    (handler, request, channel, task) -> {
        throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
    }
);
```

  If the intention is to simulate a permanent failure on publishing checkpoints, adding a short comment here would make that explicit. If the goal is to fail only the first attempt and allow retries to succeed, consider guarding this with a flag (e.g., an `AtomicBoolean` fail-once pattern) so later attempts go through.
160-161: Use `assertEquals` for a clearer hit-count assertion (already suggested in an earlier review)

The assertion:
```java
assertTrue(response.getHits().getTotalHits().value() == 2);
```

works but produces a less helpful failure message. Prefer `assertEquals` with the correct long literal:

```diff
-        assertTrue(response.getHits().getTotalHits().value() == 2);
+        assertEquals(2L, response.getHits().getTotalHits().value());
```

This will show expected vs. actual values directly on failure.
🧹 Nitpick comments (1)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1)
41-44: Static latch and flag state should be reset for test isolation
`lockEnable` and the `CountDownLatch` instances are `public static` and never reset. This is fine for a single test today, but will make the class fragile if additional tests are added or if the class is reused, since stale latch state (count already at 0, `lockEnable` left `true`) will change the control-flow assumptions. Consider reinitializing these in a `@Before`/`@After` method (or localizing them to the test) and, if they remain shared across threads, making `lockEnable` `volatile` to guarantee visibility.
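For example (illustrative only, following the field declaration shown earlier):

```java
// Volatile so changes made by the test thread are visible to the engine threads.
public static volatile boolean lockEnable = false;
```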
📒 Files selected for processing (1)
- server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
Actionable comments posted: 1
♻️ Duplicate comments (4)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (4)
47-50: JUnit lifecycle method should not be `private`.

With JUnit 4, methods annotated with `@Before` must be `public`; declaring `private void setup()` can cause the runner to treat it as an invalid lifecycle method and fail or ignore it. That would prevent the cluster-manager-only node from being started as intended. Change the signature to be public:

```diff
 @Before
-private void setup() {
+public void setup() {
     internalCluster().startClusterManagerOnlyNode();
 }
```
80-92: Guard engine-level latch wait with a timeout and handle interruption explicitly.
`MockInternalEngine.generateSeqNoForOperationOnPrimary` blocks on `indexLuceneLatch.await()` without a timeout and wraps all `Exception`s in a `RuntimeException`. If the latch is never released (e.g., test regression, early failure), this will block the engine thread indefinitely and hang the test run. Add a bounded wait with an assertion:

```diff
         if (lockEnable) {
             flushLatch.countDown();
-            indexLuceneLatch.await();
+            if (!indexLuceneLatch.await(30, TimeUnit.SECONDS)) {
+                throw new AssertionError("indexLuceneLatch timed out");
+            }
         }
-    } catch (Exception e) {
+    } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new RuntimeException(e);
     }
```
120-144: Add timeout to thread join and capture exceptions from the write thread.

Two issues with the write thread:

- Line 144: Unbounded `writeThread.join()` can hang the test

  If the write thread encounters an unexpected failure or deadlock, this will block indefinitely. Add a timeout:

```diff
-        writeThread.join();
+        writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertFalse("writeThread still alive after timeout", writeThread.isAlive());
```

- Line 120: Exceptions thrown inside the lambda are silently lost

  If `client().prepareIndex()` throws an exception (e.g., rejection, timeout), the test cannot detect it and may incorrectly pass. Capture and propagate exceptions:

```diff
+        AtomicReference<Exception> writeError = new AtomicReference<>();
         Thread writeThread = new Thread(() -> {
-            client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
+            try {
+                client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
+            } catch (Exception e) {
+                writeError.set(e);
+            }
         });
         writeThread.start();
         ...
         writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertNull("Write thread failed: " + writeError.get(), writeError.get());
```
133-138: Clarify whether checkpoint publish should fail permanently or transiently.

The mock transport exception is thrown unconditionally for all `PublishCheckpointAction` requests throughout the entire test. If the intent is to simulate a transient network failure where retries eventually succeed, you should use a flag or counter to allow subsequent requests to pass. If permanent failure is the intended scenario to validate data loss prevention, add a comment clarifying that all checkpoint publishes are expected to fail:

```diff
+// Simulate permanent network partition: all checkpoint publishes to replica will fail
 replicaTransportService.addRequestHandlingBehavior(
     PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
     (handler, request, channel, task) -> {
         throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
     }
 );
```

Otherwise, if retries should succeed after the first failure:

```diff
+AtomicBoolean failOnce = new AtomicBoolean(true);
 replicaTransportService.addRequestHandlingBehavior(
     PublishCheckpointAction.ACTION_NAME + TransportReplicationAction.REPLICA_ACTION_SUFFIX,
     (handler, request, channel, task) -> {
+        if (failOnce.compareAndSet(true, false)) {
             throw new RemoteTransportException("mock remote transport exception", new OpenSearchRejectedExecutionException());
+        }
+        handler.messageReceived(request, channel, task);
     }
 );
```
📒 Files selected for processing (1)
- server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
🔇 Additional comments (1)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1)
110-164: Test logic correctly validates data loss prevention during primary promotion.

The test effectively simulates the scenario:
- Writes are in-flight when a flush/refresh occurs
- Network partition prevents checkpoint propagation to replica
- Primary is stopped and replica is promoted
- The promoted replica should contain all committed data
Good improvements from previous review:
- Lines 122, 143: Latch waits now have 30-second timeouts
- Line 162: Uses `assertEquals` for clearer assertion messages

The test logic correctly validates that the promoted replica recovers both documents despite the simulated network partition.
Actionable comments posted: 0
♻️ Duplicate comments (1)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1)
84-96: Add bounded wait to `indexLuceneLatch.await()` to prevent test hangs.

The `indexLuceneLatch.await()` call on line 90 lacks a timeout. If the latch is never released (e.g., due to test failure or regression), this will block the engine thread indefinitely and hang the test suite. Apply this diff to add a bounded wait:

```diff
     try {
         if (lockEnable) {
             flushLatch.countDown();
-            indexLuceneLatch.await();
+            if (indexLuceneLatch.await(30, TimeUnit.SECONDS) == false) {
+                throw new AssertionError("indexLuceneLatch timed out");
+            }
         }
-    } catch (Exception e) {
+    } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new RuntimeException(e);
     }
```
🧹 Nitpick comments (2)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)
148-148: Add a timeout to `writeThread.join()` for test robustness.

The `writeThread.join()` call lacks a timeout. If the write thread hangs for any reason, the test will block indefinitely. Apply this diff:

```diff
-        writeThread.join();
+        writeThread.join(TimeUnit.SECONDS.toMillis(30));
+        assertFalse("writeThread did not complete in time", writeThread.isAlive());
```
137-142: Clarify the intentional permanent failure behavior with a comment.

The mock transport exception handler unconditionally throws for the entire test duration. While this appears intentional (simulating persistent checkpoint publish failures until the primary stops), adding a brief comment would clarify this for future readers.

```diff
 // mock network exception
+// The exception is thrown unconditionally to simulate persistent publish-checkpoint failures
+// until the primary is stopped and the replica is promoted.
 MockTransportService replicaTransportService = ((MockTransportService) internalCluster().getInstance(
```
📒 Files selected for processing (1)
- server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
🔇 Additional comments (4)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (4)
47-54: LGTM!

The setup method correctly initializes the static coordination fields before each test, preventing cross-test pollution. The method visibility is properly set to `public` for the JUnit 4 lifecycle.
56-63: LGTM!

Correctly filters out the default `MockEngineFactoryPlugin` and adds the test-specific `MockEnginePlugin` to enable controlled timing behavior during the test.
99-112: LGTM!

The `MockNRTReplicationEngine` correctly signals the refresh latch after indexing completes on the replica, enabling proper test coordination.
114-168: Well-structured test for data loss prevention during primary promotion.

The test effectively validates that documents indexed on the primary are not lost when the replica is promoted, even when checkpoint publish requests fail. The latch-based coordination ensures deterministic timing of operations.
❌ Gradle check result for 810aa28: null. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Actionable comments posted: 0
🧹 Nitpick comments (2)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)
79-97: Consider preserving thread interrupt status.

The bounded wait on line 90 correctly addresses the previous timeout concern. However, when catching `InterruptedException` within the broad `Exception` catch, the thread's interrupt status is lost. While the current approach works for test purposes, explicitly handling `InterruptedException` and restoring the interrupt status would be more robust:

```diff
 try {
     if (lockEnable) {
         flushLatch.countDown();
         assertTrue("indexLuceneLatch timed out", indexLuceneLatch.await(30, TimeUnit.SECONDS));
     }
-} catch (Exception e) {
+} catch (InterruptedException e) {
+    Thread.currentThread().interrupt();
     throw new RuntimeException(e);
 }
```
114-170: Test logic is sound; consider capturing write thread exceptions for clearer diagnostics.

The test correctly implements all previous recommendations: bounded waits with timeouts (lines 126, 148), timed thread join with assertion (lines 149-150), and `assertEquals` for cleaner assertions (line 168). The unconditional mock exception behavior (lines 138-143) is now clearly documented as intentional.

One enhancement for test robustness: capture any exception from the write thread to surface indexing failures explicitly, rather than relying on downstream assertions to detect them:

```diff
+final AtomicReference<Exception> writeException = new AtomicReference<>();
 Thread writeThread = new Thread(() -> {
-    client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
+    try {
+        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", "bar2").get();
+    } catch (Exception e) {
+        writeException.set(e);
+    }
 });
 writeThread.start();
 assertTrue("flushLatch timed out", flushLatch.await(30, TimeUnit.SECONDS));
+// ... rest of test
+writeThread.join(TimeUnit.SECONDS.toMillis(30));
+assertFalse("writeThread did not complete in time", writeThread.isAlive());
+assertNull("Write thread failed with exception: " + writeException.get(), writeException.get());
```

Also add the import:

```java
import java.util.concurrent.atomic.AtomicReference;
```
📒 Files selected for processing (1)
- server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (1 hunks)
🔇 Additional comments (2)
server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationPrimaryPromotionIT.java (2)
42-54: Previous review concerns have been addressed.

The `setup()` method is now public (fixing the JUnit lifecycle issue), and all static coordination fields are properly reset before each test. The bounded waits and proper exception handling recommended in earlier reviews are also now in place throughout the test.
56-112: LGTM! Mock engine infrastructure is well-designed.

The plugin filtering (lines 58-60) correctly prevents conflicts, and the factory pattern (line 75) properly instantiates different engine types based on replica mode. The latch coordination in `MockNRTReplicationEngine.index()` (lines 107-109) integrates cleanly with the test flow.
❌ Gradle check result for 5b603a8: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
…rformed after translog recovery. Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@             Coverage Diff              @@
##               main   #20150      +/-   ##
============================================
+ Coverage     73.25%   73.29%   +0.03%     
- Complexity    71684    71756      +72     
============================================
  Files          5788     5793       +5     
  Lines        327866   328069     +203     
  Branches      47218    47246      +28     
============================================
+ Hits         240194   240454     +260     
+ Misses        68399    68330      -69     
- Partials      19273    19285      +12     
```

☔ View full report in Codecov by Sentry.
…vanilla-segment-replication
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
❌ Gradle check result for 8ba8c98: null. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
❌ Gradle check result for 740deb3: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
❌ Gradle check result for f3fec76: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Signed-off-by: guojialiang <guojialiang.2012@bytedance.com>
❌ Gradle check result for f4a712b: FAILURE. Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?
Description
The purpose of this PR is to address the issue of data loss in vanilla segment replication mentioned in #20118.
Related Issues
Resolves #20118
Instructions
When constructing `CopyState`, put `Math.min(maxSeqNo, lastRefreshedCheckpoint)` in `max_seq_no`. Ensure the `userData` structure is consistent across segment replication and remote store. A minimal sketch of this adjustment is shown right below.
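For illustration only, here is a sketch of the intended `userData` adjustment; the helper class and method below are hypothetical, and the actual change is applied where `CopyState` snapshots the primary's `SegmentInfos`:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.lucene.index.SegmentInfos;
import org.opensearch.index.seqno.SequenceNumbers;

final class MaxSeqNoClampSketch {
    // Hypothetical helper: clamp MAX_SEQ_NO in the snapshot's userData so the replica is never
    // told about sequence numbers that the primary has not yet refreshed into segments.
    static void clampMaxSeqNo(SegmentInfos snapshot, long lastRefreshedCheckpoint) {
        Map<String, String> userData = new HashMap<>(snapshot.getUserData());
        long maxSeqNo = Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO));
        userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(Math.min(maxSeqNo, lastRefreshedCheckpoint)));
        // false: do not bump the SegmentInfos version just for this metadata tweak.
        snapshot.setUserData(userData, false);
    }
}
```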
I did not directly put `lastRefreshedCheckpoint` in `max_seq_no`. It is designed to ensure that the test `SegmentReplicationIT#testRestartPrimary` can pass under the new logic, and I will provide a detailed explanation.

This test simulates the process of writing a document, restarting the primary shard node after completing segment replication, and waiting for the cluster to reach the green state.
Let's first take a look at the execution process in the current main branch.
1. After the document is written and segment replication completes, the `SegmentInfos#version` of the primary shard and the replica are equal (it is 9 in the test).
2. When the old primary shard node restarts, the shard performs `commitIndexWriter`, and the `SegmentInfos#version` will be incremented by 1 in the logic of `IndexWriter#setLiveCommitData` and `IndexWriter#prepareCommitInternal` respectively (updated from 9 to 11).
3. When the old replica is promoted, it first closes the `NRTReplicationEngine`, and the `SegmentInfos#version` of the replica will be incremented by 1 in `NRTReplicationEngine#closeNoLock` (updated from 9 to 10). Then it switches to the `InternalEngine`, performs translog recovery (the actual number of recovered operations is 0), and then executes the flush operation. In the current logic, because the condition is met (`getProcessedLocalCheckpoint()` is 0, `local_checkpoint` is -1), the shard performs `commitIndexWriter`, and the `SegmentInfos#version` is incremented by 1 in the logic of `IndexWriter#setLiveCommitData` and `IndexWriter#prepareCommitInternal` respectively (updated from 10 to 12).
4. Since the `segmentInfosVersion` of the primary shard (12) is greater than the `segmentInfosVersion` of the replica (11), force segment replication can be executed normally.

Next, let's take a look at the execution process of directly using
`lastRefreshedCheckpoint`.

1. When the old replica is promoted, it first closes the `NRTReplicationEngine`, and the `SegmentInfos#version` of the replica will be incremented by 1 in `NRTReplicationEngine#closeNoLock` (updated from 9 to 10). Then it switches to the `InternalEngine`, performs translog recovery (the actual number of recovered operations is 0), and then executes the flush operation. Since the `local checkpoint` is updated using the `latest refreshed checkpoint`, the condition is not met (`getProcessedLocalCheckpoint()` is 0, `local_checkpoint` is 0), and the `SegmentInfos#version` remains 10.
2. Since the `segmentInfosVersion` of the primary shard (10) is less than the local `segmentInfosVersion` (11), the recovery of the replica will fail.
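For readers unfamiliar with the flush path, here is a simplified paraphrase of "the condition" referenced in the two walkthroughs above; this is a sketch only, assuming the flush compares the engine's processed local checkpoint against the local checkpoint recorded in the last Lucene commit's user data, and is not the actual `InternalEngine` source:

```java
// Simplified paraphrase of the flush/commit decision discussed above: a new Lucene commit
// happens only when the engine has processed operations beyond the local checkpoint
// recorded in the last commit's user data.
final class FlushConditionSketch {
    static boolean shouldCommitOnFlush(long processedLocalCheckpoint, long localCheckpointOfLastCommit) {
        return processedLocalCheckpoint > localCheckpointOfLastCommit;
    }
    // Main branch during promotion:          processed = 0, committed local_checkpoint = -1 -> commit, version bumps to 12.
    // Direct lastRefreshedCheckpoint design: processed = 0, committed local_checkpoint = 0  -> no commit, version stays at 10.
}
```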
Finally, let's take a look at the solution using `Math.min(maxSeqNo, lastRefreshedCheckpoint)`.

We need to ensure that after the primary promotion, the
`SegmentInfos#version` of the primary shard is always greater than or equal to that of the replicas. At the same time, it is also necessary to ensure that there is no risk of data loss.

During the primary promotion process,
`restoreVersionMapAndCheckpointTracker` ensures the live version map and checkpoint tracker are in sync with the Lucene commit. Therefore, we only need to ensure that the `local_checkpoint` of the replica shard is less than the checkpoint in the Lucene commit, rather than strictly equal.

When using
`Math.min(maxSeqNo, lastRefreshedCheckpoint)`, there are the following scenarios:

- `maxSeqNo < lastRefreshedCheckpoint`: The old primary shard has not yet performed a flush. During the restart of the old primary shard and the promotion of the new primary shard, `commitIndexWriter` will be executed because the condition is met.
- `maxSeqNo = lastRefreshedCheckpoint`: After writing the last doc, a flush operation was performed. During the restart of the old primary shard and the promotion of the new primary shard, neither will execute `commitIndexWriter` because the condition is not met.
- `maxSeqNo > lastRefreshedCheckpoint`: There are documents in the translog of the old primary shard that have not been indexed. During the restart of the old primary shard and the promotion of the new primary shard, `commitIndexWriter` will be executed because the condition is met.

All scenarios above can ensure that the
`segmentInfosVersion` of the primary shard is greater than or equal to that of the replica. Meanwhile, since the `local_checkpoint` of the replica is less than or equal to the checkpoint of the Lucene commit, no data loss will occur during primary promotion.

Additionally, it should be noted that the remote store does not have the above issues. Although, when restarting the old primary shard node, the
`SegmentInfos#version` is updated to 11 due to the execution of the flush operation, which is greater than that of the old replica, when the new replica starts it will download the segment files from the remote store, thus ensuring that force segment replication can execute normally.

Test
Introduce the test `SegmentReplicationPrimaryPromotionIT#testPrimaryStopped_ReplicaPromoted_no_data_loss` to simulate the reproduced case in #20118.

Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
Summary by CodeRabbit
Bug Fixes
Tests
New Features