JAVA-3098: Improve ability to trace active connections across ControlConnection, BasicLoadBalancingPolicy, DefaultTopologyMonitor and NodeRefresh classes
hhughes committed Jul 25, 2023
1 parent ec93ef9 commit f3dd1c4
Showing 7 changed files with 190 additions and 20 deletions.
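These changes only add DEBUG- and TRACE-level statements, so nothing new appears in the logs unless the relevant loggers are raised accordingly. Below is a minimal sketch of one way to do that programmatically; it assumes Logback as the SLF4J backend, and the class and logger names (derived from the classes named in the commit title plus assumed package prefixes) are illustrative, not taken from this commit.

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper, not part of the driver itself.
public class EnableDriverConnectionTracing {
  public static void main(String[] args) {
    // Logger names mirror the classes touched by this commit; the package
    // prefixes are assumed to follow the driver's internal.core layout.
    String[] loggerNames = {
      "com.datastax.oss.driver.internal.core.control.ControlConnection",
      "com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy",
      "com.datastax.oss.driver.internal.core.metadata.DefaultTopologyMonitor",
      "com.datastax.oss.driver.internal.core.metadata.AddNodeRefresh"
    };
    for (String name : loggerNames) {
      // The cast to Logback's Logger is what allows changing the level at runtime;
      // other SLF4J backends configure levels through their own mechanisms.
      ((Logger) LoggerFactory.getLogger(name)).setLevel(Level.TRACE);
    }
  }
}

In practice the same levels are more commonly set declaratively in the logging backend's configuration file rather than in code.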
ControlConnection.java
@@ -54,11 +54,13 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Queue;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -353,6 +355,14 @@ private void connect(
Runnable onSuccess,
Consumer<Throwable> onFailure) {
assert adminExecutor.inEventLoop();

if (LOG.isTraceEnabled()) {
LOG.trace(
"[{}] Control connection candidate nodes: [{}]",
logPrefix,
nodes.stream().map(Objects::toString).collect(Collectors.joining(", ")));
}

Node node = nodes.poll();
if (node == null) {
onFailure.accept(AllNodesFailedException.fromErrors(errors));
@@ -368,11 +378,17 @@
NodeStateEvent lastStateEvent = lastStateEvents.get(node);
if (error != null) {
if (closeWasCalled || initFuture.isCancelled()) {
LOG.trace(
"[{}] Error connecting to {} after close called", logPrefix, node);
onSuccess.run(); // abort, we don't really care about the result
} else {
if (error instanceof AuthenticationException) {
Loggers.warnWithException(
LOG, "[{}] Authentication error", logPrefix, error);
LOG,
"[{}] Authentication error connecting to {}",
logPrefix,
node,
error);
} else {
if (config
.getDefaultProfile()
@@ -399,39 +415,44 @@
}
} else if (closeWasCalled || initFuture.isCancelled()) {
LOG.debug(
"[{}] New channel opened ({}) but the control connection was closed, closing it",
"[{}] New channel opened ({}) to {} but the control connection was closed, closing it",
logPrefix,
channel);
channel,
node);
channel.forceClose();
onSuccess.run();
} else if (lastDistanceEvent != null
&& lastDistanceEvent.distance == NodeDistance.IGNORED) {
LOG.debug(
"[{}] New channel opened ({}) but node became ignored, "
"[{}] New channel opened ({}) to {} but node became ignored, "
+ "closing and trying next node",
logPrefix,
channel);
channel,
node);
channel.forceClose();
connect(nodes, errors, onSuccess, onFailure);
} else if (lastStateEvent != null
&& (lastStateEvent.newState == null /*(removed)*/
|| lastStateEvent.newState == NodeState.FORCED_DOWN)) {
LOG.debug(
"[{}] New channel opened ({}) but node was removed or forced down, "
"[{}] New channel opened ({}) to {} but node was removed or forced down, "
+ "closing and trying next node",
logPrefix,
channel);
channel,
node);
channel.forceClose();
connect(nodes, errors, onSuccess, onFailure);
} else {
LOG.debug("[{}] New channel opened {}", logPrefix, channel);
LOG.debug("[{}] New channel opened {} to {}", logPrefix, channel, node);
DriverChannel previousChannel = ControlConnection.this.channel;
ControlConnection.this.channel = channel;
if (previousChannel != null) {
// We were reconnecting: make sure previous channel gets closed (it may
// still be open if reconnection was forced)
LOG.debug(
"[{}] Forcefully closing previous channel {}", logPrefix, channel);
"[{}] Forcefully closing previous channel {}",
logPrefix,
previousChannel);
previousChannel.forceClose();
}
context.getEventBus().fire(ChannelEvent.channelOpened(node));
@@ -538,9 +559,10 @@ private void onDistanceEvent(DistanceEvent event) {
&& !channel.closeFuture().isDone()
&& event.node.getEndPoint().equals(channel.getEndPoint())) {
LOG.debug(
"[{}] Control node {} became IGNORED, reconnecting to a different node",
"[{}] Control node {} with channel {} became IGNORED, reconnecting to a different node",
logPrefix,
event.node);
event.node,
channel);
reconnectNow();
}
}
@@ -553,9 +575,10 @@ private void onStateEvent(NodeStateEvent event) {
&& !channel.closeFuture().isDone()
&& event.node.getEndPoint().equals(channel.getEndPoint())) {
LOG.debug(
"[{}] Control node {} was removed or forced down, reconnecting to a different node",
"[{}] Control node {} with channel {} was removed or forced down, reconnecting to a different node",
logPrefix,
event.node);
event.node,
channel);
reconnectNow();
}
}
BasicLoadBalancingPolicy.java
@@ -55,6 +55,7 @@
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,8 +169,12 @@ public void init(@NonNull Map<UUID, Node> nodes, @NonNull DistanceReporter dista
// This includes state == UNKNOWN. If the node turns out to be unreachable, this will be
// detected when we try to open a pool to it, it will get marked down and this will be
// signaled back to this policy, which will then remove it from the live set.
LOG.debug("[{}] {} added to initial live set", logPrefix, node);
liveNodes.add(node);
}
if (LOG.isTraceEnabled()) {
logLiveNodesByDc();
}
}
}

@@ -364,20 +369,29 @@ public void onUp(@NonNull Node node) {
}
if (distance != NodeDistance.IGNORED && liveNodes.add(node)) {
LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node);
if (LOG.isTraceEnabled()) {
logLiveNodesByDc();
}
}
}

@Override
public void onDown(@NonNull Node node) {
if (liveNodes.remove(node)) {
LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node);
if (LOG.isTraceEnabled()) {
logLiveNodesByDc();
}
}
}

@Override
public void onRemove(@NonNull Node node) {
if (liveNodes.remove(node)) {
LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node);
if (LOG.isTraceEnabled()) {
logLiveNodesByDc();
}
}
}

@@ -422,4 +436,25 @@ protected NodeDistance computeNodeDistance(@NonNull Node node) {
public void close() {
// nothing to do
}

// Logs the current list of live nodes, grouped by datacenter.
private void logLiveNodesByDc() {
// check trace level enabled just in case
if (!LOG.isTraceEnabled()) {
return;
}

LOG.trace(
"[{}] Current live nodes by dc: {{}}",
logPrefix,
liveNodes.dcs().stream()
.map(
dc ->
dc
+ ": "
+ liveNodes.dc(dc).stream()
.map(Objects::toString)
.collect(Collectors.joining(", ")))
.collect(Collectors.joining(", ")));
}
}
AddNodeRefresh.java
@@ -23,10 +23,14 @@
import java.util.Map;
import java.util.UUID;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class AddNodeRefresh extends NodesRefresh {

private static final Logger LOG = LoggerFactory.getLogger(AddNodeRefresh.class);

@VisibleForTesting final NodeInfo newNodeInfo;

AddNodeRefresh(NodeInfo newNodeInfo) {
@@ -36,6 +40,8 @@ public class AddNodeRefresh extends NodesRefresh {
@Override
public Result compute(
DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) {
String logPrefix = context.getSessionName();

Map<UUID, Node> oldNodes = oldMetadata.getNodes();
Node existing = oldNodes.get(newNodeInfo.getHostId());
if (existing == null) {
@@ -46,6 +52,15 @@ public Result compute(
.putAll(oldNodes)
.put(newNode.getHostId(), newNode)
.build();

if (LOG.isDebugEnabled()) {
LOG.debug(
"[{}] Adding new node {} with system-table address {}",
logPrefix,
newNode,
newNodeInfo.getBroadcastRpcAddress().orElse(null));
}

return new Result(
oldMetadata.withNodes(newNodes, tokenMapEnabled, false, null, context),
ImmutableList.of(NodeStateEvent.added(newNode)));
DefaultNode.java
@@ -186,7 +186,8 @@ public NodeMetricUpdater getMetricUpdater() {
public String toString() {
// Include the hash code because this class uses reference equality
return String.format(
"Node(endPoint=%s, hostId=%s, hashCode=%x)", getEndPoint(), getHostId(), hashCode());
"Node(endPoint=%s, hostId=%s, hashCode=%x, dc=%s)",
getEndPoint(), getHostId(), hashCode(), getDatacenter());
}

/** Note: deliberately not exposed by the public interface. */
DefaultTopologyMonitor.java
@@ -51,6 +51,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -215,10 +216,67 @@ public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
}
}
}

if (LOG.isTraceEnabled()) {
logNodeInfos(nodeInfos);
}

return nodeInfos;
});
}

private String nodeInfoToString(NodeInfo nodeInfo) {
List<String> nodeInfoFields = new ArrayList<>();
nodeInfoFields.add("endpoint=" + nodeInfo.getEndPoint());
nodeInfoFields.add("broadcastRpcAddress=" + nodeInfo.getBroadcastRpcAddress().orElse(null));
nodeInfoFields.add("broadcastAddress=" + nodeInfo.getBroadcastAddress().orElse(null));
nodeInfoFields.add("listenAddress=" + nodeInfo.getListenAddress().orElse(null));
nodeInfoFields.add("datacenter=" + nodeInfo.getDatacenter());
nodeInfoFields.add("rack=" + nodeInfo.getRack());
nodeInfoFields.add("cassandraVersion=" + nodeInfo.getCassandraVersion());
nodeInfoFields.add("partitioner=" + nodeInfo.getPartitioner());
nodeInfoFields.add(
"tokens="
+ Optional.ofNullable(nodeInfo.getTokens())
.map(tokens -> "[" + String.join(",", tokens) + "]")
.orElse(null));
nodeInfoFields.add(
"extras="
+ Optional.ofNullable(nodeInfo.getExtras())
.map(
extras ->
"{"
+ extras.entrySet().stream()
.map(entry -> entry.getKey() + "=" + entry.getValue())
.collect(Collectors.joining(","))
+ "}")
.orElse(null));
nodeInfoFields.add("hostId=" + nodeInfo.getHostId());
nodeInfoFields.add("schemaVersion=" + nodeInfo.getSchemaVersion());
return "{" + String.join(", ", nodeInfoFields) + "}";
}

private void logSingleNodeInfo(NodeInfo nodeInfo) {
// check trace level enabled just in case
if (!LOG.isTraceEnabled()) {
return;
}

LOG.trace("[{}] System-table node entry: {}", logPrefix, nodeInfoToString(nodeInfo));
}

private void logNodeInfos(List<NodeInfo> nodeInfos) {
// check trace level enabled just in case
if (!LOG.isTraceEnabled()) {
return;
}

LOG.trace(
"[{}] Full system-table node list: [{}]",
logPrefix,
nodeInfos.stream().map(this::nodeInfoToString).collect(Collectors.joining(", ")));
}

@Override
public CompletionStage<Boolean> checkSchemaAgreement() {
if (closeFuture.isDone()) {
Expand Down Expand Up @@ -276,8 +334,16 @@ private Optional<NodeInfo> firstPeerRowAsNodeInfo(AdminResult result, EndPoint l
if (isPeerValid(row)) {
return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint))
.map(
broadcastRpcAddress ->
nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
broadcastRpcAddress -> {
DefaultNodeInfo nodeInfo =
nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build();

if (LOG.isTraceEnabled()) {
logSingleNodeInfo(nodeInfo);
}

return nodeInfo;
});
}
}
return Optional.empty();
@@ -440,7 +506,11 @@ private Optional<NodeInfo> findInPeers(
if (broadcastRpcAddress != null
&& broadcastRpcAddress.equals(broadcastRpcAddressToFind)
&& isPeerValid(row)) {
return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build());
DefaultNodeInfo nodeInfo = nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build();
if (LOG.isTraceEnabled()) {
logSingleNodeInfo(nodeInfo);
}
return Optional.of(nodeInfo);
}
}
LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, broadcastRpcAddressToFind);