Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.d2.jmx.XdsServerMetricsProvider;
import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider;
import com.linkedin.d2.jmx.JmxManager;
import com.linkedin.d2.xds.XdsClientValidator;
import com.linkedin.d2.jmx.NoOpJmxManager;
Expand Down Expand Up @@ -237,6 +238,7 @@ public D2Client build()
_config.xdsChannelLoadBalancingPolicyConfig,
_config.subscribeToUriGlobCollection,
_config._xdsServerMetricsProvider,
_config._xdsClientOtelMetricsProvider,
_config.loadBalanceStreamException,
_config.xdsInitialResourceVersionsEnabled,
_config.disableDetectLiRawD2Client,
Expand Down Expand Up @@ -856,6 +858,11 @@ public D2ClientBuilder setXdsServerMetricsProvider(XdsServerMetricsProvider xdsS
return this;
}

public D2ClientBuilder setXdsClientOtelMetricsProvider(XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider) {
_config._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider;
return this;
}

public D2ClientBuilder setLoadBalanceStreamException(boolean loadBalanceStreamException) {
_config.loadBalanceStreamException = loadBalanceStreamException;
return this;
Expand Down
143 changes: 143 additions & 0 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.linkedin.d2.jmx.JmxManager;
import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider;
import com.linkedin.d2.jmx.NoOpJmxManager;
import com.linkedin.d2.jmx.XdsClientOtelMetricsProvider;
import com.linkedin.d2.jmx.NoOpXdsClientOtelMetricsProvider;
import com.linkedin.r2.transport.common.TransportClientFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.time.Duration;
Expand Down Expand Up @@ -181,6 +183,7 @@ public class D2ClientConfig

public boolean subscribeToUriGlobCollection = false;
public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider();
public XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider = new NoOpXdsClientOtelMetricsProvider();
public boolean loadBalanceStreamException = false;
public boolean xdsInitialResourceVersionsEnabled = false;
public Integer xdsStreamMaxRetryBackoffSeconds = null;
Expand Down Expand Up @@ -293,6 +296,145 @@ public D2ClientConfig()
D2CalleeInfoRecorder d2CalleeInfoRecorder,
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this(zkHosts, xdsServer, hostName, zkSessionTimeoutInMs, zkStartupTimeoutInMs, lbWaitTimeout, lbWaitUnit,
flagFile, basePath, fsBasePath, indisFsBasePath, componentFactory, clientFactories, lbWithFacilitiesFactory,
sslContext, grpcSslContext, sslParameters, isSSLEnabled, shutdownAsynchronously, isSymlinkAware,
clientServicesConfig, d2ServicePath, useNewEphemeralStoreWatcher, healthCheckOperations, executorService,
retry, restRetryEnabled, streamRetryEnabled, retryLimit, retryUpdateIntervalMs, retryAggregatedIntervalNum,
warmUp, warmUpTimeoutSeconds, indisWarmUpTimeoutSeconds, warmUpConcurrentRequests,
indisWarmUpConcurrentRequests, downstreamServicesFetcher, indisDownstreamServicesFetcher,
backupRequestsEnabled, backupRequestsStrategyStatsConsumer,
backupRequestsLatencyNotificationInterval,
backupRequestsLatencyNotificationIntervalUnit,
enableBackupRequestsClientAsync,
backupRequestsExecutorService,
emitter,
partitionAccessorRegistry,
zooKeeperDecorator,
enableSaveUriDataOnDisk,
loadBalancerStrategyFactories,
requestTimeoutHandlerEnabled,
sslSessionValidatorFactory,
zkConnection,
startUpExecutorService,
indisStartUpExecutorService,
jmxManager,
d2JmxManagerPrefix,
zookeeperReadWindowMs,
enableRelativeLoadBalancer,
deterministicSubsettingMetadataProvider,
canaryDistributionProvider,
enableClusterFailout,
failoutConfigProviderFactory,
failoutRedirectStrategy,
serviceDiscoveryEventEmitter,
dualReadStateManager,
xdsExecutorService,
xdsStreamReadyTimeout,
dualReadNewLbExecutor,
xdsChannelLoadBalancingPolicy,
xdsChannelLoadBalancingPolicyConfig,
subscribeToUriGlobCollection,
xdsServerMetricsProvider,
new NoOpXdsClientOtelMetricsProvider(),
loadBalanceStreamException,
xdsInitialResourceVersionsEnabled,
disableDetectLiRawD2Client,
isLiRawD2Client,
xdsStreamMaxRetryBackoffSeconds,
xdsChannelKeepAliveTimeMins,
xdsMinimumJavaVersion,
actionOnPrecheckFailure,
d2CalleeInfoRecorder,
enableIndisDownstreamServicesFetcher,
indisDownstreamServicesFetchTimeout);
}

