From f3dd1c4db3efd56c95bac66caa6684538e9848b5 Mon Sep 17 00:00:00 2001 From: Henry Hughes Date: Fri, 21 Jul 2023 12:25:00 -0700 Subject: [PATCH] JAVA-3098: Improve ability to trace active connections across ControlConnection, BasicLoadBalancingPolicy, DefaultTopologyMonitor and NodeRefresh classes --- .../core/control/ControlConnection.java | 49 ++++++++---- .../BasicLoadBalancingPolicy.java | 35 +++++++++ .../core/metadata/AddNodeRefresh.java | 15 ++++ .../internal/core/metadata/DefaultNode.java | 3 +- .../core/metadata/DefaultTopologyMonitor.java | 76 ++++++++++++++++++- .../core/metadata/FullNodeListRefresh.java | 20 ++++- .../core/metadata/InitialNodeListRefresh.java | 12 ++- 7 files changed, 190 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 3fcfd120086..f72ffea73e3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -54,11 +54,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Queue; import java.util.WeakHashMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -353,6 +355,14 @@ private void connect( Runnable onSuccess, Consumer onFailure) { assert adminExecutor.inEventLoop(); + + if (LOG.isTraceEnabled()) { + LOG.trace( + "[{}] Control connection candidate nodes: [{}]", + logPrefix, + nodes.stream().map(Objects::toString).collect(Collectors.joining(", "))); + } + Node node = nodes.poll(); if (node == null) { 
onFailure.accept(AllNodesFailedException.fromErrors(errors)); @@ -368,11 +378,17 @@ private void connect( NodeStateEvent lastStateEvent = lastStateEvents.get(node); if (error != null) { if (closeWasCalled || initFuture.isCancelled()) { + LOG.trace( + "[{}] Error connecting to {} after close called", logPrefix, node); onSuccess.run(); // abort, we don't really care about the result } else { if (error instanceof AuthenticationException) { Loggers.warnWithException( - LOG, "[{}] Authentication error", logPrefix, error); + LOG, + "[{}] Authentication error connecting to {}", + logPrefix, + node, + error); } else { if (config .getDefaultProfile() @@ -399,39 +415,44 @@ private void connect( } } else if (closeWasCalled || initFuture.isCancelled()) { LOG.debug( - "[{}] New channel opened ({}) but the control connection was closed, closing it", + "[{}] New channel opened ({}) to {} but the control connection was closed, closing it", logPrefix, - channel); + channel, + node); channel.forceClose(); onSuccess.run(); } else if (lastDistanceEvent != null && lastDistanceEvent.distance == NodeDistance.IGNORED) { LOG.debug( - "[{}] New channel opened ({}) but node became ignored, " + "[{}] New channel opened ({}) to {} but node became ignored, " + "closing and trying next node", logPrefix, - channel); + channel, + node); channel.forceClose(); connect(nodes, errors, onSuccess, onFailure); } else if (lastStateEvent != null && (lastStateEvent.newState == null /*(removed)*/ || lastStateEvent.newState == NodeState.FORCED_DOWN)) { LOG.debug( - "[{}] New channel opened ({}) but node was removed or forced down, " + "[{}] New channel opened ({}) to {} but node was removed or forced down, " + "closing and trying next node", logPrefix, - channel); + channel, + node); channel.forceClose(); connect(nodes, errors, onSuccess, onFailure); } else { - LOG.debug("[{}] New channel opened {}", logPrefix, channel); + LOG.debug("[{}] New channel opened {} to {}", logPrefix, channel, node); DriverChannel 
previousChannel = ControlConnection.this.channel; ControlConnection.this.channel = channel; if (previousChannel != null) { // We were reconnecting: make sure previous channel gets closed (it may // still be open if reconnection was forced) LOG.debug( - "[{}] Forcefully closing previous channel {}", logPrefix, channel); + "[{}] Forcefully closing previous channel {}", + logPrefix, + previousChannel); previousChannel.forceClose(); } context.getEventBus().fire(ChannelEvent.channelOpened(node)); @@ -538,9 +559,10 @@ private void onDistanceEvent(DistanceEvent event) { && !channel.closeFuture().isDone() && event.node.getEndPoint().equals(channel.getEndPoint())) { LOG.debug( - "[{}] Control node {} became IGNORED, reconnecting to a different node", + "[{}] Control node {} with channel {} became IGNORED, reconnecting to a different node", logPrefix, - event.node); + event.node, + channel); reconnectNow(); } } @@ -553,9 +575,10 @@ private void onStateEvent(NodeStateEvent event) { && !channel.closeFuture().isDone() && event.node.getEndPoint().equals(channel.getEndPoint())) { LOG.debug( - "[{}] Control node {} was removed or forced down, reconnecting to a different node", + "[{}] Control node {} with channel {} was removed or forced down, reconnecting to a different node", logPrefix, - event.node); + event.node, + channel); reconnectNow(); } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 9e8184879ea..97a1212b075 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -55,6 +55,7 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntUnaryOperator; +import java.util.stream.Collectors; 
import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,8 +169,12 @@ public void init(@NonNull Map nodes, @NonNull DistanceReporter dista // This includes state == UNKNOWN. If the node turns out to be unreachable, this will be // detected when we try to open a pool to it, it will get marked down and this will be // signaled back to this policy, which will then remove it from the live set. + LOG.debug("[{}] {} added to initial live set", logPrefix, node); liveNodes.add(node); } + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -364,6 +369,9 @@ public void onUp(@NonNull Node node) { } if (distance != NodeDistance.IGNORED && liveNodes.add(node)) { LOG.debug("[{}] {} came back UP, added to live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -371,6 +379,9 @@ public void onUp(@NonNull Node node) { public void onDown(@NonNull Node node) { if (liveNodes.remove(node)) { LOG.debug("[{}] {} went DOWN, removed from live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -378,6 +389,9 @@ public void onDown(@NonNull Node node) { public void onRemove(@NonNull Node node) { if (liveNodes.remove(node)) { LOG.debug("[{}] {} was removed, removed from live set", logPrefix, node); + if (LOG.isTraceEnabled()) { + logLiveNodesByDc(); + } } } @@ -422,4 +436,25 @@ protected NodeDistance computeNodeDistance(@NonNull Node node) { public void close() { // nothing to do } + + // logs the current list of live nodes, grouped by datacenter, at TRACE level + private void logLiveNodesByDc() { + // check trace level enabled just in case + if (!LOG.isTraceEnabled()) { + return; + } + + LOG.trace( + "[{}] Current live nodes by dc: {{}}", + logPrefix, + liveNodes.dcs().stream() + .map( + dc -> + dc + + ": " + + liveNodes.dc(dc).stream() + .map(Objects::toString) + .collect(Collectors.joining(", "))) + .collect(Collectors.joining(", "))); + } } diff --git 
a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java index e8806b7651a..7ad33c03312 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/AddNodeRefresh.java @@ -23,10 +23,14 @@ import java.util.Map; import java.util.UUID; import net.jcip.annotations.ThreadSafe; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ThreadSafe public class AddNodeRefresh extends NodesRefresh { + private static final Logger LOG = LoggerFactory.getLogger(AddNodeRefresh.class); + @VisibleForTesting final NodeInfo newNodeInfo; AddNodeRefresh(NodeInfo newNodeInfo) { @@ -36,6 +40,8 @@ public class AddNodeRefresh extends NodesRefresh { @Override public Result compute( DefaultMetadata oldMetadata, boolean tokenMapEnabled, InternalDriverContext context) { + String logPrefix = context.getSessionName(); + Map oldNodes = oldMetadata.getNodes(); Node existing = oldNodes.get(newNodeInfo.getHostId()); if (existing == null) { @@ -46,6 +52,15 @@ public Result compute( .putAll(oldNodes) .put(newNode.getHostId(), newNode) .build(); + + if (LOG.isDebugEnabled()) { + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + newNode, + newNodeInfo.getBroadcastRpcAddress().orElse(null)); + } + return new Result( oldMetadata.withNodes(newNodes, tokenMapEnabled, false, null, context), ImmutableList.of(NodeStateEvent.added(newNode))); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java index 839a4a61231..ef850d9efbb 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultNode.java @@ -186,7 +186,8 
@@ public NodeMetricUpdater getMetricUpdater() { public String toString() { // Include the hash code because this class uses reference equality return String.format( - "Node(endPoint=%s, hostId=%s, hashCode=%x)", getEndPoint(), getHostId(), hashCode()); + "Node(endPoint=%s, hostId=%s, hashCode=%x, dc=%s)", + getEndPoint(), getHostId(), hashCode(), getDatacenter()); } /** Note: deliberately not exposed by the public interface. */ diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java index da5fc2115eb..9dce717e8b0 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/DefaultTopologyMonitor.java @@ -51,6 +51,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -215,10 +216,67 @@ public CompletionStage> refreshNodeList() { } } } + + if (LOG.isTraceEnabled()) { + logNodeInfos(nodeInfos); + } + return nodeInfos; }); } + private String nodeInfoToString(NodeInfo nodeInfo) { + List nodeInfoFields = new ArrayList<>(); + nodeInfoFields.add("endpoint=" + nodeInfo.getEndPoint()); + nodeInfoFields.add("broadcastRpcAddress=" + nodeInfo.getBroadcastRpcAddress().orElse(null)); + nodeInfoFields.add("broadcastAddress=" + nodeInfo.getBroadcastAddress().orElse(null)); + nodeInfoFields.add("listenAddress=" + nodeInfo.getListenAddress().orElse(null)); + nodeInfoFields.add("datacenter=" + nodeInfo.getDatacenter()); + nodeInfoFields.add("rack=" + nodeInfo.getRack()); + nodeInfoFields.add("cassandraVersion=" + nodeInfo.getCassandraVersion()); + nodeInfoFields.add("partitioner=" + nodeInfo.getPartitioner()); + 
nodeInfoFields.add( + "tokens=" + + Optional.ofNullable(nodeInfo.getTokens()) + .map(tokens -> "[" + String.join(",", tokens) + "]") + .orElse(null)); + nodeInfoFields.add( + "extras=" + + Optional.ofNullable(nodeInfo.getExtras()) + .map( + extras -> + "{" + + extras.entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(",")) + + "}") + .orElse(null)); + nodeInfoFields.add("hostId=" + nodeInfo.getHostId()); + nodeInfoFields.add("schemaVersion=" + nodeInfo.getSchemaVersion()); + return "{" + String.join(", ", nodeInfoFields) + "}"; + } + + private void logSingleNodeInfo(NodeInfo nodeInfo) { + // check trace level enabled just in case + if (!LOG.isTraceEnabled()) { + return; + } + + LOG.trace("[{}] System-table node entry: {}", logPrefix, nodeInfoToString(nodeInfo)); + } + + private void logNodeInfos(List nodeInfos) { + // check trace level enabled just in case + if (!LOG.isTraceEnabled()) { + return; + } + + LOG.trace( + "[{}] Full system-table node list: [{}]", + logPrefix, + nodeInfos.stream().map(this::nodeInfoToString).collect(Collectors.joining(", "))); + } + @Override public CompletionStage checkSchemaAgreement() { if (closeFuture.isDone()) { @@ -276,8 +334,16 @@ private Optional firstPeerRowAsNodeInfo(AdminResult result, EndPoint l if (isPeerValid(row)) { return Optional.ofNullable(getBroadcastRpcAddress(row, localEndPoint)) .map( - broadcastRpcAddress -> - nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build()); + broadcastRpcAddress -> { + DefaultNodeInfo nodeInfo = + nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build(); + + if (LOG.isTraceEnabled()) { + logSingleNodeInfo(nodeInfo); + } + + return nodeInfo; + }); } } return Optional.empty(); @@ -440,7 +506,11 @@ private Optional findInPeers( if (broadcastRpcAddress != null && broadcastRpcAddress.equals(broadcastRpcAddressToFind) && isPeerValid(row)) { - return Optional.of(nodeInfoBuilder(row, broadcastRpcAddress, 
localEndPoint).build()); + DefaultNodeInfo nodeInfo = nodeInfoBuilder(row, broadcastRpcAddress, localEndPoint).build(); + if (LOG.isTraceEnabled()) { + logSingleNodeInfo(nodeInfo); + } + return Optional.of(nodeInfo); } } LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, broadcastRpcAddressToFind); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java index 14496bb3399..ab320dc0ff7 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/FullNodeListRefresh.java @@ -27,8 +27,10 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import net.jcip.annotations.ThreadSafe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,7 +75,11 @@ public Result compute( DefaultNode node = (DefaultNode) oldNodes.get(id); if (node == null) { node = new DefaultNode(nodeInfo.getEndPoint(), context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); added.put(id, node); } if (tokenFactory == null && nodeInfo.getPartitioner() != null) { @@ -85,6 +91,18 @@ public Result compute( Set removed = Sets.difference(oldNodes.keySet(), seen); + if (!removed.isEmpty()) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "[{}] Removing nodes: [{}]", + logPrefix, + removed.stream() + .map(oldNodes::get) + .map(Objects::toString) + .collect(Collectors.joining(", "))); + } + } + if (added.isEmpty() && removed.isEmpty()) { // The list didn't change if (!oldMetadata.getTokenMap().isPresent() && tokenFactory != null) { // First time we found out 
what the partitioner is => set the token factory and trigger a diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java index e676d9eb2ee..12b76f45c7b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metadata/InitialNodeListRefresh.java @@ -75,9 +75,17 @@ public Result compute( DefaultNode node = findIn(contactPoints, endPoint); if (node == null) { node = new DefaultNode(endPoint, context); - LOG.debug("[{}] Adding new node {}", logPrefix, node); + LOG.debug( + "[{}] Adding new node {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); } else { - LOG.debug("[{}] Copying contact point {}", logPrefix, node); + LOG.debug( + "[{}] Copying contact point {} with system-table address {}", + logPrefix, + node, + nodeInfo.getBroadcastRpcAddress().orElse(null)); } if (tokenMapEnabled && tokenFactory == null && nodeInfo.getPartitioner() != null) { tokenFactory = tokenFactoryRegistry.tokenFactoryFor(nodeInfo.getPartitioner());