diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index 88dba60f21..c330297a20 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -52,6 +52,7 @@ import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection; import com.linkedin.d2.discovery.stores.zk.ZooKeeper; import com.linkedin.d2.jmx.XdsServerMetricsProvider; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.xds.XdsClientValidator; import com.linkedin.d2.jmx.NoOpJmxManager; @@ -237,6 +238,7 @@ public D2Client build() _config.xdsChannelLoadBalancingPolicyConfig, _config.subscribeToUriGlobCollection, _config._xdsServerMetricsProvider, + _config._xdsClientOtelMetricsProvider, _config.loadBalanceStreamException, _config.xdsInitialResourceVersionsEnabled, _config.disableDetectLiRawD2Client, @@ -856,6 +858,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS return this; } + public D2ClientBuilder setXdsClientOtelMetricsProvider(XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) { + _config._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider; + return this; + } + public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) { _config.loadBalanceStreamException = loadBalanceStreamException; return this; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index d0ffa9e826..cad73b3c45 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -44,6 +44,8 @@ import com.linkedin.d2.jmx.JmxManager; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpJmxManager; +import 
com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; +import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider; import com.linkedin.r2.transport.common.TransportClientFactory; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import java.time.Duration; @@ -181,6 +183,7 @@ public class D2ClientConfig public boolean subscribeToUriGlobCollection = false; public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider(); + public XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider = new NoOpXdsClientOtelMetricsProvider(); public boolean loadBalanceStreamException = false; public boolean xdsInitialResourceVersionsEnabled = false; public Integer xdsStreamMaxRetryBackoffSeconds = null; @@ -293,6 +296,145 @@ public D2ClientConfig() D2CalleeInfoRecorder d2CalleeInfoRecorder, Boolean enableIndisDownstreamServicesFetcher, Duration indisDownstreamServicesFetchTimeout) + { + this(zkHosts, xdsServer, hostName, zkSessionTimeoutInMs, zkStartupTimeoutInMs, lbWaitTimeout, lbWaitUnit, + flagFile, basePath, fsBasePath, indisFsBasePath, componentFactory, clientFactories, lbWithFacilitiesFactory, + sslContext, grpcSslContext, sslParameters, isSSLEnabled, shutdownAsynchronously, isSymlinkAware, + clientServicesConfig, d2ServicePath, useNewEphemeralStoreWatcher, healthCheckOperations, executorService, + retry, restRetryEnabled, streamRetryEnabled, retryLimit, retryUpdateIntervalMs, retryAggregatedIntervalNum, + warmUp, warmUpTimeoutSeconds, indisWarmUpTimeoutSeconds, warmUpConcurrentRequests, + indisWarmUpConcurrentRequests, downstreamServicesFetcher, indisDownstreamServicesFetcher, + backupRequestsEnabled, backupRequestsStrategyStatsConsumer, + backupRequestsLatencyNotificationInterval, + backupRequestsLatencyNotificationIntervalUnit, + enableBackupRequestsClientAsync, + backupRequestsExecutorService, + emitter, + partitionAccessorRegistry, + zooKeeperDecorator, + enableSaveUriDataOnDisk, + loadBalancerStrategyFactories, + 
requestTimeoutHandlerEnabled, + sslSessionValidatorFactory, + zkConnection, + startUpExecutorService, + indisStartUpExecutorService, + jmxManager, + d2JmxManagerPrefix, + zookeeperReadWindowMs, + enableRelativeLoadBalancer, + deterministicSubsettingMetadataProvider, + canaryDistributionProvider, + enableClusterFailout, + failoutConfigProviderFactory, + failoutRedirectStrategy, + serviceDiscoveryEventEmitter, + dualReadStateManager, + xdsExecutorService, + xdsStreamReadyTimeout, + dualReadNewLbExecutor, + xdsChannelLoadBalancingPolicy, + xdsChannelLoadBalancingPolicyConfig, + subscribeToUriGlobCollection, + xdsServerMetricsProvider, + new NoOpXdsClientOtelMetricsProvider(), + loadBalanceStreamException, + xdsInitialResourceVersionsEnabled, + disableDetectLiRawD2Client, + isLiRawD2Client, + xdsStreamMaxRetryBackoffSeconds, + xdsChannelKeepAliveTimeMins, + xdsMinimumJavaVersion, + actionOnPrecheckFailure, + d2CalleeInfoRecorder, + enableIndisDownstreamServicesFetcher, + indisDownstreamServicesFetchTimeout); + } + + D2ClientConfig(String zkHosts, + String xdsServer, + String hostName, + long zkSessionTimeoutInMs, + long zkStartupTimeoutInMs, + long lbWaitTimeout, + TimeUnit lbWaitUnit, + String flagFile, + String basePath, + String fsBasePath, + String indisFsBasePath, + ComponentFactory componentFactory, + Map clientFactories, + LoadBalancerWithFacilitiesFactory lbWithFacilitiesFactory, + SSLContext sslContext, + SslContext grpcSslContext, + SSLParameters sslParameters, + boolean isSSLEnabled, + boolean shutdownAsynchronously, + boolean isSymlinkAware, + Map> clientServicesConfig, + String d2ServicePath, + boolean useNewEphemeralStoreWatcher, + HealthCheckOperations healthCheckOperations, + ScheduledExecutorService executorService, + boolean retry, + boolean restRetryEnabled, + boolean streamRetryEnabled, + int retryLimit, + long retryUpdateIntervalMs, + int retryAggregatedIntervalNum, + boolean warmUp, + int warmUpTimeoutSeconds, + int indisWarmUpTimeoutSeconds, + 
int warmUpConcurrentRequests, + int indisWarmUpConcurrentRequests, + DownstreamServicesFetcher downstreamServicesFetcher, + DownstreamServicesFetcher indisDownstreamServicesFetcher, + boolean backupRequestsEnabled, + BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer, + long backupRequestsLatencyNotificationInterval, + TimeUnit backupRequestsLatencyNotificationIntervalUnit, + boolean enableBackupRequestsClientAsync, + ScheduledExecutorService backupRequestsExecutorService, + EventEmitter emitter, + PartitionAccessorRegistry partitionAccessorRegistry, + Function zooKeeperDecorator, + boolean enableSaveUriDataOnDisk, + Map> loadBalancerStrategyFactories, + boolean requestTimeoutHandlerEnabled, + SslSessionValidatorFactory sslSessionValidatorFactory, + ZKPersistentConnection zkConnection, + ScheduledExecutorService startUpExecutorService, + ScheduledExecutorService indisStartUpExecutorService, + JmxManager jmxManager, + String d2JmxManagerPrefix, + int zookeeperReadWindowMs, + boolean enableRelativeLoadBalancer, + DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider, + CanaryDistributionProvider canaryDistributionProvider, + boolean enableClusterFailout, + FailoutConfigProviderFactory failoutConfigProviderFactory, + FailoutRedirectStrategy failoutRedirectStrategy, + ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, + DualReadStateManager dualReadStateManager, + ScheduledExecutorService xdsExecutorService, + Long xdsStreamReadyTimeout, + ExecutorService dualReadNewLbExecutor, + String xdsChannelLoadBalancingPolicy, + Map xdsChannelLoadBalancingPolicyConfig, + boolean subscribeToUriGlobCollection, + XdsServerMetricsProvider xdsServerMetricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, + boolean loadBalanceStreamException, + boolean xdsInitialResourceVersionsEnabled, + boolean disableDetectLiRawD2Client, + boolean isLiRawD2Client, + Integer xdsStreamMaxRetryBackoffSeconds, + Long 
xdsChannelKeepAliveTimeMins, + String xdsMinimumJavaVersion, + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure, + D2CalleeInfoRecorder d2CalleeInfoRecorder, + Boolean enableIndisDownstreamServicesFetcher, + Duration indisDownstreamServicesFetchTimeout) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -367,6 +509,7 @@ public D2ClientConfig() this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins; this.subscribeToUriGlobCollection = subscribeToUriGlobCollection; this._xdsServerMetricsProvider = xdsServerMetricsProvider; + this._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider; this.loadBalanceStreamException = loadBalanceStreamException; this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled; this.disableDetectLiRawD2Client = disableDetectLiRawD2Client; diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 7795d8c80f..f640f5bd9a 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -306,6 +306,11 @@ public void registerXdsClientJmx(XdsClientJmx xdsClientJmx) _log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType); } final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null)); + + // Get the client name from global prefix + String clientName = getGlobalPrefix(null); + xdsClientJmx.setClientName(clientName); + _jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx); } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java new file mode 100644 index 0000000000..bc5933dc28 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpXdsClientOtelMetricsProvider.java @@ -0,0 +1,63 @@ +package com.linkedin.d2.jmx; + +/** + * No-Op implementation of 
XdsClientOtelMetricsProvider. + * Used when OpenTelemetry metrics are disabled. + */ +public class NoOpXdsClientOtelMetricsProvider implements XdsClientOtelMetricsProvider { + + @Override + public void recordConnectionLost(String clientName) { + // No-op + } + + @Override + public void recordConnectionClosed(String clientName) { + // No-op + } + + @Override + public void recordReconnection(String clientName) { + // No-op + } + + @Override + public void recordRequestSent(String clientName) { + // No-op + } + + @Override + public void recordResponseReceived(String clientName) { + // No-op + } + + @Override + public void recordInitialResourceVersionSent(String clientName, int count) { + // No-op + } + + @Override + public void recordResourceNotFound(String clientName) { + // No-op + } + + @Override + public void recordResourceInvalid(String clientName) { + // No-op + } + + @Override + public void recordServerLatency(String clientName, long latencyMs) { + // No-op + } + + @Override + public void updateConnectionState(String clientName, boolean isConnected) { + // No-op + } + + @Override + public void updateActiveInitialWaitTime(String clientName, long waitTimeMs) { + // No-op + } +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java index 0b0f643293..c406e1b152 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java @@ -37,18 +37,39 @@ public class XdsClientJmx implements XdsClientJmxMBean private final AtomicInteger _resourceNotFoundCount = new AtomicInteger(); private final AtomicInteger _resourceInvalidCount = new AtomicInteger(); private final XdsServerMetricsProvider _xdsServerMetricsProvider; + private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; + + private String _clientName = "-"; + @Nullable private XdsClientImpl _xdsClient = null; @Deprecated public XdsClientJmx() { 
- this(new NoOpXdsServerMetricsProvider()); + this(new NoOpXdsServerMetricsProvider(), null); } public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider) + { + this(xdsServerMetricsProvider, null); + } + + public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) { _xdsServerMetricsProvider = xdsServerMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : xdsServerMetricsProvider; + _xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider == null ? + new NoOpXdsClientOtelMetricsProvider() : xdsClientOtelMetricsProvider; + } + + // Method to set client name (called from D2ClientJmxManager) + public void setClientName(String clientName) { + _clientName = clientName; + } + + public String getClientName() { + return _clientName; } public void setXdsClient(XdsClientImpl xdsClient) @@ -146,55 +167,66 @@ public int isDisconnected() @Override public long getActiveInitialWaitTimeMillis() { + long waitTime = -1; if (_xdsClient != null) { - return _xdsClient.getActiveInitialWaitTimeMillis(); + waitTime = _xdsClient.getActiveInitialWaitTimeMillis(); + _xdsClientOtelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime); } - return -1; + return waitTime; } public void incrementConnectionLostCount() { _connectionLostCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordConnectionLost(_clientName); } public void incrementConnectionClosedCount() { _connectionClosedCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordConnectionClosed(_clientName); } public void incrementReconnectionCount() { _reconnectionCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordReconnection(_clientName); } public void incrementRequestSentCount() { _resquestSentCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordRequestSent(_clientName); } public void addToIrvSentCount(int delta) { _irvSentCount.addAndGet(delta); + 
_xdsClientOtelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta); } public void incrementResponseReceivedCount() { _responseReceivedCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordResponseReceived(_clientName); } public void setIsConnected(boolean connected) { _isConnected.getAndSet(connected); + _xdsClientOtelMetricsProvider.updateConnectionState(_clientName, connected); } public void incrementResourceNotFoundCount() { _resourceNotFoundCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordResourceNotFound(_clientName); } public void incrementResourceInvalidCount() { _resourceInvalidCount.incrementAndGet(); + _xdsClientOtelMetricsProvider.recordResourceInvalid(_clientName); } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java new file mode 100644 index 0000000000..5e57d5f5ef --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/XdsClientOtelMetricsProvider.java @@ -0,0 +1,88 @@ +package com.linkedin.d2.jmx; + +/** + * Interface for OpenTelemetry metrics collection for XDS Client. + */ +public interface XdsClientOtelMetricsProvider { + + /** + * Records a connection lost event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordConnectionLost(String clientName); + + /** + * Records a connection closed event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordConnectionClosed(String clientName); + + /** + * Records a reconnection event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordReconnection(String clientName); + + /** + * Records a request sent event in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordRequestSent(String clientName); + + /** + * Records a response received event in the OpenTelemetry counter. 
+ * + * @param clientName the name of the XDS client + */ + void recordResponseReceived(String clientName); + + /** + * Records initial resource version sent count in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + * @param count the count to add + */ + void recordInitialResourceVersionSent(String clientName, int count); + + /** + * Records a resource not found error in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordResourceNotFound(String clientName); + + /** + * Records a resource invalid error in the OpenTelemetry counter. + * + * @param clientName the name of the XDS client + */ + void recordResourceInvalid(String clientName); + + /** + * Records server latency in the OpenTelemetry histogram. + * + * @param clientName the name of the XDS client + * @param latencyMs the latency in milliseconds + */ + void recordServerLatency(String clientName, long latencyMs); + + /** + * Updates the connection state for a client. + * + * @param clientName the name of the XDS client + * @param isConnected whether the client is connected + */ + void updateConnectionState(String clientName, boolean isConnected); + + /** + * Updates the active initial wait time for a client. 
+ * + * @param clientName the name of the XDS client + * @param waitTimeMs the wait time in milliseconds + */ + void updateActiveInitialWaitTime(String clientName, long waitTimeMs); +} diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e4faa58326..b05dee51ed 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -23,8 +23,10 @@ import com.google.common.collect.Maps; import com.google.protobuf.util.Timestamps; import com.google.rpc.Code; +import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.GlobCollectionUtils.D2UriIdentifier; import com.linkedin.util.RateLimitedLogger; @@ -82,6 +84,7 @@ public class XdsClientImpl extends XdsClient new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(1), SystemClock.instance()); public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; public static final Integer DEFAULT_MAX_RETRY_BACKOFF_SECS = 30; // default value for max retry backoff seconds + private static final String NO_VALUE = "-"; /** * The resource subscribers maps the resource type to its subscribers. 
Note that the {@link ResourceType#D2_URI} @@ -115,6 +118,7 @@ public class XdsClientImpl extends XdsClient private final XdsClientJmx _xdsClientJmx; private final XdsServerMetricsProvider _serverMetricsProvider; + private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; private final boolean _initialResourceVersionsEnabled; private final String _minimumJavaVersion; private final XdsClientValidator.ActionOnPrecheckFailure _actionOnPrecheckFailure; @@ -200,6 +204,22 @@ public XdsClientImpl(Node node, Integer maxRetryBackoffSeconds, String minimumJavaVersion, XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) + { + this(node, managedChannel, executorService, readyTimeoutMillis, subscribeToUriGlobCollection, + serverMetricsProvider, null, irvSupport, maxRetryBackoffSeconds, minimumJavaVersion, actionOnPrecheckFailure); // forward the caller's precheck settings; a null OTel provider becomes the no-op provider + } + + public XdsClientImpl(Node node, + ManagedChannel managedChannel, + ScheduledExecutorService executorService, + long readyTimeoutMillis, + boolean subscribeToUriGlobCollection, + XdsServerMetricsProvider serverMetricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, + boolean irvSupport, + Integer maxRetryBackoffSeconds, + String minimumJavaVersion, + XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure) { _readyTimeoutMillis = readyTimeoutMillis; _node = node; @@ -212,8 +232,9 @@ public XdsClientImpl(Node node, _log.info("Glob collection support enabled"); } - _xdsClientJmx = new XdsClientJmx(serverMetricsProvider); + _xdsClientJmx = new XdsClientJmx(serverMetricsProvider, xdsClientOtelMetricsProvider); _serverMetricsProvider = serverMetricsProvider == null ? new NoOpXdsServerMetricsProvider() : serverMetricsProvider; + _xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider == null ? 
new NoOpXdsClientOtelMetricsProvider() : xdsClientOtelMetricsProvider; _initialResourceVersionsEnabled = irvSupport; if (_initialResourceVersionsEnabled) { @@ -678,7 +699,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) || uriSubscriber.getData() == null // The URI was corrupted and there was no previous version of this URI ) { - uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider); + uriSubscriber.onData(new D2URIUpdate(uri), _serverMetricsProvider, _xdsClientOtelMetricsProvider, _xdsClientJmx.getClientName()); } } @@ -786,6 +807,7 @@ private void processResourceChanges(ResourceType type, Map updates, ResourceType type) { + String clientName = _xdsClientJmx.getClientName(); Map subscribers = getResourceSubscriberMap(type); WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); @@ -794,12 +816,12 @@ private void handleResourceUpdate(Map updates, ResourceSubscriber subscriber = subscribers.get(entry.getKey()); if (subscriber != null) { - subscriber.onData(entry.getValue(), _serverMetricsProvider); + subscriber.onData(entry.getValue(), _serverMetricsProvider, _xdsClientOtelMetricsProvider, clientName); } if (wildcardSubscriber != null) { - wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider); + wildcardSubscriber.onData(entry.getKey(), entry.getValue(), _serverMetricsProvider, _xdsClientOtelMetricsProvider, clientName); } } } @@ -996,6 +1018,12 @@ void addWatcher(ResourceWatcher watcher) @VisibleForTesting void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) + { + onData(data, metricsProvider, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); + } + + @VisibleForTesting + void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider, XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { SubscriberFetchState prev = _fetchState.getAndSet(FETCHED); if (!FETCHED.equals(prev)) @@ -1009,7 +1037,8 @@ void 
onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. - trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + // Data is unchanged here; latency is still reported, now to the OTel provider as well. + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, xdsClientOtelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform any update."); return; @@ -1018,7 +1047,7 @@ void onData(ResourceUpdate data, XdsServerMetricsProvider metricsProvider) // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev); + trackServerLatency(data, _data, metricsProvider, _subscribedAt.get(), _isIrvEnabled, prev, xdsClientOtelMetricsProvider, clientName); _data = data; } else @@ -1191,6 +1220,13 @@ void addWatcher(WildcardResourceWatcher watcher) @VisibleForTesting void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider) + { + onData(resourceName, data, metricsProvider, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); + } + + @VisibleForTesting + void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider metricsProvider, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { if (Objects.equals(_data.get(resourceName), data)) { @@ -1198,7 +1234,8 @@ void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider m { // Even though the data is the same, the subscriber is waiting for init data after either startup // or a reconnection, so we need to track latency. 
- trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + // we are passing OpenTelemetry provider for wildcard subscribers too + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), xdsClientOtelMetricsProvider, clientName); } _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; @@ -1206,7 +1243,7 @@ void onData(String resourceName, ResourceUpdate data, XdsServerMetricsProvider m // null value guard to avoid overwriting the property with null if (data != null && data.isValid()) { - trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get()); + trackServerLatency(data, _data.get(resourceName), metricsProvider, _subscribedAt.get(), _isIrvEnabled, _fetchState.get(), xdsClientOtelMetricsProvider, clientName); _data.put(resourceName, data); } else @@ -1345,6 +1382,13 @@ private boolean shouldSubscribeUriGlobCollection(ResourceType type) private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + trackServerLatency(resourceUpdate, currentData, metricsProvider, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); + } + + private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUpdate currentData, + XdsServerMetricsProvider metricsProvider, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { long now = SystemClock.instance().currentTimeMillis(); if (resourceUpdate instanceof NodeUpdate) @@ -1355,7 +1399,7 @@ private static void trackServerLatency(ResourceUpdate resourceUpdate, ResourceUp return; } 
trackServerLatencyHelper(metricsProvider, now, nodeData.getStat().getMtime(), subscribedAt, - isIrvEnabled, fetchState); + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); } else if (resourceUpdate instanceof D2URIMapUpdate) { @@ -1370,9 +1414,9 @@ else if (resourceUpdate instanceof D2URIMapUpdate) Map.Entry::getKey, e -> e.getValue().leftValue()) // new data of updated uris ); - trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState); + trackServerLatencyForUris(updatedUris, update, metricsProvider, now, subscribedAt, isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); trackServerLatencyForUris(rawDiff.entriesOnlyOnLeft(), update, metricsProvider, now, subscribedAt, - isIrvEnabled, fetchState); // newly added uris + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); // newly added uris } else if (resourceUpdate instanceof D2URIUpdate) { @@ -1382,7 +1426,7 @@ else if (resourceUpdate instanceof D2URIUpdate) { update.setIsStaleModifiedTime( trackServerLatencyHelper(metricsProvider, now, Timestamps.toMillis(uri.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState) + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName) ); } } @@ -1391,10 +1435,17 @@ else if (resourceUpdate instanceof D2URIUpdate) private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + trackServerLatencyForUris(uriMap, update, metricsProvider, end, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); + } + + private static void trackServerLatencyForUris(Map uriMap, D2URIMapUpdate update, + XdsServerMetricsProvider metricsProvider, long end, long subscribedAt, boolean isIrvEnabled, + SubscriberFetchState fetchState, XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String 
clientName) { uriMap.forEach((k, v) -> { boolean isStaleModifiedTime = trackServerLatencyHelper(metricsProvider, end, Timestamps.toMillis(v.getModifiedTime()), subscribedAt, - isIrvEnabled, fetchState); + isIrvEnabled, fetchState, xdsClientOtelMetricsProvider, clientName); update.setIsStaleModifiedTime(k, isStaleModifiedTime); } ); @@ -1410,6 +1461,13 @@ private static void trackServerLatencyForUris(Map uriMap, D // on the resource modified time. private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState) + { + return trackServerLatencyHelper(metricsProvider, end, modifiedAt, subscribedAt, isIrvEnabled, fetchState, new NoOpXdsClientOtelMetricsProvider(), NO_VALUE); + } + + private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metricsProvider, + long end, long modifiedAt, long subscribedAt, boolean isIrvEnabled, SubscriberFetchState fetchState, + XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider, String clientName) { long start; boolean isStaleModifiedAt; @@ -1423,7 +1481,12 @@ private static boolean trackServerLatencyHelper(XdsServerMetricsProvider metrics start = Math.max(modifiedAt, subscribedAt); isStaleModifiedAt = modifiedAt < subscribedAt; } - metricsProvider.trackLatency(end - start); + long latency = end - start; + metricsProvider.trackLatency(latency); + + // Record OpenTelemetry latency + xdsClientOtelMetricsProvider.recordServerLatency(clientName, latency); + return isStaleModifiedAt; } diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 4c77c81433..2d014fede4 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ 
-69,6 +69,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) xdsStreamReadyTimeout, config.subscribeToUriGlobCollection, config._xdsServerMetricsProvider, + config._xdsClientOtelMetricsProvider, config.xdsInitialResourceVersionsEnabled, config.xdsStreamMaxRetryBackoffSeconds, config.xdsMinimumJavaVersion, diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 57c27f34aa..56e2aab7ca 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -8,6 +8,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.util.Timestamps; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider; import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.XdsClient.D2URIMapUpdate; import com.linkedin.d2.xds.XdsClient.ResourceType; @@ -541,8 +542,8 @@ public void testHandleD2URIMapUpdateWithEmptyResponse() { fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_URI_MAP_RESPONSE); fixture.verifyAckSent(2); // onData is called only once. Empty response does not trigger onData calls. 
- verify(fixture._clusterSubscriber).onData(any(), any()); - verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any()); + verify(fixture._clusterSubscriber).onData(any(), any(), any(), any()); + verify(fixture._uriMapWildcardSubscriber).onData(any(), any(), any(), any(), any()); } @Test(dataProvider = "providerWatcherFlags") @@ -1262,6 +1263,8 @@ private static final class XdsClientImplFixture { @Mock XdsServerMetricsProvider _serverMetricsProvider; @Mock + XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider; + @Mock Clock _clock; @Captor @@ -1295,6 +1298,7 @@ private static final class XdsClientImplFixture { doNothing().when(_resourceWatcher).onChanged(any()); doNothing().when(_wildcardResourceWatcher).onChanged(any(), any()); doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); + doNothing().when(_xdsClientOtelMetricsProvider).recordServerLatency(anyString(), anyLong()); for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber, _d2UriSubscriber, _calleesSubscriber)) { @@ -1311,7 +1315,7 @@ private static final class XdsClientImplFixture { _executorService = spy(Executors.newScheduledThreadPool(1)); _xdsClientImpl = spy(new XdsClientImpl(null, mock(ManagedChannel.class), _executorService, 0, useGlobCollections, - _serverMetricsProvider, useIRV)); + _serverMetricsProvider, _xdsClientOtelMetricsProvider, useIRV, null, null, null)); _xdsClientImpl._adsStream = _adsStream; doNothing().when(_xdsClientImpl).startRpcStreamLocal();