Skip to content

Commit

Permalink
Reduce port configuration and set all data collection handler bind to…
Browse files Browse the repository at this point in the history
… server port
  • Loading branch information
mrproliu committed Oct 31, 2023
1 parent 77845e8 commit b90970d
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 485 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,4 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class OTLPTraceConfig extends ModuleConfig {
private String gRPCHost;
/**
* Only setting the real port(not 0) makes the gRPC server online.
*/
private int gRPCPort;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
private String authentication;
private boolean gRPCSslEnabled = false;
private String gRPCSslKeyPath;
private String gRPCSslCertChainPath;
private String gRPCSslTrustedCAsPath;

public String getGRPCHost() {
return gRPCHost;
}

public void setGRPCHost(String gRPCHost) {
this.gRPCHost = gRPCHost;
}

public int getGRPCPort() {
return gRPCPort;
}

public void setGRPCPort(int gRPCPort) {
this.gRPCPort = gRPCPort;
}

public int getMaxConcurrentCallsPerConnection() {
return maxConcurrentCallsPerConnection;
}

public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
}

public int getMaxMessageSize() {
return maxMessageSize;
}

public void setMaxMessageSize(int maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}

public int getGRPCThreadPoolSize() {
return gRPCThreadPoolSize;
}

public void setGRPCThreadPoolSize(int gRPCThreadPoolSize) {
this.gRPCThreadPoolSize = gRPCThreadPoolSize;
}

public int getGRPCThreadPoolQueueSize() {
return gRPCThreadPoolQueueSize;
}

public void setGRPCThreadPoolQueueSize(int gRPCThreadPoolQueueSize) {
this.gRPCThreadPoolQueueSize = gRPCThreadPoolQueueSize;
}

public String getAuthentication() {
return authentication;
}

public void setAuthentication(String authentication) {
this.authentication = authentication;
}

public boolean getGRPCSslEnabled() {
return gRPCSslEnabled;
}

public void setGRPCSslEnabled(boolean gRPCSslEnabled) {
this.gRPCSslEnabled = gRPCSslEnabled;
}

public String getGRPCSslKeyPath() {
return gRPCSslKeyPath;
}

public void setGRPCSslKeyPath(String gRPCSslKeyPath) {
this.gRPCSslKeyPath = gRPCSslKeyPath;
}

public String getGRPCSslCertChainPath() {
return gRPCSslCertChainPath;
}

public void setGRPCSslCertChainPath(String gRPCSslCertChainPath) {
this.gRPCSslCertChainPath = gRPCSslCertChainPath;
}

public String getGRPCSslTrustedCAsPath() {
return gRPCSslTrustedCAsPath;
}

