Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solver Remote Refactoring #589

Merged
merged 9 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.30.3
* Solver Remote Refactoring. #589

## 3.30.2
* Detect which condition didn't match and log the cause when configuring DPS as default DNS #580
* Fixed backend develop docker-compose.yml
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=3.30.2-snapshot
version=3.30.3-snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ public CircuitCheckException(String message) {
public CircuitCheckException(Throwable e) {
super(e.getMessage(), e);
}

public CircuitCheckException(String message, Throwable e) {
super(message, e);
}
}
93 changes: 5 additions & 88 deletions src/main/java/com/mageddo/dnsproxyserver/solver/SolverRemote.java
Original file line number Diff line number Diff line change
@@ -1,42 +1,31 @@
package com.mageddo.dnsproxyserver.solver;

import com.mageddo.commons.circuitbreaker.CircuitCheckException;
import com.mageddo.dns.utils.Messages;
import com.mageddo.dnsproxyserver.solver.remote.Request;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.CircuitBreakerService;
import com.mageddo.dnsproxyserver.solver.remote.application.RemoteResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.application.ResolverStatsFactory;
import com.mageddo.dnsproxyserver.solver.remote.application.ResultSupplier;
import com.mageddo.net.NetExecutorWatchdog;
import com.mageddo.utils.Executors;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.xbill.DNS.Flags;
import org.xbill.DNS.Message;

import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static com.mageddo.dns.utils.Messages.simplePrint;

