Skip to content

Commit 658211e

Browse files
authored
Change the caching strategy to minimize the locks and prevent deadlocks #522 (#529)
* testing deadlock * removing calculate code from lock statement to prevent deadlocks * clean code * use single threaded queue to performe cache clear to prevent deadlocks * clear cache in background * add asserts * release notes * [Gradle Release Plugin] - new version commit: '3.25.5-snapshot'. * change thread name * change docs
1 parent 672dfc8 commit 658211e

File tree

10 files changed

+225
-21
lines changed

10 files changed

+225
-21
lines changed

RELEASE-NOTES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.25.5
2+
* Change the caching strategy to minimize the locks and prevent deadlocks #522
3+
14
## 3.25.4
25
* Disable CircuitBreaker check by using ping, keep using the queries to check the server status #526
36
* Disable Cache Flush when Circuit Breaker change state to Half Open #526

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=3.25.4-snapshot
1+
version=3.25.5-snapshot
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.mageddo.concurrent;
2+
3+
import com.mageddo.commons.concurrent.Threads;
4+
import com.mageddo.commons.lang.exception.UnchekedInterruptedException;
5+
import lombok.extern.slf4j.Slf4j;
6+
7+
import java.util.concurrent.BlockingQueue;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import java.util.concurrent.LinkedBlockingQueue;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
@Slf4j
14+
public class SingleThreadQueueProcessor implements AutoCloseable {
15+
16+
private final BlockingQueue<Runnable> queue;
17+
private final ExecutorService executor;
18+
private final AtomicInteger processedCounter = new AtomicInteger(0);
19+
20+
public SingleThreadQueueProcessor() {
21+
this(new LinkedBlockingQueue<>());
22+
}
23+
24+
public SingleThreadQueueProcessor(BlockingQueue<Runnable> queue) {
25+
this.queue = queue;
26+
this.executor = Executors.newSingleThreadExecutor(this::buildThread);
27+
this.startConsumer();
28+
}
29+
30+
public void schedule(Runnable task) {
31+
try {
32+
this.queue.put(task);
33+
} catch (InterruptedException e) {
34+
throw new UnchekedInterruptedException(e);
35+
}
36+
}
37+
38+
void startConsumer() {
39+
this.executor.submit(this::consumeQueue);
40+
}
41+
42+
private void consumeQueue() {
43+
while (true) {
44+
final var r = take();
45+
r.run();
46+
this.processedCounter.getAndIncrement();
47+
log.trace("status=processed, count={}, task={}", this.getProcessedCount(), r);
48+
}
49+
}
50+
51+
Runnable take() {
52+
try {
53+
return this.queue.take();
54+
} catch (InterruptedException e) {
55+
throw new UnchekedInterruptedException(e);
56+
}
57+
}
58+
59+
Thread buildThread(Runnable r) {
60+
final var thread = Threads.createDaemonThread(r);
61+
thread.setName("queueProcessor");
62+
return thread;
63+
}
64+
65+
@Override
66+
public void close() throws Exception {
67+
this.executor.close();
68+
}
69+
70+
public int getProcessedCount() {
71+
return this.processedCounter.get();
72+
}
73+
}

src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,24 +42,41 @@ public Message handle(Message query, Function<Message, Response> delegate) {
4242

4343
public Response handleRes(Message query, Function<Message, Response> delegate) {
4444
final var key = buildKey(query);
45-
final var cacheValue = this.cache.get(key, (k) -> {
46-
log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query));
47-
final var _res = delegate.apply(query);
48-
if (_res == null) {
49-
log.debug("status=noAnswer, action=cantCache, k={}", k);
50-
return null;
51-
}
52-
final var ttl = _res.getDpsTtl();
53-
log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", k, ttl, Messages.simplePrint(query));
54-
return CacheValue.of(_res, ttl);
55-
});
56-
if (cacheValue == null) {
45+
final var cachedValue = this.cache.getIfPresent(key);
46+
47+
if (cachedValue != null) {
48+
return mapResponse(query, cachedValue);
49+
}
50+
51+
final var calculatedValue = calculateValueWithoutLocks(key, query, delegate);
52+
this.cacheValue(key, calculatedValue);
53+
return mapResponse(query, calculatedValue);
54+
}
55+
56+
void cacheValue(String key, CacheValue calculatedValue) {
57+
this.cache.get(key, k -> calculatedValue);
58+
}
59+
60+
Response mapResponse(Message query, CacheValue cachedValue) {
61+
if (cachedValue == null) {
5762
return null;
5863
}
59-
final var response = cacheValue.getResponse();
64+
final var response = cachedValue.getResponse();
6065
return response.withMessage(Messages.mergeId(query, response.getMessage()));
6166
}
6267