public void setGRPCSslTrustedCAsPath(String gRPCSslTrustedCAsPath) {
this.gRPCSslTrustedCAsPath = gRPCSslTrustedCAsPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,13 @@

package zipkin.server.receiver.otlp;

import org.apache.logging.log4j.util.Strings;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverModule;
import org.apache.skywalking.oap.server.receiver.otel.otlp.OpenTelemetryMetricRequestProcessor;
Expand All @@ -34,7 +30,6 @@
public class OTLPTraceProvider extends ModuleProvider {
private OTLPTraceConfig moduleConfig;
private OpenTelemetryTraceHandler traceHandler;
private GRPCServer grpcServer;

@Override
public String name() {
Expand Down Expand Up @@ -70,53 +65,13 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
GRPCHandlerRegister handlerRegister;
if (moduleConfig.getGRPCPort() > 0) {
if (moduleConfig.getGRPCSslEnabled()) {
grpcServer = new GRPCServer(
Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
moduleConfig.getGRPCPort(),
moduleConfig.getGRPCSslCertChainPath(),
moduleConfig.getGRPCSslKeyPath(),
moduleConfig.getGRPCSslTrustedCAsPath()
);
} else {
grpcServer = new GRPCServer(
Strings.isBlank(moduleConfig.getGRPCHost()) ? "0.0.0.0" : moduleConfig.getGRPCHost(),
moduleConfig.getGRPCPort()
);
}
if (moduleConfig.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
}
if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();

handlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
} else {
handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
}
GRPCHandlerRegister handlerRegister = getManager().find(CoreModule.NAME).provider().getService(GRPCHandlerRegister.class);
traceHandler = new OTLPTraceHandler(handlerRegister, getManager());
traceHandler.active();
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
if (grpcServer != null) {
try {
grpcServer.start();
} catch (ServerException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,4 @@
import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class ZipkinHTTPReceiverConfig extends ModuleConfig {
private String restHost;
private int restPort;
private String restContextPath;
private int restMaxThreads = 200;
private long restIdleTimeOut = 30000;
private int restAcceptQueueSize = 0;
/**
* The maximum size in bytes allowed for request headers.
* Use -1 to disable it.
*/
private int restMaxRequestHeaderSize = 8192;

public String getRestHost() {
return restHost;
}

public void setRestHost(String restHost) {
this.restHost = restHost;
}

public int getRestPort() {
return restPort;
}

public void setRestPort(int restPort) {
this.restPort = restPort;
}

public String getRestContextPath() {
return restContextPath;
}

public void setRestContextPath(String restContextPath) {
this.restContextPath = restContextPath;
}

public int getRestMaxThreads() {
return restMaxThreads;
}

public void setRestMaxThreads(int restMaxThreads) {
this.restMaxThreads = restMaxThreads;
}

public long getRestIdleTimeOut() {
return restIdleTimeOut;
}

public void setRestIdleTimeOut(long restIdleTimeOut) {
this.restIdleTimeOut = restIdleTimeOut;
}

public int getRestAcceptQueueSize() {
return restAcceptQueueSize;
}

public void setRestAcceptQueueSize(int restAcceptQueueSize) {
this.restAcceptQueueSize = restAcceptQueueSize;
}

public int getRestMaxRequestHeaderSize() {
return restMaxRequestHeaderSize;
}

public void setRestMaxRequestHeaderSize(int restMaxRequestHeaderSize) {
this.restMaxRequestHeaderSize = restMaxRequestHeaderSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.http.HTTPServer;
import org.apache.skywalking.oap.server.library.server.http.HTTPServerConfig;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.ZipkinSpanHTTPHandler;
import zipkin.server.core.services.HTTPConfigurableServer;

import java.util.Arrays;

public class ZipkinHTTPReceiverProvider extends ModuleProvider {
private ZipkinHTTPReceiverConfig moduleConfig;
private ZipkinSpanHTTPHandler httpHandler;
private HTTPServer httpServer;

@Override
public String name() {
Expand Down Expand Up @@ -63,39 +59,19 @@ public void onInitialized(ZipkinHTTPReceiverConfig initialized) {

@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
if (moduleConfig.getRestPort() > 0) {
HTTPServerConfig httpServerConfig = HTTPServerConfig.builder()
.host(moduleConfig.getRestHost())
.port(moduleConfig.getRestPort())
.contextPath(moduleConfig.getRestContextPath())
.idleTimeOut(moduleConfig.getRestIdleTimeOut())
.maxThreads(moduleConfig.getRestMaxThreads())
.acceptQueueSize(moduleConfig.getRestAcceptQueueSize())
.maxRequestHeaderSize(moduleConfig.getRestMaxRequestHeaderSize())
.build();
httpServer = new HTTPConfigurableServer(httpServerConfig);
httpServer.initialize();
}
}

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
httpHandler = new ZipkinSpanHTTPHandler(spanForward, getManager());

if (httpServer != null) {
httpServer.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
} else {
final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
}
final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
httpRegister.addHandler(httpHandler, Arrays.asList(HttpMethod.POST, HttpMethod.GET));
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
if (httpServer != null) {
httpServer.start();
}
}

@Override
Expand All @@ -104,8 +80,4 @@ public String[] requiredModules() {
CoreModule.NAME,
};
}

public ZipkinSpanHTTPHandler getHttpHandler() {
return httpHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void setup() throws ModuleStartException {
final ZipkinHTTPReceiverProvider provider = new ZipkinHTTPReceiverProvider();
provider.setManager(moduleManager);
final ZipkinHTTPReceiverConfig config = new ZipkinHTTPReceiverConfig();
config.setRestPort(-1);
Whitebox.setInternalState(provider, ZipkinHTTPReceiverConfig.class, config);
provider.prepare();
provider.start();
Expand Down
5 changes: 5 additions & 0 deletions zipkin-server/server-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<artifactId>classgraph</artifactId>
<version>4.8.162</version>
</dependency>
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-grpc</artifactId>
<version>${armeria.version}</version>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit b90970d

Please sign in to comment.