Skip to content

Commit

Permalink
Adding gRPC server for cluster mode (#3572)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Oct 8, 2023
1 parent 8e02e6e commit 3fc0a7e
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class CoreModuleConfig extends ModuleConfig {
private String gRPCHost;
private int gRPCPort;
private boolean gRPCSslEnabled = false;
private String gRPCSslKeyPath;
private String gRPCSslCertChainPath;
private String gRPCSslTrustedCAPath;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
private int gRPCMaxConcurrentCallsPerConnection;
private int gRPCMaxMessageSize;

/**
* The max length of the service name.
*/
Expand Down Expand Up @@ -195,4 +206,84 @@ public int getRemoteTimeout() {
public void setRemoteTimeout(int remoteTimeout) {
this.remoteTimeout = remoteTimeout;
}

public String getGRPCHost() {
return gRPCHost;
}

public void setGRPCHost(String gRPCHost) {
this.gRPCHost = gRPCHost;
}

public int getGRPCPort() {
return gRPCPort;
}

public void setGRPCPort(int gRPCPort) {
this.gRPCPort = gRPCPort;
}

public boolean getGRPCSslEnabled() {
return gRPCSslEnabled;
}

public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
this.gRPCSslEnabled = gRPCSslEnabled;
}

public String getGRPCSslKeyPath() {
return gRPCSslKeyPath;
}

public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
this.gRPCSslKeyPath = gRPCSslKeyPath;
}

public String getGRPCSslCertChainPath() {
return gRPCSslCertChainPath;
}

public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
this.gRPCSslCertChainPath = gRPCSslCertChainPath;
}

public String getGRPCSslTrustedCAPath() {
return gRPCSslTrustedCAPath;
}

public void setGRPCSslTrustedCAPath(String gRPCSslTrustedCAPath) {
this.gRPCSslTrustedCAPath = gRPCSslTrustedCAPath;
}

public int getGRPCThreadPoolSize() {
return gRPCThreadPoolSize;
}

public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
this.gRPCThreadPoolSize = gRPCThreadPoolSize;
}

public int getGRPCThreadPoolQueueSize() {
return gRPCThreadPoolQueueSize;
}

public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
}

public int getGRPCMaxConcurrentCallsPerConnection() {
return gRPCMaxConcurrentCallsPerConnection;
}

public void setGRPCMaxConcurrentCallsPerConnection(int gRPCMaxConcurrentCallsPerConnection) {
this.gRPCMaxConcurrentCallsPerConnection = gRPCMaxConcurrentCallsPerConnection;
}

public int getGRPCMaxMessageSize() {
return gRPCMaxMessageSize;
}

