Skip to content

Commit 4384115

Browse files
committed
GH-1277 SafeNotifyService thread leak in CuratorFrameworkImpl
CURATOR-495 introduced a new runSafeService field in the CuratorFrameworkImpl class. This field is either initialized with an external executor supplied via the builder, or it is created internally within the class. However, the CuratorFrameworkImpl#close method never shuts this executor down, so the threads started by each instance linger until the JVM exits. Worse, if a thread factory that produces non-daemon threads is passed to the framework implementation via the builder, the JVM never exits, because the single-thread executor is never stopped.
1 parent 3bc3ea1 commit 4384115

File tree

2 files changed

+63
-0
lines changed

2 files changed

+63
-0
lines changed

curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ public final class CuratorFrameworkImpl extends CuratorFrameworkBase {
100100
private final EnsembleTracker ensembleTracker;
101101
private final SchemaSet schemaSet;
102102
private final Executor runSafeService;
103+
private boolean isExternalRunSafeService = false;
103104
private final ZookeeperCompatibility zookeeperCompatibility;
104105

105106
private volatile ExecutorService executorService;
@@ -194,6 +195,7 @@ public void process(WatchedEvent watchedEvent) {
194195

195196
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) {
196197
if (builder.getRunSafeService() != null) {
198+
isExternalRunSafeService = true;
197199
return builder.getRunSafeService();
198200
}
199201
ThreadFactory threadFactory = builder.getThreadFactory();
@@ -383,6 +385,16 @@ public void close() {
383385
}
384386
}
385387

388+
if (!isExternalRunSafeService) {
389+
((ExecutorService) runSafeService).shutdownNow();
390+
try {
391+
((ExecutorService) runSafeService).awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
392+
} catch (InterruptedException e) {
393+
// Interrupted while waiting for the executor to terminate; stop waiting and restore the interrupt flag.
394+
Thread.currentThread().interrupt();
395+
}
396+
}
397+
386398
if (ensembleTracker != null) {
387399
ensembleTracker.close();
388400
}

curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424
import static org.junit.jupiter.api.Assertions.assertTrue;
2525
import com.google.common.collect.Lists;
2626
import com.google.common.collect.Queues;
27+
import io.netty.util.concurrent.DefaultThreadFactory;
2728
import java.util.ArrayList;
29+
import java.util.Arrays;
2830
import java.util.List;
2931
import java.util.concurrent.BlockingQueue;
3032
import java.util.concurrent.CountDownLatch;
33+
import java.util.concurrent.ExecutorService;
34+
import java.util.concurrent.Executors;
3135
import java.util.concurrent.TimeUnit;
3236
import java.util.concurrent.atomic.AtomicBoolean;
3337
import java.util.concurrent.atomic.AtomicLong;
3438
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.stream.Stream;
3540
import org.apache.curator.framework.CuratorFramework;
3641
import org.apache.curator.framework.CuratorFrameworkFactory;
3742
import org.apache.curator.framework.api.ACLProvider;
@@ -45,6 +50,7 @@
4550
import org.apache.curator.test.BaseClassForTests;
4651
import org.apache.curator.test.Timing;
4752
import org.apache.curator.utils.CloseableUtils;
53+
import org.apache.curator.utils.ThreadUtils;
4854
import org.apache.zookeeper.KeeperException.Code;
4955
import org.apache.zookeeper.data.ACL;
5056
import org.junit.jupiter.api.Test;
@@ -306,4 +312,49 @@ public void listen(OperationAndData<?> data) {
306312
CloseableUtils.closeQuietly(client);
307313
}
308314
}
315+
316+
@Test
317+
public void testCloseShutsDownInternalRunSafeService() {
318+
Timing timing = new Timing();
319+
CuratorFramework client = CuratorFrameworkFactory.newClient(
320+
server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
321+
client.start();
322+
client.runSafe(() -> {});
323+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("SafeNotifyService")));
324+
325+
client.close();
326+
327+
assertTrue(enumerateThreads().noneMatch(t -> t.getName().contains("SafeNotifyService")));
328+
}
329+
330+
@Test
331+
public void testCloseLeavesExternalRunSafeServiceRunning() throws Exception {
332+
Timing timing = new Timing();
333+
ExecutorService externalRunner =
334+
Executors.newSingleThreadScheduledExecutor(ThreadUtils.newThreadFactory("ExternalSafeNotifyService"));
335+
CuratorFramework client = CuratorFrameworkFactory.builder()
336+
.connectString(server.getConnectString())
337+
.sessionTimeoutMs(timing.session())
338+
.connectionTimeoutMs(timing.connection())
339+
.retryPolicy(new RetryOneTime(1))
340+
.maxCloseWaitMs(timing.forWaiting().milliseconds())
341+
.runSafeService(externalRunner)
342+
.build();
343+
client.start();
344+
client.runSafe(() -> {});
345+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
346+
347+
client.close();
348+
349+
assertTrue(enumerateThreads().anyMatch(t -> t.getName().contains("ExternalSafeNotifyService")));
350+
351+
externalRunner.shutdownNow();
352+
assertTrue(externalRunner.awaitTermination(10, TimeUnit.SECONDS));
353+
}
354+
355+
private static Stream<Thread> enumerateThreads() {
356+
Thread[] threads = new Thread[Thread.activeCount()];
357+
Thread.enumerate(threads);
358+
return Arrays.stream(threads);
359+
}
309360
}

0 commit comments

Comments
 (0)