From e9c9dc2df19011ea6e37210c4e536f370e086cd1 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 14:19:19 -0300 Subject: [PATCH 01/10] testing deadlock --- .../solver/SolversCacheTest.java | 57 +++++++++++++++++++ .../testing/templates/MessageTemplates.java | 4 ++ 2 files changed, 61 insertions(+) diff --git a/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java b/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java index 738a78ff8..791b67f9c 100644 --- a/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java +++ b/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java @@ -1,5 +1,6 @@ package com.mageddo.dnsproxyserver.solver; +import com.mageddo.commons.concurrent.ThreadPool; import com.mageddo.commons.concurrent.Threads; import com.mageddo.dns.utils.Messages; import com.mageddo.dnsproxyserver.solver.CacheName.Name; @@ -10,12 +11,20 @@ import org.xbill.DNS.Flags; import org.xbill.DNS.Message; import testing.templates.MessageTemplates; +import testing.templates.ResponseTemplates; +import java.security.SecureRandom; import java.time.Duration; import java.util.ArrayList; +import java.util.List; import java.util.Random; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -91,6 +100,52 @@ void cantCacheWhenDelegateSolverHasNoAnswer() { assertEquals(0, this.cache.getSize()); } + @Test + void mustEvictLocksAndDeadLocks() throws Exception { + // arrange + final var r = new SecureRandom(); + final Function fn = message -> { + Threads.sleep(r.nextInt(50) + 10); + this.cache.clear(); + return ResponseTemplates.acmeAResponse(); + }; + + final var pool = ThreadPool.newFixed(3); + + // act + this.runNTimes( + it -> pool.submit(() -> this.cache.handle(MessageTemplates.randomHostnameAQuery(), fn)), + 30 + ); + + pool.shutdown(); + pool.awaitTermination(5, TimeUnit.SECONDS); + + // assert + assertTrue(pool.isTerminated()); + assertTrue(pool.isShutdown()); + closePoolWhenItWontGetStuckByDeadlock(pool); + + + } + + private static void closePoolWhenItWontGetStuckByDeadlock(ExecutorService pool) { + pool.close(); + } + + private void runNTimes(final Consumer task, final int times) { + IntStream.range(0, times) + .boxed() + .forEach(task); + } + + private List> buildNTasksOf(final int numberOfTasks, final Function> fn) { + return IntStream.of(numberOfTasks) + .boxed() + .map(fn) + .toList(); + } + @SneakyThrows private void concurrentRequests(int quantity, Message req, Random r) { final var runnables = new ArrayList>(); @@ -117,4 +172,6 @@ private Object handleRequest(Message req, Random r) { }); return null; } + + } diff --git a/src/test/java/testing/templates/MessageTemplates.java b/src/test/java/testing/templates/MessageTemplates.java index 10f994c66..ec4ff3d39 100644 --- a/src/test/java/testing/templates/MessageTemplates.java +++ b/src/test/java/testing/templates/MessageTemplates.java @@ -32,4 +32,8 @@ public static Message acmeNxDomain() { public static Message acmeSoaQuery() { return Messages.soaQuestion(HostnameTemplates.ACME_HOSTNAME); } + + public static Message randomHostnameAQuery() { + return Messages.aQuestion(System.nanoTime() + ".com"); + } } From 2d0990015a01283fea62d1940aa49676b75639e7 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 14:26:53 -0300 Subject: [PATCH 02/10] removing calculate code from lock statement to prevent deadlocks --- .../dnsproxyserver/solver/SolverCache.java | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java index 987057653..67d20dae4 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java @@ -42,24 +42,41 @@ public Message handle(Message query, Function delegate) { public Response handleRes(Message query, Function delegate) { final var key = buildKey(query); - final var cacheValue = this.cache.get(key, (k) -> { - log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query)); - final var _res = delegate.apply(query); - if (_res == null) { - log.debug("status=noAnswer, action=cantCache, k={}", k); - return null; - } - final var ttl = _res.getDpsTtl(); - log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", k, ttl, Messages.simplePrint(query)); - return CacheValue.of(_res, ttl); - }); - if (cacheValue == null) { + final var cachedValue = this.cache.getIfPresent(key); + + if (cachedValue != null) { + return mapResponse(query, cachedValue); + } + + final var calculatedValue = calculateValueWithoutLocks(key, query, delegate); + this.cacheValue(key, calculatedValue); + return mapResponse(query, calculatedValue); + } + + void cacheValue(String key, CacheValue calculatedValue) { + this.cache.get(key, k -> calculatedValue); + } + + Response mapResponse(Message query, CacheValue cachedValue) { + if (cachedValue == null) { return null; } - final var response = cacheValue.getResponse(); + final var response = cachedValue.getResponse(); return response.withMessage(Messages.mergeId(query, response.getMessage())); } + CacheValue calculateValueWithoutLocks(String key, Message query, Function delegate) { + log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query)); + final var _res = delegate.apply(query); + if (_res == null) { + log.debug("status=noAnswer, action=cantCache, key={}", key); + return null; + } + final var ttl = _res.getDpsTtl(); + log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", key, ttl, Messages.simplePrint(query)); + return CacheValue.of(_res, ttl); + } + static String buildKey(Message reqMsg) { final var type = findQuestionType(reqMsg); return String.format("%s-%s", type != null ? type : UUID.randomUUID(), findQuestionHostname(reqMsg)); From 777ac26376902980535447513cfd6a6f5e87a552 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 14:30:01 -0300 Subject: [PATCH 03/10] clean code --- .../dnsproxyserver/solver/SolversCacheTest.java | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java b/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java index 791b67f9c..496b20e24 100644 --- a/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java +++ b/src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java @@ -16,7 +16,6 @@ import java.security.SecureRandom; import java.time.Duration; import java.util.ArrayList; -import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -125,27 +124,18 @@ void mustEvictLocksAndDeadLocks() throws Exception { assertTrue(pool.isTerminated()); assertTrue(pool.isShutdown()); closePoolWhenItWontGetStuckByDeadlock(pool); - - } - private static void closePoolWhenItWontGetStuckByDeadlock(ExecutorService pool) { + static void closePoolWhenItWontGetStuckByDeadlock(ExecutorService pool) { pool.close(); } - private void runNTimes(final Consumer task, final int times) { + void runNTimes(final Consumer task, final int times) { IntStream.range(0, times) .boxed() .forEach(task); } - private List> buildNTasksOf(final int numberOfTasks, final Function> fn) { - return IntStream.of(numberOfTasks) - .boxed() - .map(fn) - .toList(); - } - @SneakyThrows private void concurrentRequests(int quantity, Message req, Random r) { final var runnables = new ArrayList>(); @@ -173,5 +163,4 @@ private Object handleRequest(Message req, Random r) { return null; } - } From 51bc21fdb88bd1b44e60bc57e8d98b15570bb7f5 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 14:48:20 -0300 Subject: [PATCH 04/10] use single threaded queue to performe cache clear to prevent deadlocks --- .../SingleThreadQueueProcessor.java | 62 +++++++++++++++++++ .../solver/SolverCacheFactory.java | 9 ++- .../SolverConsistencyGuaranteeDAOImpl.java | 2 +- 3 files changed, 71 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java diff --git a/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java new file mode 100644 index 000000000..06645fa9b --- /dev/null +++ b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java @@ -0,0 +1,62 @@ +package com.mageddo.concurrent; + +import com.mageddo.commons.concurrent.Threads; +import com.mageddo.commons.lang.exception.UnchekedInterruptedException; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +public class SingleThreadQueueProcessor implements AutoCloseable { + + private final BlockingQueue queue; + private final ExecutorService executor; + + public SingleThreadQueueProcessor() { + this(new LinkedBlockingQueue<>()); + } + + public SingleThreadQueueProcessor(BlockingQueue queue) { + this.queue = queue; + this.executor = Executors.newSingleThreadExecutor(this::buildThread); + this.startConsumer(); + } + + public void schedule(Runnable task) { + try { + this.queue.put(task); + } catch (InterruptedException e) { + throw new UnchekedInterruptedException(e); + } + } + + void startConsumer() { + this.executor.submit(this::consumeQueue); + } + + private void consumeQueue() { + while (true) { + take().run(); + } + } + + Runnable take() { + try { + return this.queue.take(); + } catch (InterruptedException e) { + throw new UnchekedInterruptedException(e); + } + } + + Thread buildThread(Runnable r) { + final var thread = Threads.createDaemonThread(r); + thread.setName("SingleThreadQueueProcessor"); + return thread; + } + + @Override + public void close() throws Exception { + this.executor.close(); + } +} diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java index 9c166236d..cd71b5d48 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java @@ -1,5 +1,6 @@ package com.mageddo.dnsproxyserver.solver; +import com.mageddo.concurrent.SingleThreadQueueProcessor; import com.mageddo.dnsproxyserver.solver.CacheName.Name; import lombok.extern.slf4j.Slf4j; @@ -19,6 +20,7 @@ public class SolverCacheFactory { private final SolverCache remote; private final SolverCache global; + private final SingleThreadQueueProcessor queueProcessor; @Inject public SolverCacheFactory( @@ -30,6 +32,7 @@ public SolverCacheFactory( ) { this.remote = remote; this.global = global; + this.queueProcessor = new SingleThreadQueueProcessor(); } public SolverCache getInstance(Name name) { @@ -59,7 +62,7 @@ private List getCaches() { public void clear(Name name) { if (name == null) { - this.clearCaches(); + this.scheduleCacheClear(); return; } this.getInstance(name).clear(); @@ -75,6 +78,10 @@ public Map findInstancesSizeMap(Name name) { /** * This method should be called from one single thread, or it will cause deadlock. */ + public void scheduleCacheClear() { + this.queueProcessor.schedule(this::clearCaches); + } + public void clearCaches() { // fixme #526 possible solutions for the deadlock: // 1 - only one thread can clear the cache at a time diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/remote/dataprovider/SolverConsistencyGuaranteeDAOImpl.java b/src/main/java/com/mageddo/dnsproxyserver/solver/remote/dataprovider/SolverConsistencyGuaranteeDAOImpl.java index 7feca015b..43c0f56a6 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/remote/dataprovider/SolverConsistencyGuaranteeDAOImpl.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/remote/dataprovider/SolverConsistencyGuaranteeDAOImpl.java @@ -16,6 +16,6 @@ public class SolverConsistencyGuaranteeDAOImpl implements SolverConsistencyGuara @Override public void flushCachesFromCircuitBreakerStateChange() { - this.solverCacheFactory.clearCaches(); + this.solverCacheFactory.scheduleCacheClear(); } } From b3131bad861e27eb067a345fc5a20032cb6d4934 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:10:12 -0300 Subject: [PATCH 05/10] clear cache in background --- .../SingleThreadQueueProcessor.java | 13 +++++++- .../solver/SolverCacheFactory.java | 7 ++--- .../solver/SolverCacheFactoryTest.java | 30 +++++++++++++++++++ src/test/resources/logback-test.xml | 18 +++++++++++ 4 files changed, 63 insertions(+), 5 deletions(-) create mode 100644 src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java create mode 100644 src/test/resources/logback-test.xml diff --git a/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java index 06645fa9b..810315a9e 100644 --- a/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java +++ b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java @@ -2,16 +2,20 @@ import com.mageddo.commons.concurrent.Threads; import com.mageddo.commons.lang.exception.UnchekedInterruptedException; +import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; +@Slf4j public class SingleThreadQueueProcessor implements AutoCloseable { private final BlockingQueue queue; private final ExecutorService executor; + private final AtomicInteger processedCounter = new AtomicInteger(0); public SingleThreadQueueProcessor() { this(new LinkedBlockingQueue<>()); @@ -37,7 +41,10 @@ void startConsumer() { private void consumeQueue() { while (true) { - take().run(); + final var r = take(); + r.run(); + this.processedCounter.getAndIncrement(); + log.trace("status=processed, count={}, task={}", this.getProcessedCount(), r); } } @@ -59,4 +66,8 @@ Thread buildThread(Runnable r) { public void close() throws Exception { this.executor.close(); } + + public int getProcessedCount() { + return this.processedCounter.get(); + } } diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java index cd71b5d48..168ffe439 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java @@ -80,16 +80,15 @@ public Map findInstancesSizeMap(Name name) { */ public void scheduleCacheClear() { this.queueProcessor.schedule(this::clearCaches); + log.debug("status=scheduled"); } - public void clearCaches() { - // fixme #526 possible solutions for the deadlock: - // 1 - only one thread can clear the cache at a time - // 2 - move the locks to one centralized thread responsible for the cache management + void clearCaches() { for (final var cache : this.getCaches()) { log.trace("status=clearing, cache={}", cache.name()); cache.clear(); log.trace("status=cleared, cache={}", cache.name()); } + log.debug("status=finished, caches={}", this.getCaches().size()); } } diff --git a/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java b/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java new file mode 100644 index 000000000..9999f129f --- /dev/null +++ b/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java @@ -0,0 +1,30 @@ +package com.mageddo.dnsproxyserver.solver; + +import com.mageddo.commons.concurrent.Threads; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +@ExtendWith(MockitoExtension.class) +class SolverCacheFactoryTest { + + SolverCacheFactory factory = spy(new SolverCacheFactory( + new SolverCache(CacheName.Name.GLOBAL), new SolverCache(CacheName.Name.GLOBAL) + )); + + @Test + void mustClearCacheInBackground(){ + // arrange + + // act + this.factory.scheduleCacheClear(); + Threads.sleep(30); + + // assert + verify(this.factory).clearCaches(); + + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml new file mode 100644 index 000000000..6d7feea88 --- /dev/null +++ b/src/test/resources/logback-test.xml @@ -0,0 +1,18 @@ + + + + + + %d{HH:mm:ss.SSS} [%-15.15thread] %3.-3level %-50.50logger{50}l=%-4.4line m=%-30.30method{30} %mdc %msg%n + + utf8 + + + + + + + + + + From a50f69ae402329fd8e85ed82a8a88256fb5c21c0 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:11:31 -0300 Subject: [PATCH 06/10] add asserts --- .../com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java | 4 ++++ .../mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java | 3 +++ 2 files changed, 7 insertions(+) diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java index 168ffe439..f579ddd46 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java @@ -91,4 +91,8 @@ void clearCaches() { } log.debug("status=finished, caches={}", this.getCaches().size()); } + + public int getProcessedInBackground(){ + return this.queueProcessor.getProcessedCount(); + } } diff --git a/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java b/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java index 9999f129f..00faf8f64 100644 --- a/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java +++ b/src/test/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactoryTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.junit.jupiter.MockitoExtension; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -18,6 +19,7 @@ class SolverCacheFactoryTest { @Test void mustClearCacheInBackground(){ // arrange + assertEquals(0, this.factory.getProcessedInBackground()); // act this.factory.scheduleCacheClear(); @@ -25,6 +27,7 @@ void mustClearCacheInBackground(){ // assert verify(this.factory).clearCaches(); + assertEquals(1, this.factory.getProcessedInBackground()); } } From b992b51fd2845cc915809487fc8cccdc808bf737 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:23:14 -0300 Subject: [PATCH 07/10] release notes --- RELEASE-NOTES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index 1e1bc5155..edf45d665 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -1,3 +1,6 @@ +## 3.25.5 +* Change the caching strategy to minimize the locks and prevent deadlocks #522 + ## 3.25.4 * Disable CircuitBreaker check by using ping, keep using the queries to check the server status #526 * Disable Cache Flush when Circuit Breaker change state to Half Open #526 From ac76c0ad443accdebd704386a110c15cdbeb5220 Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:23:28 -0300 Subject: [PATCH 08/10] [Gradle Release Plugin] - new version commit: '3.25.5-snapshot'. --- gradle.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gradle.properties b/gradle.properties index 6728fa276..b5e428d4c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=3.25.4-snapshot +version=3.25.5-snapshot From 697a84a7e9c6bb491c759695ab2e977ca4c15b0e Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:26:25 -0300 Subject: [PATCH 09/10] change thread name --- .../java/com/mageddo/concurrent/SingleThreadQueueProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java index 810315a9e..7886bd316 100644 --- a/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java +++ b/src/main/java/com/mageddo/concurrent/SingleThreadQueueProcessor.java @@ -58,7 +58,7 @@ Runnable take() { Thread buildThread(Runnable r) { final var thread = Threads.createDaemonThread(r); - thread.setName("SingleThreadQueueProcessor"); + thread.setName("queueProcessor"); return thread; } From a888fb31305fbeebd2b58f10672fdc3b74df97de Mon Sep 17 00:00:00 2001 From: Elvis de Freitas Date: Thu, 1 Aug 2024 15:27:16 -0300 Subject: [PATCH 10/10] change docs --- .../com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java index f579ddd46..dc56dcdd7 100644 --- a/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java +++ b/src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java @@ -76,7 +76,7 @@ public Map findInstancesSizeMap(Name name) { } /** - * This method should be called from one single thread, or it will cause deadlock. + * This method should be called from one single thread, or it can cause deadlock, see #522 */ public void scheduleCacheClear() { this.queueProcessor.schedule(this::clearCaches);