diff --git a/curator-recipes/pom.xml b/curator-recipes/pom.xml
index 927af0c28..1ac5a8b75 100644
--- a/curator-recipes/pom.xml
+++ b/curator-recipes/pom.xml
@@ -83,6 +83,12 @@
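             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.awaitility</groupId>
             <artifactId>awaitility</artifactId>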
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index 80509dbf8..4d20f9afd 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -509,7 +509,7 @@ public void processResult(CuratorFramework client, CuratorEvent event) throws Ex
getChildren();
}
} else {
- log.error("getChildren() failed. rc = " + event.getResultCode());
+ log.error("getChildren() failed. rc = {}", event.getResultCode());
}
}
};
@@ -548,43 +548,57 @@ private void checkLeadership(List children) throws Exception {
log.debug("checkLeadership with id: {}, ourPath: {}, children: {}", id, localOurPath, sortedChildren);
if (ourIndex < 0) {
- log.error("Can't find our node. Resetting. Index: " + ourIndex);
+ log.error("Can't find our node. Resetting. Index: {}", ourIndex);
reset();
- } else if (ourIndex == 0) {
- lastPathIsLeader.set(localOurPath);
- setLeadership(true);
- } else {
- setLeadership(false);
- String watchPath = sortedChildren.get(ourIndex - 1);
- Watcher watcher = new Watcher() {
- @Override
- public void process(WatchedEvent event) {
- if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
- try {
- getChildren();
- } catch (Exception ex) {
- ThreadUtils.checkInterrupted(ex);
- log.error("An error occurred checking the leadership.", ex);
+ return;
+ }
+
+ if (ourIndex == 0) {
+ client.getData()
+ .inBackground((client, event) -> {
+ final long ephemeralOwner =
+ event.getStat() != null ? event.getStat().getEphemeralOwner() : -1;
+ final long thisSessionId =
+ client.getZookeeperClient().getZooKeeper().getSessionId();
+ if (ephemeralOwner != thisSessionId) {
+ // our node is gone (no stat) or is owned by a different session - reset
+ reset();
+ } else {
+ lastPathIsLeader.set(localOurPath);
+ setLeadership(true);
}
- }
- }
- };
+ })
+ .forPath(localOurPath);
+ return;
+ }
- BackgroundCallback callback = new BackgroundCallback() {
- @Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
- if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
- // previous node is gone - retry getChildren
+ setLeadership(false);
+ String watchPath = sortedChildren.get(ourIndex - 1);
+ Watcher watcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (state.get() == State.STARTED && event.getType() == Event.EventType.NodeDeleted) {
+ try {
getChildren();
+ } catch (Exception ex) {
+ ThreadUtils.checkInterrupted(ex);
+ log.error("An error occurred checking the leadership.", ex);
}
}
- };
- // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
- client.getData()
- .usingWatcher(watcher)
- .inBackground(callback)
- .forPath(ZKPaths.makePath(latchPath, watchPath));
- }
+ }
+ };
+
+ BackgroundCallback callback = new BackgroundCallback() {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
+ if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
+ // previous node is gone - retry getChildren
+ getChildren();
+ }
+ }
+ };
+ // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
+ client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
}
private void getChildren() throws Exception {
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index dc79f6668..528b317ff 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.recipes.leader;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -72,9 +73,12 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Tag(CuratorTestBase.zk35TestCompatibilityGroup)
public class TestLeaderLatch extends BaseClassForTests {
+ private static final Logger LOG = LoggerFactory.getLogger(TestLeaderLatch.class);
private static final String PATH_NAME = "/one/two/me";
private static final int MAX_LOOPS = 5;
@@ -208,6 +212,58 @@ public void testWatchedNodeDeletedOnReconnect() throws Exception {
}
}
+    /** Verifies that an expired-session leader does not regain leadership next to the newly elected leader. */
+    @Test
+    public void testSessionInterruptionDoNotCauseBrainSplit() throws Exception {
+        final String latchPath = "/testSessionInterruptionDoNotCauseBrainSplit";
+        final Timing2 timing = new Timing2();
+        final BlockingQueue<TestEvent> events0 = new LinkedBlockingQueue<>();
+        final BlockingQueue<TestEvent> events1 = new LinkedBlockingQueue<>();
+
+        final List<Closeable> closeableResources = new ArrayList<>();
+        try {
+            final String id0 = "id0";
+            final CuratorFramework client0 = createAndStartClient(server.getConnectString(), timing, id0, null);
+            closeableResources.add(client0);
+            final LeaderLatch latch0 = createAndStartLeaderLatch(client0, latchPath, id0, events0);
+            closeableResources.add(latch0);
+
+            assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));
+
+            final String id1 = "id1";
+            final CuratorFramework client1 = createAndStartClient(server.getConnectString(), timing, id1, null);
+            closeableResources.add(client1);
+            final LeaderLatch latch1 = createAndStartLeaderLatch(client1, latchPath, id1, events1);
+            closeableResources.add(latch1);
+
+            // give the non-leading latch (latch1) time to finish its creation; the sleep is time-consuming but
+            // necessary because we don't have a handle to detect the end of the reset call
+            timing.forWaiting().sleepABit();
+
+            assertTrue(latch0.hasLeadership());
+            assertFalse(latch1.hasLeadership());
+
+            client0.getZookeeperClient().getZooKeeper().getTestable().injectSessionExpiration();
+
+            assertThat(events1.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id1, TestEventType.GAINED_LEADERSHIP));
+
+            assertThat(events0.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS))
+                    .isNotNull()
+                    .isEqualTo(new TestEvent(id0, TestEventType.LOST_LEADERSHIP));
+            // No leadership granted to old leader after session changed, hence no brain split.
+            assertThat(events0.poll(20, TimeUnit.MILLISECONDS))
+                    .isNotEqualTo(new TestEvent(id0, TestEventType.GAINED_LEADERSHIP));
+        } finally {
+            // reverse is necessary for closing the LeaderLatch instances before closing the corresponding client
+            Collections.reverse(closeableResources);
+            closeableResources.forEach(CloseableUtils::closeQuietly);
+        }
+    }
+
@Test
public void testResettingOfLeadershipAfterConcurrentLeadershipChange() throws Exception {
final String latchPath = "/test";
@@ -316,7 +372,9 @@ private static CuratorFramework createAndStartClient(
client.getConnectionStateListenable().addListener((client1, newState) -> {
if (newState == ConnectionState.CONNECTED) {
- events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
+ if (events != null) {
+ events.add(new TestEvent(id, TestEventType.GAINED_CONNECTION));
+ }
}
});
@@ -366,6 +424,11 @@ public boolean equals(Object o) {
TestEvent testEvent = (TestEvent) o;
return Objects.equals(id, testEvent.id) && eventType == testEvent.eventType;
}
+
+        @Override
+        public String toString() {
+            return "TestEvent{eventType=" + eventType + ", id='" + id + "'}"; // TestEvent{eventType=..., id='...'}
+        }
}
@Test