Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the caching strategy to minimize the locks and prevent deadlocks #522 #529

Merged
merged 10 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.25.5
* Change the caching strategy to minimize the locks and prevent deadlocks #522

## 3.25.4
* Disable CircuitBreaker check by using ping, keep using the queries to check the server status #526
* Disable the cache flush when the Circuit Breaker changes state to Half Open #526
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.25.4-snapshot
version=3.25.5-snapshot
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.mageddo.concurrent;

import com.mageddo.commons.concurrent.Threads;
import com.mageddo.commons.lang.exception.UnchekedInterruptedException;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Runs scheduled tasks one at a time, in the order they were queued, on a single
 * background thread named {@code queueProcessor}.
 *
 * <p>Tasks are handed over through a {@link BlockingQueue}; one long-lived consumer
 * submitted to a single-thread executor takes and runs them. A task that throws does
 * not stop the consumer: the failure is logged and the next task is taken.
 *
 * <p>{@link #close()} interrupts the consumer; tasks still waiting in the queue at
 * that moment are not executed.
 */
@Slf4j
public class SingleThreadQueueProcessor implements AutoCloseable {

  private final BlockingQueue<Runnable> queue;
  private final ExecutorService executor;
  private final AtomicInteger processedCounter = new AtomicInteger(0);

  /**
   * Creates a processor backed by an unbounded {@link LinkedBlockingQueue}.
   */
  public SingleThreadQueueProcessor() {
    this(new LinkedBlockingQueue<>());
  }

  /**
   * Creates a processor that consumes from the given queue and starts the consumer right away.
   *
   * @param queue queue the tasks are put into and taken from
   */
  public SingleThreadQueueProcessor(BlockingQueue<Runnable> queue) {
    this.queue = queue;
    this.executor = Executors.newSingleThreadExecutor(this::buildThread);
    this.startConsumer();
  }

  /**
   * Enqueues a task to be run by the background consumer; blocks while the queue has no room.
   *
   * @param task the work to run in background
   * @throws UnchekedInterruptedException when the calling thread is interrupted while waiting
   */
  public void schedule(Runnable task) {
    try {
      this.queue.put(task);
    } catch (InterruptedException e) {
      // keep the interrupt visible to the caller's own stack after wrapping it
      Thread.currentThread().interrupt();
      throw new UnchekedInterruptedException(e);
    }
  }

  void startConsumer() {
    this.executor.submit(this::consumeQueue);
  }

  /**
   * Consumer loop: takes and runs tasks until the executor is shut down or the
   * consumer thread is interrupted.
   */
  private void consumeQueue() {
    while (!this.executor.isShutdown()) {
      final Runnable task;
      try {
        task = take();
      } catch (UnchekedInterruptedException e) {
        // interruption is how close() stops the consumer, leave quietly
        log.debug("status=consumerInterrupted, processed={}", this.getProcessedCount());
        return;
      }
      this.runSafely(task);
    }
  }

  /**
   * Runs one task, counting it when it completes and logging (instead of propagating)
   * a failure, so that one broken task can't kill the only consumer.
   */
  private void runSafely(Runnable task) {
    try {
      task.run();
      this.processedCounter.getAndIncrement();
      log.trace("status=processed, count={}, task={}", this.getProcessedCount(), task);
    } catch (RuntimeException e) {
      log.warn("status=taskFailed, task={}, msg={}", task, e.getMessage(), e);
    }
  }

  /**
   * Blocks until a task is available.
   *
   * @throws UnchekedInterruptedException when interrupted while waiting
   */
  Runnable take() {
    try {
      return this.queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new UnchekedInterruptedException(e);
    }
  }

  Thread buildThread(Runnable r) {
    final var thread = Threads.createDaemonThread(r);
    thread.setName("queueProcessor");
    return thread;
  }

  /**
   * Stops the consumer thread. {@code shutdownNow()} is used because the consumer task
   * never finishes by itself: a graceful shutdown would wait on it forever.
   */
  @Override
  public void close() throws Exception {
    this.executor.shutdownNow();
  }

  /**
   * @return how many tasks have completed without throwing so far
   */
  public int getProcessedCount() {
    return this.processedCounter.get();
  }
}
43 changes: 30 additions & 13 deletions src/main/java/com/mageddo/dnsproxyserver/solver/SolverCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,41 @@ public Message handle(Message query, Function<Message, Response> delegate) {

public Response handleRes(Message query, Function<Message, Response> delegate) {
final var key = buildKey(query);
final var cacheValue = this.cache.get(key, (k) -> {
log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query));
final var _res = delegate.apply(query);
if (_res == null) {
log.debug("status=noAnswer, action=cantCache, k={}", k);
return null;
}
final var ttl = _res.getDpsTtl();
log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", k, ttl, Messages.simplePrint(query));
return CacheValue.of(_res, ttl);
});
if (cacheValue == null) {
final var cachedValue = this.cache.getIfPresent(key);

if (cachedValue != null) {
return mapResponse(query, cachedValue);
}

final var calculatedValue = calculateValueWithoutLocks(key, query, delegate);
this.cacheValue(key, calculatedValue);
return mapResponse(query, calculatedValue);
}

/**
 * Stores an already computed value under the given key, by calling the cache's
 * {@code get} with a mapping function that just returns that value.
 *
 * NOTE(review): relies on the cache's get-with-loader contract, presumably an existing
 * entry is kept and a null value is not stored — confirm against the cache implementation.
 *
 * @param key cache key
 * @param calculatedValue value produced outside of the cache, may be null
 */
void cacheValue(String key, CacheValue calculatedValue) {
this.cache.get(key, k -> calculatedValue);
}

Response mapResponse(Message query, CacheValue cachedValue) {
if (cachedValue == null) {
return null;
}
final var response = cacheValue.getResponse();
final var response = cachedValue.getResponse();
return response.withMessage(Messages.mergeId(query, response.getMessage()));
}

/**
 * Resolves the query through the delegate and wraps the answer, with its TTL, in a
 * {@link CacheValue}. Nothing is read from or written to the cache here.
 *
 * @param key cache key, only used for logging
 * @param query the DNS query to solve
 * @param delegate the solver effectively answering the query
 * @return the value to be cached, or null when the delegate had no answer
 */
CacheValue calculateValueWithoutLocks(String key, Message query, Function<Message, Response> delegate) {
  log.trace("status=lookup, key={}, req={}", key, Messages.simplePrint(query));
  final Response solved = delegate.apply(query);
  if (solved != null) {
    final var ttl = solved.getDpsTtl();
    log.debug("status=hotload, k={}, ttl={}, simpleMsg={}", key, ttl, Messages.simplePrint(query));
    return CacheValue.of(solved, ttl);
  }
  log.debug("status=noAnswer, action=cantCache, key={}", key);
  return null;
}

static String buildKey(Message reqMsg) {
final var type = findQuestionType(reqMsg);
return String.format("%s-%s", type != null ? type : UUID.randomUUID(), findQuestionHostname(reqMsg));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mageddo.dnsproxyserver.solver;

import com.mageddo.concurrent.SingleThreadQueueProcessor;
import com.mageddo.dnsproxyserver.solver.CacheName.Name;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -19,6 +20,7 @@ public class SolverCacheFactory {

private final SolverCache remote;
private final SolverCache global;
private final SingleThreadQueueProcessor queueProcessor;

@Inject
public SolverCacheFactory(
Expand All @@ -30,6 +32,7 @@ public SolverCacheFactory(
) {
this.remote = remote;
this.global = global;
this.queueProcessor = new SingleThreadQueueProcessor();
}

public SolverCache getInstance(Name name) {
Expand Down Expand Up @@ -59,7 +62,7 @@ private List<SolverCache> getCaches() {

public void clear(Name name) {
if (name == null) {
this.clearCaches();
this.scheduleCacheClear();
return;
}
this.getInstance(name).clear();
Expand All @@ -73,16 +76,23 @@ public Map<String, Integer> findInstancesSizeMap(Name name) {
}

/**
* This method should be called from one single thread, or it will cause deadlock.
* This method should be called from one single thread, or it can cause deadlock, see #522
*/
public void clearCaches() {
// fixme #526 possible solutions for the deadlock:
// 1 - only one thread can clear the cache at a time
// 2 - move the locks to one centralized thread responsible for the cache management
public void scheduleCacheClear() {
  // the clear itself is run later by the queue processor, this call only enqueues it
  final Runnable clearTask = this::clearCaches;
  this.queueProcessor.schedule(clearTask);
  log.debug("status=scheduled");
}

/**
 * Clears every cache returned by {@code getCaches()}, one after another,
 * tracing before and after each one.
 */
void clearCaches() {
  this.getCaches().forEach(solverCache -> {
    log.trace("status=clearing, cache={}", solverCache.name());
    solverCache.clear();
    log.trace("status=cleared, cache={}", solverCache.name());
  });
  log.debug("status=finished, caches={}", this.getCaches().size());
}

/**
 * @return the processed tasks count reported by the background queue processor
 */
public int getProcessedInBackground(){
  final int processed = this.queueProcessor.getProcessedCount();
  return processed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ public class SolverConsistencyGuaranteeDAOImpl implements SolverConsistencyGuara

@Override
public void flushCachesFromCircuitBreakerStateChange() {
this.solverCacheFactory.clearCaches();
this.solverCacheFactory.scheduleCacheClear();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.mageddo.dnsproxyserver.solver;

import com.mageddo.commons.concurrent.Threads;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

@ExtendWith(MockitoExtension.class)
class SolverCacheFactoryTest {

  // NOTE(review): both caches are created as GLOBAL, looks like a copy/paste — confirm
  // whether one of them was meant to use another CacheName.Name.
  SolverCacheFactory factory = spy(new SolverCacheFactory(
    new SolverCache(CacheName.Name.GLOBAL), new SolverCache(CacheName.Name.GLOBAL)
  ));

  @Test
  void mustClearCacheInBackground(){
    // arrange
    assertEquals(0, this.factory.getProcessedInBackground());

    // act
    this.factory.scheduleCacheClear();
    this.awaitProcessedInBackground(1, 2_000);

    // assert
    verify(this.factory).clearCaches();
    assertEquals(1, this.factory.getProcessedInBackground());

  }

  /**
   * Polls the processed counter until it reaches the expected count or the timeout expires,
   * instead of sleeping a fixed time and hoping the background thread already ran.
   * When the timeout expires it just returns, the assertions of the test report the failure.
   */
  private void awaitProcessedInBackground(int expectedCount, long timeoutMillis) {
    final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    while (this.factory.getProcessedInBackground() < expectedCount
      && System.nanoTime() - deadline < 0) {
      Threads.sleep(5);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.mageddo.dnsproxyserver.solver;

import com.mageddo.commons.concurrent.ThreadPool;
import com.mageddo.commons.concurrent.Threads;
import com.mageddo.dns.utils.Messages;
import com.mageddo.dnsproxyserver.solver.CacheName.Name;
Expand All @@ -10,12 +11,19 @@
import org.xbill.DNS.Flags;
import org.xbill.DNS.Message;
import testing.templates.MessageTemplates;
import testing.templates.ResponseTemplates;

import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand Down Expand Up @@ -91,6 +99,43 @@ void cantCacheWhenDelegateSolverHasNoAnswer() {
assertEquals(0, this.cache.getSize());
}

/**
 * Solves 30 distinct queries on 3 threads while every solver call also clears the cache,
 * then requires the pool to terminate within 5 seconds.
 */
@Test
void mustEvictLocksAndDeadLocks() throws Exception {
  // arrange
  final var random = new SecureRandom();
  final Function<Message, Response> clearingSolver = ignored -> {
    final int delayMillis = random.nextInt(50) + 10;
    Threads.sleep(delayMillis);
    this.cache.clear();
    return ResponseTemplates.acmeAResponse();
  };
  final var executor = ThreadPool.newFixed(3);

  // act
  this.runNTimes(
    it -> executor.submit(() -> this.cache.handle(MessageTemplates.randomHostnameAQuery(), clearingSolver)),
    30
  );
  executor.shutdown();
  executor.awaitTermination(5, TimeUnit.SECONDS);

  // assert
  assertTrue(executor.isTerminated());
  assertTrue(executor.isShutdown());
  closePoolWhenItWontGetStuckByDeadlock(executor);
}

/**
 * Closes the pool. The caller invokes this only after asserting the pool has terminated,
 * so the close is not expected to wait on tasks that are stuck.
 */
static void closePoolWhenItWontGetStuckByDeadlock(ExecutorService pool) {
pool.close();
}

/**
 * Calls the task once for every index from 0 (inclusive) to {@code times} (exclusive),
 * in ascending order, passing the index as argument.
 */
void runNTimes(final Consumer<Integer> task, final int times) {
  IntStream.range(0, times).forEach(task::accept);
}

@SneakyThrows
private void concurrentRequests(int quantity, Message req, Random r) {
final var runnables = new ArrayList<Callable<Object>>();
Expand All @@ -117,4 +162,5 @@ private Object handleRequest(Message req, Random r) {
});
return null;
}

}
4 changes: 4 additions & 0 deletions src/test/java/testing/templates/MessageTemplates.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,8 @@ public static Message acmeNxDomain() {
public static Message acmeSoaQuery() {
return Messages.soaQuestion(HostnameTemplates.ACME_HOSTNAME);
}

/**
 * Builds an A question for a {@code <System.nanoTime()>.com} hostname,
 * so consecutive calls normally ask for different names.
 */
public static Message randomHostnameAQuery() {
  final String hostname = System.nanoTime() + ".com";
  return Messages.aQuestion(hostname);
}
}
18 changes: 18 additions & 0 deletions src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- Logging setup for the tests: everything is written to the console. -->
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<!-- Each line carries: time, thread, level, logger, line number, method, MDC and the message. -->
<pattern>
%d{HH:mm:ss.SSS} [%-15.15thread] %3.-3level %-50.50logger{50}l=%-4.4line m=%-30.30method{30} %mdc %msg%n
</pattern>
<charset>utf8</charset>
</encoder>
</appender>

<!-- Default level for every logger without a more specific configuration. -->
<root level="DEBUG">
<appender-ref ref="CONSOLE"/>
</root>

<!-- Project loggers go down to TRACE, so trace-level log lines also show up. -->
<logger name="com.mageddo" level="TRACE"/>

</configuration>
Loading