diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml index 927af0c28..1ac5a8b75 100644 --- a/curator-recipes/pom.xml +++ b/curator-recipes/pom.xml @@ -83,6 +83,12 @@ test + + org.assertj + assertj-core + test + + org.awaitility awaitility diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java index dc79f6668..a2e61c972 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java @@ -19,6 +19,7 @@ package org.apache.curator.framework.recipes.leader; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; @@ -52,6 +53,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.IntStream; +import javax.annotation.Nonnull; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.imps.TestCleanState; @@ -72,9 +74,12 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Tag(CuratorTestBase.zk35TestCompatibilityGroup) public class TestLeaderLatch extends BaseClassForTests { + private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class); private static final String PATH_NAME = "/one/two/me"; private static final int MAX_LOOPS = 5; @@ -208,6 +213,59 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception { } } + @Test + public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception { + final String latchPath = "/testSessionInterruptionDoNotCauseBrainSplit"; + final Timing2 timing = new Timing2(); + final BlockingQueue events = new LinkedBlockingQueue() { + @Override + public boolean add(@Nonnull TestEvent testEvent) { + LOG.debug("Add event: {}", testEvent); + return super.add(testEvent); + } + }; + + final List closeableResources = new ArrayList<>(); + try { + final String id0 = "id0"; + final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null); + closeableResources.add(client0); + final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events); + closeableResources.add(latch0); + + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP)); + + final String id1 = "id1"; + final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null); + closeableResources.add(client1); + final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events); + closeableResources.add(latch1); + + // wait for the non-leading LeaderLatch (i.e. latch1) instance to be done with its creation + // this call is time-consuming but necessary because we don't have a handle to detect the end of the reset + // call + timing.forWaiting().sleepABit(); + + assertTrue(latch0.hasLeadership()); + assertFalse(latch1.hasLeadership()); + + client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration(); + + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP)); + assertThat(events.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS)) + .isNotNull() + .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP)); + } finally { + // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client + Collections.reverse(closeableResources); + closeableResources.forEach(CloseableUtils::closeQuietly); + } + } + @Test public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception { final String latchPath = "/test"; @@ -316,7 +374,9 @@ private static CuratorFramework createAndStartClient( client.getConnectionStateListenable().addListener((client1, newState) -> { if (newState == ConnectionState.CONNECTED) { - events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + if (events != null) { + events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION)); + } } }); @@ -366,6 +426,11 @@ public boolean equals(Object o) { TestEvent testEvent = (TestEvent) o; return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType; } + + @Override + public String toString() { + return "TestEvent{" + "eventType=" + eventType + ", id='" + id + '\'' + '}'; + } } @Test