From 7a43c50184c137a0873b34b4839fe8dfedfb1138 Mon Sep 17 00:00:00 2001 From: adiraj_linkedin Date: Wed, 19 Nov 2025 10:30:31 +0530 Subject: [PATCH 1/4] added the otel changes --- .../linkedin/d2/balancer/D2ClientConfig.java | 137 ++++++++++++++++++ .../linkedin/d2/jmx/D2ClientJmxManager.java | 26 ++++ .../jmx/NoOpXdsClientOtelMetricsProvider.java | 63 ++++++++ .../com/linkedin/d2/jmx/XdsClientJmx.java | 38 ++++- .../d2/jmx/XdsClientOtelMetricsProvider.java | 29 ++++ .../com/linkedin/d2/xds/XdsClientImpl.java | 89 ++++++++++-- .../XdsLoadBalancerWithFacilitiesFactory.java | 1 + .../linkedin/d2/xds/TestXdsClientImpl.java | 2 +- 8 files changed, 371 insertions(+), 14 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index d0ffa9e826..44ea58566c 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -44,6 +44,8 @@ import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpJmxManager; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; +import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider; import com.linkedin.r2.transport.common.TransportClientFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import java.time.Duration; @@ -181,6 +183,7 @@ public class D2ClientConfig public boolean subscribeToUriGlobCollection = false; public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); + public XdsClientOtelMetricsProvider _otelMetricsProvider = new NoOpXdsClientOtelMetricsProvider(); public boolean loadBalanceStreamException = false; public boolean xdsInitialResourceVersionsEnabled = false; public Integer xdsStreamMaxRetryBackoffSeconds = null; @@ -293,6 +296,139 @@ public D2ClientConfig() D2CalleeInfoRecorder d2CalleeInfoRecorder, Boolean enableIndisDownstreamServicesFetcher, Duration indisDownstreamServicesFetchTimeout) + { + this(zkHosts, xdsServer, hostName, zkSessionTimeoutInMs, zkStartupTimeoutInMs, lbWaitTimeout, lbWaitUnit, + flagFile, basePath, fsBasePath, indisFsBasePath, componentFactory, clientFactories, lbWithFacilitiesFactory, + sslContext, grpcSslContext, sslParameters, isSSLEnabled, shutdownAsynchronously, isSymlinkAware, + clientServicesConfig, d2ServicePath, useNewEphemeralStoreWatcher, healthCheckOperations, executorService, + retry, restRetryEnabled, streamRetryEnabled, retryLimit, retryUpdateIntervalMs, retryAggregatedIntervalNum, + warmUp, warmUpTimeoutSeconds, indisWarmUpTimeoutSeconds, warmUpConcurrentRequests, + indisWarmUpConcurrentRequests, downstreamServicesFetcher, indisDownstreamServicesFetcher, + backupRequestsEnabled, backupRequestsStrategyStatsConsumer, + backupRequestsLatencyNotificationInterval, + backupRequestsLatencyNotificationIntervalUnit, + enableBackupRequestsClientAsync, + backupRequestsExecutorService, + emitter, + partitionAccessorRegistry, + zooKeeperDecorator, + enableSaveUriDataOnDisk, + loadBalancerStrategyFactories, + requestTimeoutHandlerEnabled, + sslSessionValidatorFactory, + zkConnection, + startUpExecutorService, + indisStartUpExecutorService, + jmxManager, + d2JmxManagerPrefix, + zookeeperReadWindowMs, + enableRelativeLoadBalancer, + deterministicSubsettingMetadataProvider, + canaryDistributionProvider, + enableClusterFailout, + failoutConfigProviderFactory, + failoutRedirectStrategy, + serviceDiscoveryEventEmitter, + dualReadStateManager, + xdsExecutorService, + xdsStreamReadyTimeout, + dualReadNewLbExecutor, + xdsChannelLoadBalancingPolicy, + xdsChannelLoadBalancingPolicyConfig, + subscribeToUriGlobCollection, + xdsServerMetricsProvider, + new NoOpXdsClientOtelMetricsProvider(), + loadBalanceStreamException, + xdsInitialResourceVersionsEnabled, + disableDetectLiRawD2Client, + isLiRawD2Client, + xdsStreamMaxRetryBackoffSeconds, + xdsChannelKeepAliveTimeMins, + xdsMinimumJavaVersion, + actionOnPrecheckFailure); + } + + D2ClientConfig(String zkHosts, + String xdsServer, + String hostName, + long zkSessionTimeoutInMs, + long zkStartupTimeoutInMs, + long lbWaitTimeout, + TimeUnit lbWaitUnit, + String flagFile, + String basePath, + String fsBasePath, + String indisFsBasePath, + ComponentFactory componentFactory, + Map clientFactories, + LoadBalancerWithFacilitiesFactory lbWithFacilitiesFactory, + SSLContext sslContext, + SslContext grpcSslContext, + SSLParameters sslParameters, + boolean isSSLEnabled, + boolean shutdownAsynchronously, + boolean isSymlinkAware, + Map> clientServicesConfig, + String d2ServicePath, + boolean useNewEphemeralStoreWatcher, + HealthCheckOperations healthCheckOperations, + ScheduledExecutorService executorService, + boolean retry, + boolean restRetryEnabled, + boolean streamRetryEnabled, + int retryLimit, + long retryUpdateIntervalMs, + int retryAggregatedIntervalNum, + boolean warmUp, + int warmUpTimeoutSeconds, + int indisWarmUpTimeoutSeconds, + int warmUpConcurrentRequests, + int indisWarmUpConcurrentRequests, + DownstreamServicesFetcher downstreamServicesFetcher, + DownstreamServicesFetcher indisDownstreamServicesFetcher, + boolean backupRequestsEnabled, + BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer, + long backupRequestsLatencyNotificationInterval, + TimeUnit backupRequestsLatencyNotificationIntervalUnit, + boolean enableBackupRequestsClientAsync, + ScheduledExecutorService backupRequestsExecutorService, + EventEmitter emitter, + PartitionAccessorRegistry partitionAccessorRegistry, + Function zooKeeperDecorator, + boolean enableSaveUriDataOnDisk, + Map> loadBalancerStrategyFactories, + boolean requestTimeoutHandlerEnabled, + SslSessionValidatorFactory sslSessionValidatorFactory, + ZKPersistentConnection zkConnection, + ScheduledExecutorService startUpExecutorService, + ScheduledExecutorService indisStartUpExecutorService, + JmxManager jmxManager, + String d2JmxManagerPrefix, + int zookeeperReadWindowMs, + boolean enableRelativeLoadBalancer, + DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, + CanaryDistributionProvider canaryDistributionProvider, + boolean enableClusterFailout, + FailoutConfigProviderFactory failoutConfigProviderFactory, + FailoutRedirectStrategy failoutRedirectStrategy, + ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, + DualReadStateManager dualReadStateManager, + ScheduledExecutorService xdsExecutorService, + Long xdsStreamReadyTimeout, + ExecutorService dualReadNewLbExecutor, + String xdsChannelLoadBalancingPolicy, + Map xdsChannelLoadBalancingPolicyConfig, + boolean subscribeToUriGlobCollection, + XdsServerMetricsProvider xdsServerMetricsProvider, + XdsClientOtelMetricsProvider otelMetricsProvider, + boolean loadBalanceStreamException, + boolean xdsInitialResourceVersionsEnabled, + boolean disableDetectLiRawD2Client, + boolean isLiRawD2Client, + Integer xdsStreamMaxRetryBackoffSeconds, + Long xdsChannelKeepAliveTimeMins, + String xdsMinimumJavaVersion, + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -367,6 +503,7 @@ public D2ClientConfig() this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; this._xdsServerMetricsProvider = xdsServerMetricsProvider; + this._otelMetricsProvider = otelMetricsProvider; this.loadBalanceStreamException = loadBalanceStreamException; this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled; this.disableDetectLiRawD2Client = disableDetectLiRawD2Client; diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 7795d8c80f..90710230cb 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -306,9 +306,35 @@ public void registerXdsClientJmx(XdsClientJmx xdsClientJmx) _log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType); } final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null)); + + // Extract the actual client name from the JMX name pattern + String clientName = extractClientNameFromGlobalPrefix(getGlobalPrefix(null)); + xdsClientJmx.setClientName(clientName); + _jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx); } + /** + * Extracts the client name from the JMX name. + * @param jmxName The full JMX name ending with "-XdsClientJmx" + * @return The extracted client name + */ + private String extractClientNameFromGlobalPrefix(String prefix) + { + if(prefix == null){ + return "-"; + } + int lastDash = prefix.lastIndexOf('-'); + if (lastDash == -1 || lastDash == prefix.length() - 1) { + return "-"; + } + String clientName = prefix.substring(lastDash + 1); + if (!clientName.endsWith("Client")) { + return "-"; + } + return clientName; + } + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode) { final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode)); diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java new file mode 100644 index 0000000000..bc5933dc28 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java @@ -0,0 +1,63 @@ +package com.linkedin.d2.jmx; + +/** + * No-Op implementation of XdsClientOtelMetricsProvider. + * Used when OpenTelemetry metrics are disabled. + */ +public class NoOpXdsClientOtelMetricsProvider implements XdsClientOtelMetricsProvider { + + @Override + public void recordConnectionLost(String clientName) { + // No-op + } + + @Override + public void recordConnectionClosed(String clientName) { + // No-op + } + + @Override + public void recordReconnection(String clientName) { + // No-op + } + + @Override + public void recordRequestSent(String clientName) { + // No-op + } + + @Override + public void recordResponseReceived(String clientName) { + // No-op + } + + @Override + public void recordInitialResourceVersionSent(String clientName, int count) { + // No-op + } + + @Override + public void recordResourceNotFound(String clientName) { + // No-op + } + + @Override + public void recordResourceInvalid(String clientName) { + // No-op + } + + @Override + public void recordServerLatency(String clientName, long latencyMs) { + // No-op + } + + @Override + public void updateConnectionState(String clientName, boolean isConnected) { + // No-op + } + + @Override + public void updateActiveInitialWaitTime(String clientName, long waitTimeMs) { + // No-op + } +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java index 0b0f643293..bffa6203ae 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java @@ -37,18 +37,39 @@ public class XdsClientJmx implements XdsClientJmxMBean private final AtomicInteger _resourceNotFoundCount = new AtomicInteger(); private final AtomicInteger _resourceInvalidCount = new AtomicInteger(); private final XdsServerMetricsProvider _xdsServerMetricsProvider; + private final XdsClientOtelMetricsProvider _otelMetricsProvider; + + private String _clientName = "-"; + @Nullable private XdsClientImpl _xdsClient = null; @Deprecated public XdsClientJmx() { - this(new NoOpXdsServerMetricsProvider()); + this(new NoOpXdsServerMetricsProvider(), new NoOpXdsClientOtelMetricsProvider()); } public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider) + { + this(xdsServerMetricsProvider, new NoOpXdsClientOtelMetricsProvider()); + } + + public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider, + XdsClientOtelMetricsProvider otelMetricsProvider) { _xdsServerMetricsProvider = xdsServerMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : xdsServerMetricsProvider; + _otelMetricsProvider = otelMetricsProvider == null ? + new NoOpXdsClientOtelMetricsProvider() : otelMetricsProvider; + } + + // Method to set client name (called from D2ClientJmxManager) + public void setClientName(String clientName) { + _clientName = clientName != "-" ? clientName : "-"; + } + + public String getClientName() { + return _clientName; } public void setXdsClient(XdsClientImpl xdsClient) @@ -146,55 +167,66 @@ public int isDisconnected() @Override public long getActiveInitialWaitTimeMillis() { + long waitTime = -1; if (_xdsClient != null) { - return _xdsClient.getActiveInitialWaitTimeMillis(); + waitTime = _xdsClient.getActiveInitialWaitTimeMillis(); + _otelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime); } - return -1; + return waitTime; } public void incrementConnectionLostCount() { _connectionLostCount.incrementAndGet(); + _otelMetricsProvider.recordConnectionLost(_clientName); } public void incrementConnectionClosedCount() { _connectionClosedCount.incrementAndGet(); + _otelMetricsProvider.recordConnectionClosed(_clientName); } public void incrementReconnectionCount() { _reconnectionCount.incrementAndGet(); + _otelMetricsProvider.recordReconnection(_clientName); } public void incrementRequestSentCount() { _resquestSentCount.incrementAndGet(); + _otelMetricsProvider.recordRequestSent(_clientName); } public void addToIrvSentCount(int delta) { _irvSentCount.addAndGet(delta); + _otelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta); } public void incrementResponseReceivedCount() { _responseReceivedCount.incrementAndGet(); + _otelMetricsProvider.recordResponseReceived(_clientName); } public void setIsConnected(boolean connected) { _isConnected.getAndSet(connected); + _otelMetricsProvider.updateConnectionState(_clientName, connected); } public void incrementResourceNotFoundCount() { _resourceNotFoundCount.incrementAndGet(); + _otelMetricsProvider.recordResourceNotFound(_clientName); } public void incrementResourceInvalidCount() { _resourceInvalidCount.incrementAndGet(); + _otelMetricsProvider.recordResourceInvalid(_clientName); } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java new file mode 100644 index 0000000000..a6bd706676 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java @@ -0,0 +1,29 @@ +package com.linkedin.d2.jmx; + +/** + * Interface for OpenTelemetry metrics collection for XDS Client. + * Provides raw event-based metrics that will be processed by OpenTelemetry in the container. + */ +public interface XdsClientOtelMetricsProvider { + + // Connection state changes (event-driven counters) + void recordConnectionLost(String clientName); + void recordConnectionClosed(String clientName); + void recordReconnection(String clientName); + + // Request/Response counters (event-driven) + void recordRequestSent(String clientName); + void recordResponseReceived(String clientName); + void recordInitialResourceVersionSent(String clientName, int count); + + // Error counters (event-driven) + void recordResourceNotFound(String clientName); + void recordResourceInvalid(String clientName); + + // Server latency histogram (event-driven) + void recordServerLatency(String clientName, long latencyMs); + + // Gauge updates (on-demand) + void updateConnectionState(String clientName, boolean isConnected); + void updateActiveInitialWaitTime(String clientName, long waitTimeMs); +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e4faa58326..210625f6fe 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -23,8 +23,10 @@ import com.google.common.collect.Maps; import com.google.protobuf.util.Timestamps; import com.google.rpc.Code; +import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.GlobCollectionUtils.D2UriIdentifier; import com.linkedin.util.RateLimitedLogger; @@ -115,6 +117,7 @@ public class XdsClientImpl extends XdsClient private final XdsClientJmx _xdsClientJmx; private final XdsServerMetricsProvider _serverMetricsProvider; + private final XdsClientOtelMetricsProvider _otelMetricsProvider; private final boolean _initialResourceVersionsEnabled; private final String _minimumJavaVersion; private final XdsClientValidator.ActionOnPrecheckFailure _actionOnPrecheckFailure; @@ -200,6 +203,22 @@ public XdsClientImpl(Node node, Integer maxRetryBackoffSeconds, String minimumJavaVersion, XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) + { + this(node, managedChannel, executorService, readyTimeoutMillis, subscribeToUriGlobCollection, + serverMetricsProvider, new NoOpXdsClientOtelMetricsProvider(), irvSupport, maxRetryBackoffSeconds, XdsClientValidator.DEFAULT_MINIMUM_JAVA_VERSION, XdsClientValidator.DEFAULT_ACTION_ON_PRECHECK_FAILURE); + } + + public XdsClientImpl(Node node, + ManagedChannel managedChannel, + ScheduledExecutorService executorService, + long readyTimeoutMillis, + boolean subscribeToUriGlobCollection, + XdsServerMetricsProvider serverMetricsProvider, + XdsClientOtelMetricsProvider otelMetricsProvider, + boolean irvSupport, + Integer maxRetryBackoffSeconds, + String minimumJavaVersion, + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) { _readyTimeoutMillis = readyTimeoutMillis; _node = node; @@ -212,8 +231,9 @@ public XdsClientImpl(Node node, _log.info("Glob collection support enabled"); } - _xdsClientJmx = new XdsClientJmx(serverMetricsProvider); + _xdsClientJmx = new XdsClientJmx(serverMetricsProvider, otelMetricsProvider); _serverMetricsProvider = serverMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : serverMetricsProvider; + _otelMetricsProvider = otelMetricsProvider == null ? new NoOpXdsClientOtelMetricsProvider() : otelMetricsProvider; _initialResourceVersionsEnabled = irvSupport; if (_initialResourceVersionsEnabled) { @@ -799,7 +819,9 @@ private void handleResourceUpdate(Map updates, if (wildcardSubscriber != null) { - wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider); + // wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider); + String clientName = _xdsClientJmx.getClientName(); + wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _otelMetricsProvider, clientName); } } } @@ -996,6 +1018,12 @@ void addWatcher(ResourceWatcher watcher) @VisibleForTesting void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) + { + onData(data, metricsProvider, null); + } + + @VisibleForTesting + void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsClientOtelMetricsProvider otelMetricsProvider) { SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); if (!FETCHED.equals(prev)) @@ -1009,7 +1037,11 @@ void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. - trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + // trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + // Get client name and OpenTelemetry provider from XdsClientJmx + String clientName = _xdsClientJmx.getClientName(); + // data updated, track xds server latency + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, otelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform any update."); return; @@ -1191,6 +1223,13 @@ void addWatcher(WildcardResourceWatcher watcher) @VisibleForTesting void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider) + { + onData(resourceName, data, metricsProvider, null, "-"); + } + + @VisibleForTesting + void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider, + XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) { if (Objects.equals(_data.get(resourceName), data)) { @@ -1198,7 +1237,9 @@ void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider m { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. - trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + // trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + // Now we can pass OpenTelemetry provider and client name for wildcard subscribers too + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), otelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; @@ -1345,6 +1386,13 @@ private boolean shouldSubscribeUriGlobCollection(ResourceType type) private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, null, "-"); + } + + private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, + XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, + XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) { long now = SystemClock.instance().currentTimeMillis(); if (resourceUpdate instanceof NodeUpdate) @@ -1355,7 +1403,7 @@ private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUp return; } trackServerLatencyHelper(metricsProvider, now, nodeData.getStat().getMtime(), subscribedAt, - isIrvEnabled, fetchState); + isIrvEnabled, fetchState, otelMetricsProvider, clientName); } else if (resourceUpdate instanceof D2URIMapUpdate) { @@ -1370,9 +1418,9 @@ else if (resourceUpdate instanceof D2URIMapUpdate) Map.Entry::getKey, e -> e.getValue().leftValue()) // new data of updated uris ); - trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState); + trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState, otelMetricsProvider, clientName); trackServerLatencyForUris(rawDiff.entriesOnlyOnLeft(), update, metricsProvider, now, subscribedAt, - isIrvEnabled, fetchState); // newly added uris + isIrvEnabled, fetchState, otelMetricsProvider, clientName); // newly added uris } else if (resourceUpdate instanceof D2URIUpdate) { @@ -1382,7 +1430,7 @@ else if (resourceUpdate instanceof D2URIUpdate) { update.setIsStaleModifiedTime( trackServerLatencyHelper(metricsProvider, now, Timestamps.toMillis(uri.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState) + isIrvEnabled, fetchState, otelMetricsProvider, clientName) ); } } @@ -1391,10 +1439,17 @@ else if (resourceUpdate instanceof D2URIUpdate) private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, null, "-"); + } + + private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, + XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, + SubscriberFetchState fetchState, XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) { uriMap.forEach((k, v) -> { boolean isStaleModifiedTime = trackServerLatencyHelper(metricsProvider, end, Timestamps.toMillis(v.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState); + isIrvEnabled, fetchState, otelMetricsProvider, clientName); update.setIsStaleModifiedTime(k, isStaleModifiedTime); } ); @@ -1410,6 +1465,13 @@ private static void trackServerLatencyForUris(Map uriMap, D // on the resource modified time. private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, null, "-"); + } + + private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, + long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, + XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) { long start; boolean isStaleModifiedAt; @@ -1423,7 +1485,14 @@ private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metrics start = Math.max(modifiedAt, subscribedAt); isStaleModifiedAt = modifiedAt < subscribedAt; } - metricsProvider.trackLatency(end - start); + long latency = end - start; + metricsProvider.trackLatency(latency); + + // Record OpenTelemetry latency if provider is available + if (otelMetricsProvider != null) + { + otelMetricsProvider.recordServerLatency(clientName, latency); + } return isStaleModifiedAt; } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 4c77c81433..2c7ba2f101 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -69,6 +69,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) xdsStreamReadyTimeout, config.subscribeToUriGlobCollection, config._xdsServerMetricsProvider, + config._otelMetricsProvider, config.xdsInitialResourceVersionsEnabled, config.xdsStreamMaxRetryBackoffSeconds, config.xdsMinimumJavaVersion, diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 57c27f34aa..d826a1ec2e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -542,7 +542,7 @@ public void testHandleD2URIMapUpdateWithEmptyResponse() { fixture.verifyAckSent(2); // onData is called only once. Empty response does not trigger onData calls. verify(fixture._clusterSubscriber).onData(any(), any()); - verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any()); + verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any(), any(), any()); } @Test(dataProvider = "providerWatcherFlags") From 54a840160654a5bfb31004feff485deae171dc75 Mon Sep 17 00:00:00 2001 From: adiraj_linkedin Date: Sun, 23 Nov 2025 10:51:58 +0530 Subject: [PATCH 2/4] Added more logic for the XdsClient Otel metrics --- .../linkedin/d2/balancer/D2ClientBuilder.java | 7 ++ .../linkedin/d2/balancer/D2ClientConfig.java | 16 +++-- .../linkedin/d2/jmx/D2ClientJmxManager.java | 25 +------ .../com/linkedin/d2/jmx/XdsClientJmx.java | 34 ++++----- .../d2/jmx/XdsClientOtelMetricsProvider.java | 70 +++++++++++++++++-- .../com/linkedin/d2/xds/XdsClientImpl.java | 68 +++++++++--------- .../XdsLoadBalancerWithFacilitiesFactory.java | 2 +- .../linkedin/d2/xds/TestXdsClientImpl.java | 8 ++- 8 files changed, 141 insertions(+), 89 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 88dba60f21..c330297a20 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -52,6 +52,7 @@ import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.jmx.XdsServerMetricsProvider; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.xds.XdsClientValidator; import com.linkedin.d2.jmx.NoOpJmxManager; @@ -237,6 +238,7 @@ public D2Client build() _config.xdsChannelLoadBalancingPolicyConfig, _config.subscribeToUriGlobCollection, _config._xdsServerMetricsProvider, + _config._xdsClientOtelMetricsProvider, _config.loadBalanceStreamException, _config.xdsInitialResourceVersionsEnabled, _config.disableDetectLiRawD2Client, @@ -856,6 +858,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS return this; } + public D2ClientBuilder setXdsClientOtelMetricsProvider(XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) { + _config._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider; + return this; + } + public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) { _config.loadBalanceStreamException = loadBalanceStreamException; return this; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index 44ea58566c..cad73b3c45 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -183,7 +183,7 @@ public class D2ClientConfig public boolean subscribeToUriGlobCollection = false; public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); - public XdsClientOtelMetricsProvider _otelMetricsProvider = new NoOpXdsClientOtelMetricsProvider(); + public XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider = new NoOpXdsClientOtelMetricsProvider(); public boolean loadBalanceStreamException = false; public boolean xdsInitialResourceVersionsEnabled = false; public Integer xdsStreamMaxRetryBackoffSeconds = null; @@ -345,7 +345,10 @@ public D2ClientConfig() xdsStreamMaxRetryBackoffSeconds, xdsChannelKeepAliveTimeMins, xdsMinimumJavaVersion, - actionOnPrecheckFailure); + actionOnPrecheckFailure, + d2CalleeInfoRecorder, + enableIndisDownstreamServicesFetcher, + indisDownstreamServicesFetchTimeout); } D2ClientConfig(String zkHosts, @@ -420,7 +423,7 @@ public D2ClientConfig() Map xdsChannelLoadBalancingPolicyConfig, boolean subscribeToUriGlobCollection, XdsServerMetricsProvider xdsServerMetricsProvider, - XdsClientOtelMetricsProvider otelMetricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, boolean loadBalanceStreamException, boolean xdsInitialResourceVersionsEnabled, boolean disableDetectLiRawD2Client, @@ -428,7 +431,10 @@ public D2ClientConfig() Integer xdsStreamMaxRetryBackoffSeconds, Long xdsChannelKeepAliveTimeMins, String xdsMinimumJavaVersion, - XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure, + D2CalleeInfoRecorder d2CalleeInfoRecorder, + Boolean enableIndisDownstreamServicesFetcher, + Duration indisDownstreamServicesFetchTimeout) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -503,7 +509,7 @@ public D2ClientConfig() this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; this._xdsServerMetricsProvider = xdsServerMetricsProvider; - this._otelMetricsProvider = otelMetricsProvider; + this._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider; this.loadBalanceStreamException = loadBalanceStreamException; this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled; this.disableDetectLiRawD2Client = disableDetectLiRawD2Client; diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 90710230cb..f640f5bd9a 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -307,34 +307,13 @@ public void registerXdsClientJmx(XdsClientJmx xdsClientJmx) } final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null)); - // Extract the actual client name from the JMX name pattern - String clientName = extractClientNameFromGlobalPrefix(getGlobalPrefix(null)); + // Get the client name from global prefix + String clientName = getGlobalPrefix(null); xdsClientJmx.setClientName(clientName); _jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx); } - /** - * Extracts the client name from the JMX name. - * @param jmxName The full JMX name ending with "-XdsClientJmx" - * @return The extracted client name - */ - private String extractClientNameFromGlobalPrefix(String prefix) - { - if(prefix == null){ - return "-"; - } - int lastDash = prefix.lastIndexOf('-'); - if (lastDash == -1 || lastDash == prefix.length() - 1) { - return "-"; - } - String clientName = prefix.substring(lastDash + 1); - if (!clientName.endsWith("Client")) { - return "-"; - } - return clientName; - } - private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode) { final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode)); diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java index bffa6203ae..c406e1b152 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java @@ -37,7 +37,7 @@ public class XdsClientJmx implements XdsClientJmxMBean private final AtomicInteger _resourceNotFoundCount = new AtomicInteger(); private final AtomicInteger _resourceInvalidCount = new AtomicInteger(); private final XdsServerMetricsProvider _xdsServerMetricsProvider; - private final XdsClientOtelMetricsProvider _otelMetricsProvider; + private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; private String _clientName = "-"; @@ -46,26 +46,26 @@ public class XdsClientJmx implements XdsClientJmxMBean @Deprecated public XdsClientJmx() { - this(new NoOpXdsServerMetricsProvider(), new NoOpXdsClientOtelMetricsProvider()); + this(new NoOpXdsServerMetricsProvider(), null); } public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider) { - this(xdsServerMetricsProvider, new NoOpXdsClientOtelMetricsProvider()); + this(xdsServerMetricsProvider, null); } public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider, - XdsClientOtelMetricsProvider otelMetricsProvider) + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) { _xdsServerMetricsProvider = xdsServerMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : xdsServerMetricsProvider; - _otelMetricsProvider = otelMetricsProvider == null ? - new NoOpXdsClientOtelMetricsProvider() : otelMetricsProvider; + _xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider == null ? + new NoOpXdsClientOtelMetricsProvider() : xdsClientOtelMetricsProvider; } // Method to set client name (called from D2ClientJmxManager) public void setClientName(String clientName) { - _clientName = clientName != "-" ? clientName : "-"; + _clientName = clientName; } public String getClientName() { @@ -171,7 +171,7 @@ public long getActiveInitialWaitTimeMillis() if (_xdsClient != null) { waitTime = _xdsClient.getActiveInitialWaitTimeMillis(); - _otelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime); + _xdsClientOtelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime); } return waitTime; } @@ -179,54 +179,54 @@ public long getActiveInitialWaitTimeMillis() public void incrementConnectionLostCount() { _connectionLostCount.incrementAndGet(); - _otelMetricsProvider.recordConnectionLost(_clientName); + _xdsClientOtelMetricsProvider.recordConnectionLost(_clientName); } public void incrementConnectionClosedCount() { _connectionClosedCount.incrementAndGet(); - _otelMetricsProvider.recordConnectionClosed(_clientName); + _xdsClientOtelMetricsProvider.recordConnectionClosed(_clientName); } public void incrementReconnectionCount() { _reconnectionCount.incrementAndGet(); - _otelMetricsProvider.recordReconnection(_clientName); + _xdsClientOtelMetricsProvider.recordReconnection(_clientName); } public void incrementRequestSentCount() { _resquestSentCount.incrementAndGet(); - _otelMetricsProvider.recordRequestSent(_clientName); + _xdsClientOtelMetricsProvider.recordRequestSent(_clientName); } public void addToIrvSentCount(int delta) { _irvSentCount.addAndGet(delta); - _otelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta); + _xdsClientOtelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta); } public void incrementResponseReceivedCount() { _responseReceivedCount.incrementAndGet(); - _otelMetricsProvider.recordResponseReceived(_clientName); + _xdsClientOtelMetricsProvider.recordResponseReceived(_clientName); } public void setIsConnected(boolean connected) { _isConnected.getAndSet(connected); - _otelMetricsProvider.updateConnectionState(_clientName, connected); + _xdsClientOtelMetricsProvider.updateConnectionState(_clientName, connected); } public void incrementResourceNotFoundCount() { _resourceNotFoundCount.incrementAndGet(); - _otelMetricsProvider.recordResourceNotFound(_clientName); + _xdsClientOtelMetricsProvider.recordResourceNotFound(_clientName); } public void incrementResourceInvalidCount() { _resourceInvalidCount.incrementAndGet(); - _otelMetricsProvider.recordResourceInvalid(_clientName); + _xdsClientOtelMetricsProvider.recordResourceInvalid(_clientName); } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java index a6bd706676..9ed59b0194 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java @@ -6,24 +6,84 @@ */ public interface XdsClientOtelMetricsProvider { - // Connection state changes (event-driven counters) + /** + * Records a connection lost event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordConnectionLost(String clientName); + + /** + * Records a connection closed event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordConnectionClosed(String clientName); + + /** + * Records a reconnection event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordReconnection(String clientName); - // Request/Response counters (event-driven) + /** + * Records a request sent event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordRequestSent(String clientName); + + /** + * Records a response received event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordResponseReceived(String clientName); + + /** + * Records initial resource version sent count in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + * @param count the count to add + */ void recordInitialResourceVersionSent(String clientName, int count); - // Error counters (event-driven) + /** + * Records a resource not found error in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordResourceNotFound(String clientName); + + /** + * Records a resource invalid error in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ void recordResourceInvalid(String clientName); - // Server latency histogram (event-driven) + /** + * Records server latency in the OpenTelemetry histogram. + * + * @param clientName the name of the XDS client + * @param latencyMs the latency in milliseconds + */ void recordServerLatency(String clientName, long latencyMs); - // Gauge updates (on-demand) + /** + * Updates the connection state for a client. + * + * @param clientName the name of the XDS client + * @param isConnected whether the client is connected + */ void updateConnectionState(String clientName, boolean isConnected); + + /** + * Updates the active initial wait time for a client. + * + * @param clientName the name of the XDS client + * @param waitTimeMs the wait time in milliseconds + */ void updateActiveInitialWaitTime(String clientName, long waitTimeMs); } \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 210625f6fe..8b4d56aa68 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -84,6 +84,7 @@ public class XdsClientImpl extends XdsClient new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(1), SystemClock.instance()); public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; public static final Integer DEFAULT_MAX_RETRY_BACKOFF_SECS = 30; // default value for max retry backoff seconds + private static final String NO_VALUE = "-"; /** * The resource subscribers maps the resource type to its subscribers. Note that the {@link ResourceType#D2_URI} @@ -117,7 +118,7 @@ public class XdsClientImpl extends XdsClient private final XdsClientJmx _xdsClientJmx; private final XdsServerMetricsProvider _serverMetricsProvider; - private final XdsClientOtelMetricsProvider _otelMetricsProvider; + private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; private final boolean _initialResourceVersionsEnabled; private final String _minimumJavaVersion; private final XdsClientValidator.ActionOnPrecheckFailure _actionOnPrecheckFailure; @@ -205,7 +206,7 @@ public XdsClientImpl(Node node, XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) { this(node, managedChannel, executorService, readyTimeoutMillis, subscribeToUriGlobCollection, - serverMetricsProvider, new NoOpXdsClientOtelMetricsProvider(), irvSupport, maxRetryBackoffSeconds, XdsClientValidator.DEFAULT_MINIMUM_JAVA_VERSION, XdsClientValidator.DEFAULT_ACTION_ON_PRECHECK_FAILURE); + serverMetricsProvider, null, irvSupport, maxRetryBackoffSeconds, XdsClientValidator.DEFAULT_MINIMUM_JAVA_VERSION, XdsClientValidator.DEFAULT_ACTION_ON_PRECHECK_FAILURE); } public XdsClientImpl(Node node, @@ -214,7 +215,7 @@ public XdsClientImpl(Node node, long readyTimeoutMillis, boolean subscribeToUriGlobCollection, XdsServerMetricsProvider serverMetricsProvider, - XdsClientOtelMetricsProvider otelMetricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, boolean irvSupport, Integer maxRetryBackoffSeconds, String minimumJavaVersion, @@ -231,9 +232,9 @@ public XdsClientImpl(Node node, _log.info("Glob collection support enabled"); } - _xdsClientJmx = new XdsClientJmx(serverMetricsProvider, otelMetricsProvider); + _xdsClientJmx = new XdsClientJmx(serverMetricsProvider, xdsClientOtelMetricsProvider); _serverMetricsProvider = serverMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : serverMetricsProvider; - _otelMetricsProvider = otelMetricsProvider == null ? new NoOpXdsClientOtelMetricsProvider() : otelMetricsProvider; + _xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider == null ? new NoOpXdsClientOtelMetricsProvider() : xdsClientOtelMetricsProvider; _initialResourceVersionsEnabled = irvSupport; if (_initialResourceVersionsEnabled) { @@ -698,7 +699,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) || uriSubscriber.getData() == null // The URI was corrupted and there was no previous version of this URI ) { - uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider); + uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider, _xdsClientOtelMetricsProvider, _xdsClientJmx.getClientName()); } } @@ -806,6 +807,7 @@ private void processResourceChanges(ResourceType type, Map updates, ResourceType type) { + String clientName = _xdsClientJmx.getClientName(); Map subscribers = getResourceSubscriberMap(type); WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); @@ -814,14 +816,12 @@ private void handleResourceUpdate(Map updates, ResourceSubscriber subscriber = subscribers.get(entry.getKey()); if (subscriber != null) { - subscriber.onData(entry.getValue(), _serverMetricsProvider); + subscriber.onData(entry.getValue(), _serverMetricsProvider, _xdsClientOtelMetricsProvider, clientName); } if (wildcardSubscriber != null) { - // wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider); - String clientName = _xdsClientJmx.getClientName(); - wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _otelMetricsProvider, clientName); + wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _xdsClientOtelMetricsProvider, clientName); } } } @@ -1019,11 +1019,11 @@ void addWatcher(ResourceWatcher watcher) @VisibleForTesting void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { - onData(data, metricsProvider, null); + onData(data, metricsProvider, null, NO_VALUE); } @VisibleForTesting - void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsClientOtelMetricsProvider otelMetricsProvider) + void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); if (!FETCHED.equals(prev)) @@ -1037,11 +1037,8 @@ void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsCl { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. - // trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); - // Get client name and OpenTelemetry provider from XdsClientJmx - String clientName = _xdsClientJmx.getClientName(); // data updated, track xds server latency - trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, otelMetricsProvider, clientName); + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, xdsClientOtelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform any update."); return; @@ -1050,7 +1047,7 @@ void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsCl // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, xdsClientOtelMetricsProvider, clientName); _data = data; } else @@ -1224,12 +1221,12 @@ void addWatcher(WildcardResourceWatcher watcher) @VisibleForTesting void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { - onData(resourceName, data, metricsProvider, null, "-"); + onData(resourceName, data, metricsProvider, null, NO_VALUE); } @VisibleForTesting void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider, - XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { if (Objects.equals(_data.get(resourceName), data)) { @@ -1237,9 +1234,8 @@ void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider m { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. - // trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); - // Now we can pass OpenTelemetry provider and client name for wildcard subscribers too - trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), otelMetricsProvider, clientName); + // we are passing OpenTelemetry provider for wildcard subscribers too + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), xdsClientOtelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; @@ -1247,7 +1243,7 @@ void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider m // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), xdsClientOtelMetricsProvider, clientName); _data.put(resourceName, data); } else @@ -1387,12 +1383,12 @@ private boolean shouldSubscribeUriGlobCollection(ResourceType type) private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, null, "-"); + trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); } private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, - XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { long now = SystemClock.instance().currentTimeMillis(); if (resourceUpdate instanceof NodeUpdate) @@ -1403,7 +1399,7 @@ private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUp return; } trackServerLatencyHelper(metricsProvider, now, nodeData.getStat().getMtime(), subscribedAt, - isIrvEnabled, fetchState, otelMetricsProvider, clientName); + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); } else if (resourceUpdate instanceof D2URIMapUpdate) { @@ -1418,9 +1414,9 @@ else if (resourceUpdate instanceof D2URIMapUpdate) Map.Entry::getKey, e -> e.getValue().leftValue()) // new data of updated uris ); - trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState, otelMetricsProvider, clientName); + trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); trackServerLatencyForUris(rawDiff.entriesOnlyOnLeft(), update, metricsProvider, now, subscribedAt, - isIrvEnabled, fetchState, otelMetricsProvider, clientName); // newly added uris + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); // newly added uris } else if (resourceUpdate instanceof D2URIUpdate) { @@ -1430,7 +1426,7 @@ else if (resourceUpdate instanceof D2URIUpdate) { update.setIsStaleModifiedTime( trackServerLatencyHelper(metricsProvider, now, Timestamps.toMillis(uri.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState, otelMetricsProvider, clientName) + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName) ); } } @@ -1440,16 +1436,16 @@ private static void trackServerLatencyForUris(Map uriMap, D XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, null, "-"); + trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); } private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, - SubscriberFetchState fetchState, XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) + SubscriberFetchState fetchState, XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { uriMap.forEach((k, v) -> { boolean isStaleModifiedTime = trackServerLatencyHelper(metricsProvider, end, Timestamps.toMillis(v.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState, otelMetricsProvider, clientName); + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); update.setIsStaleModifiedTime(k, isStaleModifiedTime); } ); @@ -1466,12 +1462,12 @@ private static void trackServerLatencyForUris(Map uriMap, D private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, null, "-"); + return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); } private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, - XdsClientOtelMetricsProvider otelMetricsProvider, String clientName) + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { long start; boolean isStaleModifiedAt; @@ -1489,9 +1485,9 @@ private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metrics metricsProvider.trackLatency(latency); // Record OpenTelemetry latency if provider is available - if (otelMetricsProvider != null) + if (xdsClientOtelMetricsProvider != null) { - otelMetricsProvider.recordServerLatency(clientName, latency); + xdsClientOtelMetricsProvider.recordServerLatency(clientName, latency); } return isStaleModifiedAt; } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 2c7ba2f101..2d014fede4 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -69,7 +69,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) xdsStreamReadyTimeout, config.subscribeToUriGlobCollection, config._xdsServerMetricsProvider, - config._otelMetricsProvider, + config._xdsClientOtelMetricsProvider, config.xdsInitialResourceVersionsEnabled, config.xdsStreamMaxRetryBackoffSeconds, config.xdsMinimumJavaVersion, diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index d826a1ec2e..56e2aab7ca 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -8,6 +8,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.XdsClient.D2URIMapUpdate; import com.linkedin.d2.xds.XdsClient.ResourceType; @@ -541,7 +542,7 @@ public void testHandleD2URIMapUpdateWithEmptyResponse() { fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_URI_MAP_RESPONSE); fixture.verifyAckSent(2); // onData is called only once. Empty response does not trigger onData calls. - verify(fixture._clusterSubscriber).onData(any(), any()); + verify(fixture._clusterSubscriber).onData(any(), any(), any(), any()); verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any(), any(), any()); } @@ -1262,6 +1263,8 @@ private static final class XdsClientImplFixture { @Mock XdsServerMetricsProvider _serverMetricsProvider; @Mock + XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; + @Mock Clock _clock; @Captor @@ -1295,6 +1298,7 @@ private static final class XdsClientImplFixture { doNothing().when(_resourceWatcher).onChanged(any()); doNothing().when(_wildcardResourceWatcher).onChanged(any(), any()); doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); + doNothing().when(_xdsClientOtelMetricsProvider).recordServerLatency(anyString(), anyLong()); for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber, _d2UriSubscriber, _calleesSubscriber)) { @@ -1311,7 +1315,7 @@ private static final class XdsClientImplFixture { _executorService = spy(Executors.newScheduledThreadPool(1)); _xdsClientImpl = spy(new XdsClientImpl(null, mock(ManagedChannel.class), _executorService, 0, useGlobCollections, - _serverMetricsProvider, useIRV)); + _serverMetricsProvider, _xdsClientOtelMetricsProvider, useIRV, null, null, null)); _xdsClientImpl._adsStream = _adsStream; doNothing().when(_xdsClientImpl).startRpcStreamLocal(); From 0df69f4cb23b0ecd94eaa13c7235313df9c8895c Mon Sep 17 00:00:00 2001 From: adiraj_linkedin Date: Mon, 24 Nov 2025 11:59:40 +0530 Subject: [PATCH 3/4] Use NoOpXdsClientOtelMetricsProvider as default to eliminate null handling in XdsClientImpl --- .../com/linkedin/d2/xds/XdsClientImpl.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 8b4d56aa68..b05dee51ed 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -1019,7 +1019,7 @@ void addWatcher(ResourceWatcher watcher) @VisibleForTesting void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { - onData(data, metricsProvider, null, NO_VALUE); + onData(data, metricsProvider, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); } @VisibleForTesting @@ -1221,7 +1221,7 @@ void addWatcher(WildcardResourceWatcher watcher) @VisibleForTesting void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { - onData(resourceName, data, metricsProvider, null, NO_VALUE); + onData(resourceName, data, metricsProvider, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); } @VisibleForTesting @@ -1383,7 +1383,7 @@ private boolean shouldSubscribeUriGlobCollection(ResourceType type) private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); + trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); } private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, @@ -1436,7 +1436,7 @@ private static void trackServerLatencyForUris(Map uriMap, D XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); + trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); } private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, @@ -1462,7 +1462,7 @@ private static void trackServerLatencyForUris(Map uriMap, D private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) { - return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, null, NO_VALUE); + return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); } private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, @@ -1484,11 +1484,9 @@ private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metrics long latency = end - start; metricsProvider.trackLatency(latency); - // Record OpenTelemetry latency if provider is available - if (xdsClientOtelMetricsProvider != null) - { - xdsClientOtelMetricsProvider.recordServerLatency(clientName, latency); - } + // Record OpenTelemetry latency + xdsClientOtelMetricsProvider.recordServerLatency(clientName, latency); + return isStaleModifiedAt; } From ea5faa45de350bf626d90fd3beda6bab223c3264 Mon Sep 17 00:00:00 2001 From: Aditya Raj Date: Wed, 26 Nov 2025 15:11:56 +0530 Subject: [PATCH 4/4] Update comments in XdsClientOtelMetricsProvider.java --- .../java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java index 9ed59b0194..5e57d5f5ef 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java @@ -2,7 +2,6 @@ /** * Interface for OpenTelemetry metrics collection for XDS Client. - * Provides raw event-based metrics that will be processed by OpenTelemetry in the container. */ public interface XdsClientOtelMetricsProvider { @@ -86,4 +85,4 @@ public interface XdsClientOtelMetricsProvider { * @param waitTimeMs the wait time in milliseconds */ void updateActiveInitialWaitTime(String clientName, long waitTimeMs); -} \ No newline at end of file +}