Skip to content

Commit

Permalink
refactor: otlp use its own thread pool (#831)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Apr 8, 2024
1 parent 125304c commit 5a6786b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private static void shutdown(ExecutorService es) {
}
}

private static int cpu() {
public static int cpu() {
// You will get wrong cpu count when running in virtualization environment, such as 'docker'.
// You should pass an env 'CPU' to use as available cpu count.
// Otherwise, you will get many threads in pool.
Expand All @@ -156,7 +156,7 @@ private static int cpu() {
return Runtime.getRuntime().availableProcessors();
}

private static ExecutorService executor(int min, int max, int keepalive, int queue,
public static ExecutorService executor(int min, int max, int keepalive, int queue,
String nameFormat) {
ThreadFactory tf = new ThreadFactoryBuilder() //
.setNameFormat(nameFormat) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public OTLPGrpcServer otlpGrpcServer() {
public OTLPMetricsHandler otlpMetricsHandler() {
return new ConsoleOTLPMetricsHandler();
}

@Bean
public OtlpConfig otlpConfig() {
return new OtlpConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -44,9 +45,11 @@ public class OTLPGrpcServer {
private OTLPTraceHandler otlpTraceHandler;
@Autowired(required = false)
private OTLPLogsHandler otlpLogsHandler;
private List<Server> servers = new ArrayList<>();
@Autowired
private CommonThreadPools commonThreadPools;
private OtlpConfig otlpConfig;

private List<Server> servers = new ArrayList<>();
private ExecutorService es;

@PostConstruct
public void start() throws IOException {
Expand All @@ -56,8 +59,12 @@ public void start() throws IOException {
}

private Server start(int port) throws IOException {
// otlp uses its own thread pool
int cpu = CommonThreadPools.cpu();
es = CommonThreadPools.executor(cpu * 2, cpu * 2, 0, 4096, "otlp-server-%d");

ServerBuilder<?> sb = ServerBuilder.forPort(port) //
.executor(commonThreadPools.getRpcServer()) //
.executor(es) //
.maxInboundMessageSize(100 * 1024 * 1024); //

if (otlpMetricsHandler != null) {
Expand All @@ -75,6 +82,11 @@ public void export(ExportMetricsServiceRequest r,
@Override
public void export(ExportTraceServiceRequest request,
StreamObserver<ExportTraceServiceResponse> o) {
if (!otlpConfig.getTrace().isEnabled()) {
log.warn("[otlp] trace is disabled, discard request");
o.onNext(ExportTraceServiceResponse.getDefaultInstance());
return;
}
otlpTraceHandler.export(request, o);
}
});
Expand Down Expand Up @@ -105,5 +117,8 @@ public void stop() {
for (Server server : servers) {
server.shutdown();
}
if (es != null) {
es.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -53,6 +54,8 @@ public class OTLPWebController {
private OTLPTraceHandler otlpTraceHandler;
@Autowired(required = false)
private OTLPLogsHandler otlpLogsHandler;
@Autowired
private OtlpConfig otlpConfig;

@PostMapping(value = "/metrics", consumes = {JSON, PROTOBUF})
public ResponseEntity<?> metrics(HttpServletRequest httpRequest)
Expand All @@ -75,6 +78,13 @@ public ResponseEntity<?> traces(HttpServletRequest httpRequest)
return ResponseEntity.internalServerError().body("traces unsupported");
}

if (!otlpConfig.getTrace().isEnabled()) {
String str = JsonFormat.printer().print(ExportTraceServiceResponse.getDefaultInstance());
return ResponseEntity.status(HttpStatus.OK) //
.contentType(MediaType.APPLICATION_JSON) //
.body(str); //
}

return handle("traces", //
httpRequest, //
ExportTraceServiceRequest.parser(), //
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.otlp.server;

import org.springframework.boot.context.properties.bind.Binder;

import com.xzchaoo.commons.basic.config.spring.AbstractConfig;

import lombok.Data;
import lombok.Getter;

/**
* <p>
* created at 2024/4/8
*
* @author xzchaoo
*/
@Getter
public class OtlpConfig extends AbstractConfig {
private volatile Trace trace = new Trace();

@Override
protected void refresh(Binder binder) {
binder.bind("otlp.trace", Trace.class).ifBound(x -> trace = x);
}

@Data
public static class Trace {
/**
* If false, the processing of trace data will be skipped.
*/
private boolean enabled = true;
}

}

0 comments on commit 5a6786b

Please sign in to comment.