diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java index d1696d06fe2..b26b7d44ffb 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftGroupListener.java @@ -96,4 +96,11 @@ default void onConfigurationCommitted(CommittedConfiguration config) { default void onLeaderStart() { // No-op. } + + /** + * Invoked when the belonging node stops being the leader of the group. + */ + default void onLeaderStop() { + // No-op. + } } diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java index fea81813c0a..276ce753270 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java @@ -872,5 +872,12 @@ public void onLeaderStart(long term) { listener.onLeaderStart(); } + + @Override + public void onLeaderStop(Status status) { + super.onLeaderStop(status); + + listener.onLeaderStop(); + } } } diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java index 927ad111623..a778bcc82e4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicasSafeTimePropagationTest.java @@ -75,7 +75,6 @@ import org.apache.ignite.network.NetworkAddress; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -147,7 +146,6 @@ private static void sendSafeTimeSyncCommand( * */ @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-21565") public void testSafeTimeReorderingOnLeaderReElection() throws Exception { // Start three nodes and a raft group with three peers. { @@ -162,6 +160,9 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception { assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + // Send first safeTime aware raft command in order to initialize leader's safe time. + sendSafeTimeSyncCommand(raftClient, HybridTimestamp.MIN_VALUE, true); + HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService); // Send command with safe time X. @@ -183,6 +184,8 @@ public void testSafeTimeReorderingOnLeaderReElection() throws Exception { RaftGroupService anotherClient = aliveNode.get().raftClient; + assertThat(anotherClient.refreshLeader(), willCompleteSuccessfully()); + // Send command with safe time less than previously applied to the new leader and verify that SafeTimeReorderException is thrown. sendSafeTimeSyncCommand(anotherClient, firstSafeTime.subtractPhysicalTime(1), true); @@ -220,6 +223,9 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception { assertThat(raftClient.refreshLeader(), willCompleteSuccessfully()); + // Send first safeTime aware raft command in order to initialize leader's safe time. + sendSafeTimeSyncCommand(raftClient, HybridTimestamp.MIN_VALUE, true); + HybridTimestamp firstSafeTime = calculateSafeTime(someNode.clockService); // Send command with safe time X. @@ -234,6 +240,8 @@ public void testSafeTimeReorderingOnLeaderRestart() throws Exception { // And restart. startCluster(cluster); + assertThat(someNode.raftClient.refreshLeader(), willCompleteSuccessfully()); + // Send command with safe time less than previously applied to the leader before the restart // and verify that SafeTimeReorderException is thrown. sendSafeTimeSyncCommand(someNode.raftClient, firstSafeTime.subtractPhysicalTime(1), true); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 0ea2a464025..569cc6308c1 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -578,11 +578,6 @@ public void onShutdown() { storage.close(); } - @Override - public void onLeaderStart() { - maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue(); - } - @Override public boolean onBeforeApply(Command command) { // This method is synchronized by replication group specific monitor, see ActionRequestProcessor#handleRequest. @@ -590,6 +585,11 @@ public boolean onBeforeApply(Command command) { SafeTimePropagatingCommand cmd = (SafeTimePropagatingCommand) command; long proposedSafeTime = cmd.safeTime().longValue(); + if (maxObservableSafeTime == -1) { + maxObservableSafeTime = clockService.now().addPhysicalTime(clockService.maxClockSkewMillis()).longValue(); + LOG.info("maxObservableSafeTime is initialized with [" + maxObservableSafeTime + "]."); + } + // Because of clock.tick it's guaranteed that two different commands will have different safe timestamps. // maxObservableSafeTime may match proposedSafeTime only if it is the command that was previously validated and then retried // by raft client because of either TimeoutException or inner raft server recoverable exception. @@ -603,6 +603,12 @@ public boolean onBeforeApply(Command command) { return false; } + @Override + public void onLeaderStop() { + maxObservableSafeTime = -1; + LOG.info("maxObservableSafeTime is set to [" + maxObservableSafeTime + "] on leader stop."); + } + /** * Returns underlying storage. */