public void setGRPCMaxMessageSize(int gRPCMaxMessageSize) {
this.gRPCMaxMessageSize = gRPCMaxMessageSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
package zipkin.server.core;

import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressAliasCache;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.config.ConfigService;
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
Expand Down Expand Up @@ -48,8 +52,12 @@
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.remote.health.HealthCheckServiceHandler;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
Expand All @@ -69,8 +77,10 @@
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
import zipkin.server.core.services.EmptyComponentLibraryCatalogService;
import zipkin.server.core.services.EmptyGRPCHandlerRegister;
import zipkin.server.core.services.EmptyHTTPHandlerRegister;
import zipkin.server.core.services.EmptyNetworkAddressAliasCache;
import zipkin.server.core.services.ZipkinConfigService;
Expand All @@ -85,6 +95,8 @@ public class CoreModuleProvider extends ModuleProvider {
private final ZipkinSourceReceiverImpl receiver;
private final AnnotationScan annotationScan;
private final StorageModels storageModels;
private RemoteClientManager remoteClientManager;
private GRPCServer grpcServer;

public CoreModuleProvider() {
this.annotationScan = new AnnotationScan();
Expand Down Expand Up @@ -138,12 +150,43 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
throw new ModuleStartException(e.getMessage(), e);
}

if (moduleConfig.getGRPCSslEnabled()) {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
null
);
} else {
grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
}
if (moduleConfig.getGRPCMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getGRPCMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getGRPCMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getGRPCMaxMessageSize());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();

if (moduleConfig.getGRPCSslEnabled()) {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout(),
moduleConfig.getGRPCSslTrustedCAPath()
);
} else {
this.remoteClientManager = new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout());
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
this.registerServiceImplementation(MeterSystem.class, new MeterSystem(getManager()));
this.registerServiceImplementation(ConfigService.class, new ZipkinConfigService(moduleConfig, this));
this.registerServiceImplementation(ServerStatusService.class, new ServerStatusService(getManager()));
this.registerServiceImplementation(DownSamplingConfigService.class, new DownSamplingConfigService(Collections.emptyList()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new EmptyGRPCHandlerRegister());
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(HTTPHandlerRegister.class, new EmptyHTTPHandlerRegister());
this.registerServiceImplementation(IComponentLibraryCatalogService.class, new EmptyComponentLibraryCatalogService());
this.registerServiceImplementation(SourceReceiver.class, receiver);
Expand Down Expand Up @@ -177,7 +220,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
this.registerServiceImplementation(ContinuousProfilingQueryService.class, new ContinuousProfilingQueryService(getManager()));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
this.registerServiceImplementation(OALEngineLoaderService.class, new OALEngineLoaderService(getManager()));
this.registerServiceImplementation(RemoteClientManager.class, new RemoteClientManager(getManager(), moduleConfig.getRemoteTimeout()));
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
this.registerServiceImplementation(UITemplateManagementService.class, new UITemplateManagementService(getManager()));
this.registerServiceImplementation(UIMenuManagementService.class, new UIMenuManagementService(getManager(), swConfig));

Expand All @@ -198,16 +241,39 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
grpcServer.addHandler(new RemoteServiceHandler(getManager()));
grpcServer.addHandler(new HealthCheckServiceHandler());

try {
receiver.scan();
annotationScan.scan();
} catch (IOException | IllegalAccessException | InstantiationException | StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}

Address gRPCServerInstanceAddress = new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true);
TelemetryRelatedContext.INSTANCE.setId(gRPCServerInstanceAddress.toString());
ClusterCoordinator coordinator = this.getManager()
.find(ClusterModule.NAME)
.provider()
.getService(ClusterCoordinator.class);
coordinator.registerWatcher(remoteClientManager);
coordinator.start();
RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
coordinator.registerRemote(gRPCServerInstance);
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
try {
if (!RunningMode.isInitMode()) {
grpcServer.start();
remoteClientManager.start();
}
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}

final org.apache.skywalking.oap.server.core.CoreModuleConfig swConfig = this.moduleConfig.toSkyWalkingConfig();
PersistenceTimer.INSTANCE.start(getManager(), swConfig);
DataTTLKeeperTimer.INSTANCE.start(getManager(), swConfig);
Expand Down
10 changes: 10 additions & 0 deletions zipkin-server/server-starter/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ core:
prepareThreads: ${ZIPKIN_PREPARE_THREADS:2}
# The period of doing data persistence. Unit is second.Default value is 25s
persistentPeriod: ${ZIPKIN_PERSISTENT_PERIOD:25}
gRPCHost: ${ZIPKIN_GRPC_HOST:0.0.0.0}
gRPCPort: ${ZIPKIN_GRPC_PORT:11800}
gRPCThreadPoolQueueSize: ${ZIPKIN_GRPC_POOL_QUEUE_SIZE:-1}
gRPCThreadPoolSize: ${ZIPKIN_GRPC_THREAD_POOL_SIZE:-1}
gRPCSslEnabled: ${ZIPKIN_GRPC_SSL_ENABLED:false}
gRPCSslKeyPath: ${ZIPKIN_GRPC_SSL_KEY_PATH:""}
gRPCSslCertChainPath: ${ZIPKIN_GRPC_SSL_CERT_CHAIN_PATH:""}
gRPCSslTrustedCAPath: ${ZIPKIN_GRPC_SSL_TRUSTED_CA_PATH:""}
gRPCMaxConcurrentCallsPerConnection: ${ZIPKIN_GRPC_MAX_CONCURRENT_CALL:0}
gRPCMaxMessageSize: ${ZIPKIN_GRPC_MAX_MESSAGE_SIZE:0}

storage:
selector: ${ZIPKIN_STORAGE:h2}
Expand Down

0 comments on commit 3fc0a7e

Please sign in to comment.