@Slf4j
@Singleton
@RequiredArgsConstructor(onConstructor = @__({@Inject}))
public class SolverRemote implements Solver, AutoCloseable {

static final String QUERY_TIMED_OUT_MSG = "Query timed out";
public static final int PING_TIMEOUT_IN_MS = 1_500;

private final CircuitBreakerService circuitBreakerService;
private final ResolverStatsFactory resolverStatsFactory;
private final NetExecutorWatchdog netWatchdog = new NetExecutorWatchdog();
Expand Down Expand Up @@ -93,83 +82,11 @@ Request buildRequest(Message query, int resolverIndex, StopWatch stopWatch, Reso

Result safeQueryResult(Request req) {
req.splitStopWatch();
return this.queryUsingCircuitBreaker(req, () -> this.queryResult(req));
}

Result queryUsingCircuitBreaker(Request req, Supplier<Result> sup) {
return this.circuitBreakerService.safeHandle(req.getResolverAddress(), sup);
}

Result queryResult(Request req) {
final var resFuture = this.sendQueryAsyncToResolver(req);
if (this.isPingWhileGettingQueryResponseActive()) {
this.pingWhileGettingQueryResponse(req, resFuture);
}
return this.transformToResult(resFuture, req);
}

CompletableFuture<Message> sendQueryAsyncToResolver(Request req) {
return req.sendQueryAsyncToResolver(this.executor);
return this.queryUsingCircuitBreaker(new RemoteResultSupplier(req, this.executor, this.netWatchdog));
}

void pingWhileGettingQueryResponse(Request req, CompletableFuture<Message> resFuture) {
this.netWatchdog.watch(req.getResolverAddr(), resFuture, PING_TIMEOUT_IN_MS);
}

boolean isPingWhileGettingQueryResponseActive() {
return Boolean.getBoolean("mg.solverRemote.pingWhileGettingQueryResponse");
}

Result transformToResult(CompletableFuture<Message> resFuture, Request request) {
final var res = this.findFutureRes(resFuture, request);
if (res == null) {
return Result.empty();
}

if (Messages.isSuccess(res)) {
log.trace(
"status=found, i={}, time={}, req={}, res={}, server={}",
request.getResolverIndex(), request.getTime(), simplePrint(request.getQuery()),
simplePrint(res), request.getResolverAddress()
);
return Result.fromSuccessResponse(Response.success(res));
} else {
log.trace(
"status=notFound, i={}, time={}, req={}, res={}, server={}",
request.getResolverIndex(), request.getTime(), simplePrint(request.getQuery()),
simplePrint(res), request.getResolverAddress()
);
return Result.fromErrorMessage(res);
}
}

Message findFutureRes(CompletableFuture<Message> resFuture, Request request) {
try {
return Messages.setFlag(resFuture.get(), Flags.RA);
} catch (InterruptedException | ExecutionException e) {
this.checkCircuitError(e, request);
return null;
}
}

void checkCircuitError(Exception e, Request request) {
if (e.getCause() instanceof IOException) {
final var time = request.getElapsedTimeInMs();
if (e.getMessage().contains(QUERY_TIMED_OUT_MSG)) {
log.info(
"status=timedOut, i={}, time={}, req={}, msg={} class={}",
request.getResolverIndex(), time, simplePrint(request.getQuery()), e.getMessage(), ClassUtils.getSimpleName(e)
);
throw new CircuitCheckException(e);
}
log.warn(
"status=failed, i={}, time={}, req={}, server={}, errClass={}, msg={}",
request.getResolverIndex(), time, simplePrint(request.getQuery()), request.getResolverAddress(),
ClassUtils.getSimpleName(e), e.getMessage(), e
);
} else {
throw new RuntimeException(e.getMessage(), e);
}
Result queryUsingCircuitBreaker(ResultSupplier sup) {
return this.circuitBreakerService.safeHandle(sup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package com.mageddo.dnsproxyserver.solver.remote;

import com.mageddo.dns.utils.Messages;
import com.mageddo.dnsproxyserver.solver.Resolver;
import com.mageddo.net.IpAddr;
import com.mageddo.net.IpAddrs;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.xbill.DNS.Message;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Slf4j
@Value
@Builder
public class Request {
Expand Down Expand Up @@ -42,6 +45,7 @@ public void splitStopWatch() {
}

public CompletableFuture<Message> sendQueryAsyncToResolver(Executor executor) {
log.trace("status=querying, server={}, req={}", this.resolver.getAddress(), Messages.simplePrint(this.query));
return this.resolver.sendAsync(this.query, executor).toCompletableFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ public class CircuitBreakerService {

private String status;

public Result safeHandle(InetSocketAddress resolverAddress, Supplier<Result> sup) {
public Result safeHandle(ResultSupplier sup) {
try {
return this.handle(resolverAddress, sup);
return this.handle(sup);
} catch (CircuitCheckException | CircuitIsOpenException e) {
final var clazz = ClassUtils.getSimpleName(e);
log.debug("status=circuitEvent, server={}, type={}", resolverAddress, clazz);
this.status = String.format("%s for %s", clazz, resolverAddress);
log.debug("status=circuitEvent, server={}, type={}", sup.getRemoteAddress(), clazz);
this.status = String.format("%s for %s", clazz, sup.getRemoteAddress());
return Result.empty();
}
}

private Result handle(InetSocketAddress resolverAddress, Supplier<Result> sup) {
return this.circuitBreakerFactory.check(resolverAddress, sup);
private Result handle(ResultSupplier sup) {
return this.circuitBreakerFactory.check(sup);
}

public String getStatus() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.mageddo.dnsproxyserver.solver.remote.application;

import com.mageddo.dnsproxyserver.solver.remote.Request;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.mapper.ResultMapper;
import com.mageddo.net.IpAddr;
import com.mageddo.net.NetExecutorWatchdog;
import lombok.extern.slf4j.Slf4j;
import org.xbill.DNS.Message;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

@Slf4j
public class RemoteResultSupplier implements ResultSupplier {

public static final int PING_TIMEOUT_IN_MS = 1_500;

private final Request req;
private final Executor executor;
private final NetExecutorWatchdog netWatchdog;

public RemoteResultSupplier(Request req, Executor executor, NetExecutorWatchdog netWatchdog) {
this.req = req;
this.executor = executor;
this.netWatchdog = netWatchdog;
}

@Override
public Result get() {
return this.queryResult(this.req);
}

Result queryResult(Request req) {
final var resFuture = this.sendQueryAsyncToResolver(req);
if (this.isPingWhileGettingQueryResponseActive()) {
this.pingWhileGettingQueryResponse(req, resFuture);
}
return ResultMapper.from(resFuture, req);
}

CompletableFuture<Message> sendQueryAsyncToResolver(Request req) {
return req.sendQueryAsyncToResolver(this.executor);
}

boolean isPingWhileGettingQueryResponseActive() {
return Boolean.getBoolean("mg.solverRemote.pingWhileGettingQueryResponse");
}

void pingWhileGettingQueryResponse(Request req, CompletableFuture<Message> resFuture) {
this.netWatchdog.watch(req.getResolverAddr(), resFuture, PING_TIMEOUT_IN_MS);
}

@Override
public String toString() {
return String.format("server=%s", this.req.getResolverAddr());
}

@Override
public IpAddr getRemoteAddress() {
return this.req.getResolverAddr();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.mageddo.dnsproxyserver.solver.remote.application;

import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.net.IpAddr;

import java.util.function.Supplier;

public interface ResultSupplier extends Supplier<Result> {
IpAddr getRemoteAddress();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
import com.mageddo.dnsproxyserver.solver.remote.CircuitStatus;
import com.mageddo.dnsproxyserver.solver.remote.Result;
import com.mageddo.dnsproxyserver.solver.remote.application.FailsafeCircuitBreakerFactory;
import com.mageddo.dnsproxyserver.solver.remote.application.ResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegate;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegateNonResilient;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegateStaticThresholdFailsafe;
import com.mageddo.dnsproxyserver.solver.remote.mapper.ResolverMapper;
import com.mageddo.net.IpAddr;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -21,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

@Slf4j
@Singleton
Expand All @@ -34,17 +36,20 @@ public class CircuitBreakerFactory {
private final FailsafeCircuitBreakerFactory failsafeCircuitBreakerFactory;
private final com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.canaryratethreshold.CircuitBreakerFactory canaryThresholdFactory;

public Result check(InetSocketAddress remoteAddress, Supplier<Result> sup) {
final var circuitBreaker = this.findCircuitBreaker(remoteAddress);
public Result check(ResultSupplier sup) {
final var circuitBreaker = this.findCircuitBreaker(sup.getRemoteAddress());
return circuitBreaker.execute(sup);
}

public CircuitBreakerDelegate findCircuitBreaker(InetSocketAddress address) {
final var strategy = this.findCircuitBreakerHotLoad(address);
return this.circuitBreakerMap.computeIfAbsent(address, addr -> strategy);
public CircuitBreakerDelegate findCircuitBreaker(IpAddr serverAddress) {
final var strategy = this.findCircuitBreakerHotLoad(serverAddress);
return this.circuitBreakerMap.computeIfAbsent(
ResolverMapper.toInetSocketAddress(serverAddress),
addr -> strategy
);
}

CircuitBreakerDelegate findCircuitBreakerHotLoad(InetSocketAddress address) {
CircuitBreakerDelegate findCircuitBreakerHotLoad(IpAddr address) {
final var config = this.findCircuitBreakerConfig();
return switch (config.name()) {
case STATIC_THRESHOLD -> this.buildStaticThresholdFailSafeCircuitBreaker(address, config);
Expand All @@ -54,10 +59,10 @@ CircuitBreakerDelegate findCircuitBreakerHotLoad(InetSocketAddress address) {
}

private CircuitBreakerDelegateStaticThresholdFailsafe buildStaticThresholdFailSafeCircuitBreaker(
InetSocketAddress address, CircuitBreakerStrategyConfig config
IpAddr address, CircuitBreakerStrategyConfig config
) {
return new CircuitBreakerDelegateStaticThresholdFailsafe(this.failsafeCircuitBreakerFactory.build(
address,
ResolverMapper.toInetSocketAddress(address),
(StaticThresholdCircuitBreakerStrategyConfig) config
));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.mageddo.dnsproxyserver.solver.remote.application.failsafe;

import com.mageddo.commons.circuitbreaker.CircuitCheckException;
import com.mageddo.dnsproxyserver.solver.SolverRemote;
import com.mageddo.dnsproxyserver.solver.remote.application.RemoteResultSupplier;
import com.mageddo.dnsproxyserver.solver.remote.circuitbreaker.application.CircuitBreakerDelegate;
import com.mageddo.net.Networks;
import dev.failsafe.CircuitBreakerOpenException;
Expand Down Expand Up @@ -46,6 +46,6 @@ void check(InetSocketAddress server, CircuitBreakerDelegate circuitBreaker) {
* @see https://github.com/mageddo/dns-proxy-server/issues/526#issuecomment-2261421618
*/
boolean ping(InetSocketAddress server) {
return Networks.ping(server, SolverRemote.PING_TIMEOUT_IN_MS);
return Networks.ping(server, RemoteResultSupplier.PING_TIMEOUT_IN_MS);
}
}
Loading
Loading