Skip to content

Commit

Permalink
Issue/none use rr debugger in aurora multi replica consistency (#176)
Browse files Browse the repository at this point in the history
* Use SdkMan

* Add `LazyLogger` to `AuroraMultiReplicaConsistency`
  • Loading branch information
wyrzyk authored Jul 24, 2023
1 parent 902f77a commit 8d39319
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 34 deletions.
3 changes: 3 additions & 0 deletions .sdkmanrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Enable auto-env through the sdkman_auto_env config
# Add key=value pairs of SDKs to use below
java=11.0.15-tem
21 changes: 20 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,27 @@ Dropping a requirement of a major version of a dependency is a new contract.


## [Unreleased]
[Unreleased]: https://github.com/atlassian-labs/db-replica/compare/release-2.9.0...master
[Unreleased]: https://github.com/atlassian-labs/db-replica/compare/release-2.9.4...master

### Added
- `LazyLogger` to `AuroraMultiReplicaConsistency`

## [2.9.6] - 2023-07-21
[2.9.4]: https://github.com/atlassian-labs/db-replica/compare/release-2.9.2...release-2.9.4

### Fixed
- Add additional logging to `AuroraMultiReplicaConsistency`

## [2.9.4] - 2023-06-28
[2.9.4]: https://github.com/atlassian-labs/db-replica/compare/release-2.9.2...release-2.9.4

### Fixed
- Default `getClientInfo()` method in `DualConnection` to use read connection

## [2.9.2] - 2023-06-26
[2.9.2]: https://github.com/atlassian-labs/db-replica/compare/release-2.9.0...release-2.9.2

### Fixed
- Default `getMetaData()` method in `DualConnection` to use read connection

## [2.9.0] - 2022-10-10
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.atlassian.db.replica.api;

import com.atlassian.db.replica.api.exception.ConnectionCouldNotBeClosedException;
import com.atlassian.db.replica.api.exception.ReadReplicaConnectionCreationException;
import com.atlassian.db.replica.internal.LazyReference;
import com.atlassian.db.replica.internal.NoCacheSuppliedCache;
import com.atlassian.db.replica.internal.NotLoggingLogger;
import com.atlassian.db.replica.internal.aurora.AuroraClusterDiscovery;
import com.atlassian.db.replica.api.exception.ReadReplicaConnectionCreationException;
import com.atlassian.db.replica.internal.logs.LazyLogger;
import com.atlassian.db.replica.internal.logs.NoopLazyLogger;
import com.atlassian.db.replica.spi.Logger;
import com.atlassian.db.replica.spi.ReplicaConnectionPerUrlProvider;
import com.atlassian.db.replica.spi.ReplicaConsistency;
Expand All @@ -16,25 +18,29 @@
import java.util.Collection;
import java.util.function.Supplier;

import static java.lang.String.format;

public final class AuroraMultiReplicaConsistency implements ReplicaConsistency {
private final Logger logger;
private final ReplicaConsistency replicaConsistency;
private final AuroraClusterDiscovery cluster;
private final LazyLogger lazyLogger;

private AuroraMultiReplicaConsistency(
Logger logger,
ReplicaConsistency replicaConsistency,
ReplicaConnectionPerUrlProvider replicaConnectionPerUrlProvider,
SuppliedCache<Collection<Database>> discoveredReplicasCache,
String clusterUri
String clusterUri,
LazyLogger lazyLogger
) {
this.logger = logger;
this.replicaConsistency = replicaConsistency;
this.lazyLogger = lazyLogger;
this.cluster = AuroraClusterDiscovery.builder()
.replicaConnectionPerUrlProvider(replicaConnectionPerUrlProvider)
.discoveredReplicasCache(discoveredReplicasCache)
.clusterUri(clusterUri)
.logger(logger)
.build();
}

Expand All @@ -49,21 +55,28 @@ public void write(Connection main) {

@Override
public boolean isConsistent(Supplier<Connection> replicaSupplier) {
Collection<Database> replicas = cluster.getReplicas(replicaSupplier);
final Collection<Database> replicas = cluster.getReplicas(replicaSupplier);
logger.info("Checking consistency for " + replicas.size() + " replicas.");

return replicas.stream()
lazyLogger.debug(() -> format("Checking consistency for %d replicas.", replicas.size()));
return replicas
.stream()
.allMatch(replica -> {
logger.info("Checking consistency for replica:" + replica.getId());
try (LazyConnectionSupplier connectionSupplier = new LazyConnectionSupplier(replica.getConnectionSupplier())) {
return replicaConsistency.isConsistent(connectionSupplier);
final boolean isConsistent = replicaConsistency.isConsistent(connectionSupplier);
lazyLogger.debug(() -> format(
"AuroraMultiReplicaConsistency#isConsistent (isConsistent=%s, replica=%s)",
isConsistent,
replica.getId()
));
return isConsistent;
} catch (ReadReplicaConnectionCreationException exception) {
logger.warn(
"ReadReplicaConnectionCreationException occurred during consistency checking. It is likely that replica is the process of scaling, replica id: " + replica.getId(),
exception
);
final String message = "ReadReplicaConnectionCreationException occurred during consistency checking. It is likely that replica is the process of scaling, replica id: " + replica.getId();
logger.warn(message, exception);
lazyLogger.warn(() -> "AuroraMultiReplicaConsistency#isConsistent " + message);
return true;
} catch (SQLException exception) {
lazyLogger.warn(() -> "AuroraMultiReplicaConsistency#isConsistent (ConnectionCouldNotBeClosedException)");
throw new ConnectionCouldNotBeClosedException(exception);
}
});
Expand Down Expand Up @@ -112,6 +125,7 @@ public static final class Builder {
private ReplicaConnectionPerUrlProvider replicaConnectionPerUrlProvider;
private SuppliedCache<Collection<Database>> discoveredReplicasCache = new NoCacheSuppliedCache<>();
private Logger logger = new NotLoggingLogger();
private LazyLogger lazyLogger = new NoopLazyLogger();
private String clusterUri;

public Builder replicaConsistency(ReplicaConsistency replicaConsistency) {
Expand All @@ -124,11 +138,22 @@ public Builder replicaConnectionPerUrlProvider(ReplicaConnectionPerUrlProvider r
return this;
}

/**
* @param logger
* @return
* @deprecated use {@link Builder#logger(LazyLogger)}
*/
@Deprecated
public Builder logger(Logger logger) {
this.logger = logger;
return this;
}

public Builder logger(LazyLogger lazyLogger) {
this.lazyLogger = lazyLogger;
return this;
}

public Builder clusterUri(String clusterUri) {
this.clusterUri = clusterUri;
return this;
Expand Down Expand Up @@ -177,7 +202,8 @@ public AuroraMultiReplicaConsistency build() {
replicaConsistency,
replicaConnectionPerUrlProvider,
discoveredReplicasCache,
clusterUri
clusterUri,
lazyLogger
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import com.atlassian.db.replica.api.Database;
import com.atlassian.db.replica.api.jdbc.JdbcUrl;
import com.atlassian.db.replica.internal.NoCacheSuppliedCache;
import com.atlassian.db.replica.internal.logs.LazyLogger;
import com.atlassian.db.replica.internal.logs.NoopLazyLogger;
import com.atlassian.db.replica.spi.Logger;
import com.atlassian.db.replica.spi.ReplicaConnectionPerUrlProvider;
import com.atlassian.db.replica.spi.SuppliedCache;
Expand All @@ -14,44 +16,47 @@
import java.util.Collections;
import java.util.function.Supplier;

import static java.lang.String.format;
import static java.util.stream.Collectors.toList;

public final class AuroraClusterDiscovery {
private final ReplicaConnectionPerUrlProvider replicaConnectionPerUrlProvider;
private final SuppliedCache<Collection<Database>> discoveredReplicasCache;
private final String clusterUri;
private final Logger logger;
private final LazyLogger lazyLogger;

private AuroraClusterDiscovery(
ReplicaConnectionPerUrlProvider replicaConnectionPerUrlProvider,
SuppliedCache<Collection<Database>> discoveredReplicasCache,
String clusterUri,
Logger logger
Logger logger,
LazyLogger lazyLogger
) {
this.replicaConnectionPerUrlProvider = replicaConnectionPerUrlProvider;
this.discoveredReplicasCache = discoveredReplicasCache;
this.clusterUri = clusterUri;
this.logger = logger;
this.lazyLogger = lazyLogger;
}

public Collection<Database> getReplicas(Supplier<Connection> connectionSupplier) {
return discoveredReplicasCache.get(() -> fetchReplicas(connectionSupplier))
.orElse(Collections.emptyList());
return discoveredReplicasCache.get(() -> fetchReplicas(connectionSupplier)).orElseGet(() -> {
lazyLogger.debug(() -> "AuroraClusterDiscovery#getReplicas no replicas cached.");
return Collections.emptyList();
});
}

private Collection<Database> fetchReplicas(Supplier<Connection> connectionSupplier) {
try {
final Connection connection = connectionSupplier.get();
final AuroraReplicasDiscoverer discoverer = createDiscoverer(connection);
return discoverer.fetchReplicasUrls(connection).stream()
.map(auroraUrl -> {
JdbcUrl url = auroraUrl.toJdbcUrl();
return new AuroraReplicaNode(
auroraUrl.getEndpoint().getServerId(),
replicaConnectionPerUrlProvider.getReplicaConnectionProvider(url)
);
})
.collect(toList());
return discoverer.fetchReplicasUrls(connection).stream().map(auroraUrl -> {
JdbcUrl url = auroraUrl.toJdbcUrl();
return new AuroraReplicaNode(auroraUrl.getEndpoint().getServerId(),
replicaConnectionPerUrlProvider.getReplicaConnectionProvider(url)
);
}).collect(toList());
} catch (SQLException exception) {
throw new ReadReplicaDiscoveryOperationException(exception);
}
Expand All @@ -68,25 +73,31 @@ private AuroraReplicasDiscoverer createDiscoverer(Connection connection) {
private AuroraReplicasDiscoverer createDiscovererFromConnection(Connection connection) {
try {
final String databaseUrl = connection.getMetaData().getURL();
lazyLogger.debug(() -> format("AuroraClusterDiscovery#createDiscovererFromConnection (databaseUrl=%s)",
databaseUrl
));
final String[] split = databaseUrl.split("/");
final String readerEndpoint = split[2];
final String databaseName = split[3];
return new AuroraReplicasDiscoverer(
new AuroraJdbcUrl(AuroraEndpoint.parse(readerEndpoint), databaseName),
logger
return new AuroraReplicasDiscoverer(new AuroraJdbcUrl(AuroraEndpoint.parse(readerEndpoint), databaseName),
logger,
lazyLogger
);
} catch (SQLException exception) {
throw new ReadReplicaDiscovererCreationException(exception);
}
}

private AuroraReplicasDiscoverer createDiscovererFromClusterUri() {
lazyLogger.debug(() -> format("AuroraClusterDiscovery#createDiscovererFromClusterUri (clusterUri=%s)",
clusterUri
));
final String[] split = clusterUri.split("/");
final String readerEndpoint = split[2];
final String databaseName = split[3];
return new AuroraReplicasDiscoverer(
new AuroraJdbcUrl(AuroraEndpoint.parse(readerEndpoint), databaseName),
logger
return new AuroraReplicasDiscoverer(new AuroraJdbcUrl(AuroraEndpoint.parse(readerEndpoint), databaseName),
logger,
lazyLogger
);
}

Expand All @@ -99,6 +110,7 @@ public static final class Builder {
private SuppliedCache<Collection<Database>> discoveredReplicasCache = new NoCacheSuppliedCache<>();
private String clusterUri;
private Logger logger;
private LazyLogger lazyLogger = new NoopLazyLogger();

public Builder replicaConnectionPerUrlProvider(ReplicaConnectionPerUrlProvider replicaConnectionPerUrlProvider) {
this.replicaConnectionPerUrlProvider = replicaConnectionPerUrlProvider;
Expand All @@ -115,11 +127,22 @@ public Builder clusterUri(String clusterUri) {
return this;
}

/**
* @param logger
* @return
* @deprecated use {@link Builder#logger(LazyLogger)}
*/
@Deprecated
public Builder logger(Logger logger) {
this.logger = logger;
return this;
}

public Builder logger(LazyLogger lazyLogger) {
this.lazyLogger = lazyLogger;
return this;
}

/**
* @deprecated use
* {@link Builder#replicaConnectionPerUrlProvider(ReplicaConnectionPerUrlProvider)}{@code .}
Expand All @@ -132,8 +155,11 @@ public AuroraClusterDiscovery build(AuroraConnectionDetails auroraConnectionDeta
}

public AuroraClusterDiscovery build() {
return new AuroraClusterDiscovery(replicaConnectionPerUrlProvider, discoveredReplicasCache, clusterUri,
logger
return new AuroraClusterDiscovery(replicaConnectionPerUrlProvider,
discoveredReplicasCache,
clusterUri,
logger,
lazyLogger
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.atlassian.db.replica.internal.aurora;

import com.atlassian.db.replica.internal.logs.LazyLogger;
import com.atlassian.db.replica.spi.Logger;

import java.sql.Connection;
Expand All @@ -18,10 +19,12 @@ public final class AuroraReplicasDiscoverer {
private final AuroraJdbcUrl readerUrl;

private final Logger logger;
private final LazyLogger lazyLogger;

public AuroraReplicasDiscoverer(AuroraJdbcUrl readerUrl, Logger logger) {
public AuroraReplicasDiscoverer(AuroraJdbcUrl readerUrl, Logger logger, LazyLogger lazyLogger) {
this.readerUrl = readerUrl;
this.logger = logger;
this.lazyLogger = lazyLogger;
}

/**
Expand Down Expand Up @@ -65,6 +68,15 @@ private List<String> fetchReplicasServerIds(Connection connection) throws SQLExc
feedbackXmin,
stateLag
));
lazyLogger.debug(() -> String.format(
"server_id=%s, replica_lag_in_ms=%d, durable_lsn=%d, current_read_lsn=%d, feedback_xmin=%d, state_lag=%d",
serverId,
replicaLagInMs,
durableLsn,
currentReadLsn,
feedbackXmin,
stateLag
));
ids.add(serverId);
}
}
Expand Down

0 comments on commit 8d39319

Please sign in to comment.