68+
CacheValue calculateValueWithoutLocks(String key, Message query, Function<Message, Response> delegate) {
69+
log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query));
70+
final var _res = delegate.apply(query);
71+
if (_res == null) {
72+
log.debug("status=noAnswer, action=cantCache, key={}", key);
73+
return null;
74+
}
75+
final var ttl = _res.getDpsTtl();
76+
log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", key, ttl, Messages.simplePrint(query));
77+
return CacheValue.of(_res, ttl);
78+
}
79+
6380
static String buildKey(Message reqMsg) {
6481
final var type = findQuestionType(reqMsg);
6582
return String.format("%s-%s", type != null ? type : UUID.randomUUID(), findQuestionHostname(reqMsg));

src/main/java/com/mageddo/dnsproxyserver/solver/SolverCacheFactory.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.mageddo.dnsproxyserver.solver;
22

3+
import com.mageddo.concurrent.SingleThreadQueueProcessor;
34
import com.mageddo.dnsproxyserver.solver.CacheName.Name;
45
import lombok.extern.slf4j.Slf4j;
56

@@ -19,6 +20,7 @@ public class SolverCacheFactory {
1920

2021
private final SolverCache remote;
2122
private final SolverCache global;
23+
private final SingleThreadQueueProcessor queueProcessor;
2224

2325
@Inject
2426
public SolverCacheFactory(
@@ -30,6 +32,7 @@ public SolverCacheFactory(
3032
) {
3133
this.remote = remote;
3234
this.global = global;
35+
this.queueProcessor = new SingleThreadQueueProcessor();
3336
}
3437

3538
public SolverCache getInstance(Name name) {
@@ -59,7 +62,7 @@ private List<SolverCache> getCaches() {
5962

6063
public void clear(Name name) {
6164
if (name == null) {
62-
this.clearCaches();
65+
this.scheduleCacheClear();
6366
return;
6467
}
6568
this.getInstance(name).clear();
@@ -73,16 +76,23 @@ public Map<String, Integer> findInstancesSizeMap(Name name) {
7376
}
7477

7578
/**
76-
* This method should be called from one single thread, or it will cause deadlock.
79+
* This method should be called from one single thread, or it can cause deadlock, see #522
7780
*/
78-
public void clearCaches() {
79-
// fixme #526 possible solutions for the deadlock:
80-
// 1 - only one thread can clear the cache at a time
81-
// 2 - move the locks to one centralized thread responsible for the cache management
81+
public void scheduleCacheClear() {
82+
this.queueProcessor.schedule(this::clearCaches);
83+
log.debug("status=scheduled");
84+
}
85+
86+
void clearCaches() {
8287
for (final var cache : this.getCaches()) {
8388
log.trace("status=clearing, cache={}", cache.name());
8489
cache.clear();
8590
log.trace("status=cleared, cache={}", cache.name());
8691
}
92+
log.debug("status=finished, caches={}", this.getCaches().size());
93+
}
94+
95+
public int getProcessedInBackground(){
96+
return this.queueProcessor.getProcessedCount();
8797
}
8898
}

src/main/java/com/mageddo/dnsproxyserver/solver/remote/dataprovider/SolverConsistencyGuaranteeDAOImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ public class SolverConsistencyGuaranteeDAOImpl implements SolverConsistencyGuara
1616

1717
@Override
1818
public void flushCachesFromCircuitBreakerStateChange() {
19-
this.solverCacheFactory.clearCaches();
19+
this.solverCacheFactory.scheduleCacheClear();
2020
}
2121
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package com.mageddo.dnsproxyserver.solver;
2+
3+
import com.mageddo.commons.concurrent.Threads;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.extension.ExtendWith;
6+
import org.mockito.junit.jupiter.MockitoExtension;
7+
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
9+
import static org.mockito.Mockito.spy;
10+
import static org.mockito.Mockito.verify;
11+
12+
@ExtendWith(MockitoExtension.class)
13+
class SolverCacheFactoryTest {
14+
15+
SolverCacheFactory factory = spy(new SolverCacheFactory(
16+
new SolverCache(CacheName.Name.GLOBAL), new SolverCache(CacheName.Name.GLOBAL)
17+
));
18+
19+
@Test
20+
void mustClearCacheInBackground(){
21+
// arrange
22+
assertEquals(0, this.factory.getProcessedInBackground());
23+
24+
// act
25+
this.factory.scheduleCacheClear();
26+
Threads.sleep(30);
27+
28+
// assert
29+
verify(this.factory).clearCaches();
30+
assertEquals(1, this.factory.getProcessedInBackground());
31+
32+
}
33+
}

src/test/java/com/mageddo/dnsproxyserver/solver/SolversCacheTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.mageddo.dnsproxyserver.solver;
22

3+
import com.mageddo.commons.concurrent.ThreadPool;
34
import com.mageddo.commons.concurrent.Threads;
45
import com.mageddo.dns.utils.Messages;
56
import com.mageddo.dnsproxyserver.solver.CacheName.Name;
@@ -10,12 +11,19 @@
1011
import org.xbill.DNS.Flags;
1112
import org.xbill.DNS.Message;
1213
import testing.templates.MessageTemplates;
14+
import testing.templates.ResponseTemplates;
1315

16+
import java.security.SecureRandom;
1417
import java.time.Duration;
1518
import java.util.ArrayList;
1619
import java.util.Random;
1720
import java.util.concurrent.Callable;
21+
import java.util.concurrent.ExecutorService;
1822
import java.util.concurrent.Executors;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.function.Consumer;
25+
import java.util.function.Function;
26+
import java.util.stream.IntStream;
1927

2028
import static org.junit.jupiter.api.Assertions.assertEquals;
2129
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,6 +99,43 @@ void cantCacheWhenDelegateSolverHasNoAnswer() {
9199
assertEquals(0, this.cache.getSize());
92100
}
93101

102+
@Test
103+
void mustEvictLocksAndDeadLocks() throws Exception {
104+
// arrange
105+
final var r = new SecureRandom();
106+
final Function<Message, Response> fn = message -> {
107+
Threads.sleep(r.nextInt(50) + 10);
108+
this.cache.clear();
109+
return ResponseTemplates.acmeAResponse();
110+
};
111+
112+
final var pool = ThreadPool.newFixed(3);
113+
114+
// act
115+
this.runNTimes(
116+
it -> pool.submit(() -> this.cache.handle(MessageTemplates.randomHostnameAQuery(), fn)),
117+
30
118+
);
119+
120+
pool.shutdown();
121+
pool.awaitTermination(5, TimeUnit.SECONDS);
122+
123+
// assert
124+
assertTrue(pool.isTerminated());
125+
assertTrue(pool.isShutdown());
126+
closePoolWhenItWontGetStuckByDeadlock(pool);
127+
}
128+
129+
static void closePoolWhenItWontGetStuckByDeadlock(ExecutorService pool) {
130+
pool.close();
131+
}
132+
133+
void runNTimes(final Consumer<Integer> task, final int times) {
134+
IntStream.range(0, times)
135+
.boxed()
136+
.forEach(task);
137+
}
138+
94139
@SneakyThrows
95140
private void concurrentRequests(int quantity, Message req, Random r) {
96141
final var runnables = new ArrayList<Callable<Object>>();
@@ -117,4 +162,5 @@ private Object handleRequest(Message req, Random r) {
117162
});
118163
return null;
119164
}
165+
120166
}

src/test/java/testing/templates/MessageTemplates.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,8 @@ public static Message acmeNxDomain() {
3232
public static Message acmeSoaQuery() {
3333
return Messages.soaQuestion(HostnameTemplates.ACME_HOSTNAME);
3434
}
35+
36+
public static Message randomHostnameAQuery() {
37+
return Messages.aQuestion(System.nanoTime() + ".com");
38+
}
3539
}

src/test/resources/logback-test.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<configuration>
3+
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
4+
<encoder>
5+
<pattern>
6+
%d{HH:mm:ss.SSS} [%-15.15thread] %3.-3level %-50.50logger{50}l=%-4.4line m=%-30.30method{30} %mdc %msg%n
7+
</pattern>
8+
<charset>utf8</charset>
9+
</encoder>
10+
</appender>
11+
12+
<root level="DEBUG">
13+
<appender-ref ref="CONSOLE"/>
14+
</root>
15+
16+
<logger name="com.mageddo" level="TRACE"/>
17+
18+
</configuration>

0 commit comments

Comments
 (0)