D2ClientConfig(String zkHosts,
String xdsServer,
String hostName,
long zkSessionTimeoutInMs,
long zkStartupTimeoutInMs,
long lbWaitTimeout,
TimeUnit lbWaitUnit,
String flagFile,
String basePath,
String fsBasePath,
String indisFsBasePath,
ComponentFactory componentFactory,
Map<String, TransportClientFactory> clientFactories,
LoadBalancerWithFacilitiesFactory lbWithFacilitiesFactory,
SSLContext sslContext,
SslContext grpcSslContext,
SSLParameters sslParameters,
boolean isSSLEnabled,
boolean shutdownAsynchronously,
boolean isSymlinkAware,
Map<String, Map<String, Object>> clientServicesConfig,
String d2ServicePath,
boolean useNewEphemeralStoreWatcher,
HealthCheckOperations healthCheckOperations,
ScheduledExecutorService executorService,
boolean retry,
boolean restRetryEnabled,
boolean streamRetryEnabled,
int retryLimit,
long retryUpdateIntervalMs,
int retryAggregatedIntervalNum,
boolean warmUp,
int warmUpTimeoutSeconds,
int indisWarmUpTimeoutSeconds,
int warmUpConcurrentRequests,
int indisWarmUpConcurrentRequests,
DownstreamServicesFetcher downstreamServicesFetcher,
DownstreamServicesFetcher indisDownstreamServicesFetcher,
boolean backupRequestsEnabled,
BackupRequestsStrategyStatsConsumer backupRequestsStrategyStatsConsumer,
long backupRequestsLatencyNotificationInterval,
TimeUnit backupRequestsLatencyNotificationIntervalUnit,
boolean enableBackupRequestsClientAsync,
ScheduledExecutorService backupRequestsExecutorService,
EventEmitter emitter,
PartitionAccessorRegistry partitionAccessorRegistry,
Function<ZooKeeper, ZooKeeper> zooKeeperDecorator,
boolean enableSaveUriDataOnDisk,
Map<String, LoadBalancerStrategyFactory<? extends LoadBalancerStrategy>> loadBalancerStrategyFactories,
boolean requestTimeoutHandlerEnabled,
SslSessionValidatorFactory sslSessionValidatorFactory,
ZKPersistentConnection zkConnection,
ScheduledExecutorService startUpExecutorService,
ScheduledExecutorService indisStartUpExecutorService,
JmxManager jmxManager,
String d2JmxManagerPrefix,
int zookeeperReadWindowMs,
boolean enableRelativeLoadBalancer,
DeterministicSubsettingMetadataProvider deterministicSubsettingMetadataProvider,
CanaryDistributionProvider canaryDistributionProvider,
boolean enableClusterFailout,
FailoutConfigProviderFactory failoutConfigProviderFactory,
FailoutRedirectStrategy failoutRedirectStrategy,
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout,
ExecutorService dualReadNewLbExecutor,
String xdsChannelLoadBalancingPolicy,
Map<String, ?> xdsChannelLoadBalancingPolicyConfig,
boolean subscribeToUriGlobCollection,
XdsServerMetricsProvider xdsServerMetricsProvider,
XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider,
boolean loadBalanceStreamException,
boolean xdsInitialResourceVersionsEnabled,
boolean disableDetectLiRawD2Client,
boolean isLiRawD2Client,
Integer xdsStreamMaxRetryBackoffSeconds,
Long xdsChannelKeepAliveTimeMins,
String xdsMinimumJavaVersion,
XdsClientValidator.ActionOnPrecheckFailure actionOnPrecheckFailure,
D2CalleeInfoRecorder d2CalleeInfoRecorder,
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -367,6 +509,7 @@ public D2ClientConfig()
this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins;
this.subscribeToUriGlobCollection = subscribeToUriGlobCollection;
this._xdsServerMetricsProvider = xdsServerMetricsProvider;
this._xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider;
this.loadBalanceStreamException = loadBalanceStreamException;
this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled;
this.disableDetectLiRawD2Client = disableDetectLiRawD2Client;
Expand Down
5 changes: 5 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ public void registerXdsClientJmx(XdsClientJmx xdsClientJmx)
_log.warn("Setting XdsClientJmx for Non-XDS source type: {}", _discoverySourceType);
}
final String jmxName = String.format("%s-XdsClientJmx", getGlobalPrefix(null));

// Get the client name from global prefix
String clientName = getGlobalPrefix(null);
xdsClientJmx.setClientName(clientName);

