Skip to content

Commit

Permalink
IGNITE-21565 Sync effective maxObservableSafeTime initialization on l…
Browse files Browse the repository at this point in the history
…eader election. (#4552)
  • Loading branch information
sanpwc authored Oct 15, 2024
1 parent cc53fb6 commit 25c67ca
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,11 @@ default void onConfigurationCommitted(CommittedConfiguration config) {
default void onLeaderStart() {
// No-op.
}

/**
* Invoked when the belonging node stops being the leader of the group.
*/
default void onLeaderStop() {
// No-op.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -872,5 +872,12 @@ public void onLeaderStart(long term) {

listener.onLeaderStart();
}

@Override
public void onLeaderStop(Status status) {
super.onLeaderStop(status);

listener.onLeaderStop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

Expand Down Expand Up @@ -147,7 +146,6 @@ private static void sendSafeTimeSyncCommand(
* </ol>
*/
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-21565")
public void testSafeTimeReorderingOnLeaderReElection() throws Exception {
// Start three nodes and a raft group with three peers.
{
Expand All @@ -162,6 +160,9 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception {

assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

// Send first safeTime aware raft command in order to initialize leader's safe time.
sendSafeTimeSyncCommand(raftClient, HybridTimestamp.MIN_VALUE, true);

HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService);

// Send command with safe time X.
Expand All @@ -183,6 +184,8 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception {

RaftGroupService anotherClient = aliveNode.get().raftClient;

assertThat(anotherClient.refreshLeader(), willCompleteSuccessfully());

// Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown.
sendSafeTimeSyncCommand(anotherClient, firstSafeTime.subtractPhysicalTime(1), true);

Expand Down Expand Up @@ -220,6 +223,9 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception {

assertThat(raftClient.refreshLeader(), willCompleteSuccessfully());

// Send first safeTime aware raft command in order to initialize leader's safe time.
sendSafeTimeSyncCommand(raftClient, HybridTimestamp.MIN_VALUE, true);

HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService);

// Send command with safe time X.
Expand All @@ -234,6 +240,8 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception {
// And restart.
startCluster(cluster);

assertThat(someNode.raftClient.refreshLeader(), willCompleteSuccessfully());

// Send command with safe time less than previously applied to the leader before the restart
// and verify that SafeTimeReorderException is thrown.
sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime.subtractPhysicalTime(1), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,18 +578,18 @@ public void onShutdown() {
storage.close();
}

@Override
public void onLeaderStart() {
maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
}

@Override
public boolean onBeforeApply(Command command) {
// This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest.
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command;
long proposedSafeTime = cmd.safeTime().longValue();

if (maxObservableSafeTime == -1) {
maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue();
LOG.info("maxObservableSafeTime is initialized with [" + maxObservableSafeTime + "].");
}

// Because of clock.tick it's guaranteed that two different commands will have different safe timestamps.
// maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried
// by raft client because of either TimeoutException or inner raft server recoverable exception.
Expand All @@ -603,6 +603,12 @@ public boolean onBeforeApply(Command command) {
return false;
}

@Override
public void onLeaderStop() {
maxObservableSafeTime = -1;
LOG.info("maxObservableSafeTime is set to [" + maxObservableSafeTime + "] on leader stop.");
}

/**
* Returns underlying storage.
*/
Expand Down

0 comments on commit 25c67ca

Please sign in to comment.