_jmxManager.registerXdsClientJmxBean(jmxName, xdsClientJmx);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.d2.jmx;

/**
* No-Op implementation of XdsClientOtelMetricsProvider.
* Used when OpenTelemetry metrics are disabled.
*/
public class NoOpXdsClientOtelMetricsProvider implements XdsClientOtelMetricsProvider {

@Override
public void recordConnectionLost(String clientName) {
// No-op
}

@Override
public void recordConnectionClosed(String clientName) {
// No-op
}

@Override
public void recordReconnection(String clientName) {
// No-op
}

@Override
public void recordRequestSent(String clientName) {
// No-op
}

@Override
public void recordResponseReceived(String clientName) {
// No-op
}

@Override
public void recordInitialResourceVersionSent(String clientName, int count) {
// No-op
}

@Override
public void recordResourceNotFound(String clientName) {
// No-op
}

@Override
public void recordResourceInvalid(String clientName) {
// No-op
}

@Override
public void recordServerLatency(String clientName, long latencyMs) {
// No-op
}

@Override
public void updateConnectionState(String clientName, boolean isConnected) {
// No-op
}

@Override
public void updateActiveInitialWaitTime(String clientName, long waitTimeMs) {
// No-op
}
}
38 changes: 35 additions & 3 deletions d2/src/main/java/com/linkedin/d2/jmx/XdsClientJmx.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,39 @@ public class XdsClientJmx implements XdsClientJmxMBean
private final AtomicInteger _resourceNotFoundCount = new AtomicInteger();
private final AtomicInteger _resourceInvalidCount = new AtomicInteger();
private final XdsServerMetricsProvider _xdsServerMetricsProvider;
private final XdsClientOtelMetricsProvider _xdsClientOtelMetricsProvider;

private String _clientName = "-";

@Nullable private XdsClientImpl _xdsClient = null;

@Deprecated
public XdsClientJmx()
{
this(new NoOpXdsServerMetricsProvider());
this(new NoOpXdsServerMetricsProvider(), null);
}

public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider)
{
this(xdsServerMetricsProvider, null);
}

public XdsClientJmx(XdsServerMetricsProvider xdsServerMetricsProvider,
XdsClientOtelMetricsProvider xdsClientOtelMetricsProvider)
{
_xdsServerMetricsProvider = xdsServerMetricsProvider == null ?
new NoOpXdsServerMetricsProvider() : xdsServerMetricsProvider;
_xdsClientOtelMetricsProvider = xdsClientOtelMetricsProvider == null ?
new NoOpXdsClientOtelMetricsProvider() : xdsClientOtelMetricsProvider;
}

// Method to set client name (called from D2ClientJmxManager)
public void setClientName(String clientName) {
_clientName = clientName;
}

public String getClientName() {
return _clientName;
}

public void setXdsClient(XdsClientImpl xdsClient)
Expand Down Expand Up @@ -146,55 +167,66 @@ public int isDisconnected()
@Override
public long getActiveInitialWaitTimeMillis()
{
long waitTime = -1;
if (_xdsClient != null)
{
return _xdsClient.getActiveInitialWaitTimeMillis();
waitTime = _xdsClient.getActiveInitialWaitTimeMillis();
_xdsClientOtelMetricsProvider.updateActiveInitialWaitTime(_clientName, waitTime);
}
return -1;
return waitTime;
}

public void incrementConnectionLostCount()
{
_connectionLostCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordConnectionLost(_clientName);
}

public void incrementConnectionClosedCount()
{
_connectionClosedCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordConnectionClosed(_clientName);
}

public void incrementReconnectionCount()
{
_reconnectionCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordReconnection(_clientName);
}

public void incrementRequestSentCount()
{
_resquestSentCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordRequestSent(_clientName);
}

public void addToIrvSentCount(int delta)
{
_irvSentCount.addAndGet(delta);
_xdsClientOtelMetricsProvider.recordInitialResourceVersionSent(_clientName, delta);
}

public void incrementResponseReceivedCount()
{
_responseReceivedCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResponseReceived(_clientName);
}

public void setIsConnected(boolean connected)
{
_isConnected.getAndSet(connected);
_xdsClientOtelMetricsProvider.updateConnectionState(_clientName, connected);
}

public void incrementResourceNotFoundCount()
{
_resourceNotFoundCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResourceNotFound(_clientName);
}

public void incrementResourceInvalidCount()
{
_resourceInvalidCount.incrementAndGet();
_xdsClientOtelMetricsProvider.recordResourceInvalid(_clientName);
}
}
Loading