Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: otlp use its own thread pool #831

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private static void shutdown(ExecutorService es) {
}
}

private static int cpu() {
public static int cpu() {
// You will get wrong cpu count when running in virtualization environment, such as 'docker'.
// You should pass an env 'CPU' to use as available cpu count.
// Otherwise, you will get many threads in pool.
Expand All @@ -156,7 +156,7 @@ private static int cpu() {
return Runtime.getRuntime().availableProcessors();
}

private static ExecutorService executor(int min, int max, int keepalive, int queue,
public static ExecutorService executor(int min, int max, int keepalive, int queue,
String nameFormat) {
ThreadFactory tf = new ThreadFactoryBuilder() //
.setNameFormat(nameFormat) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public OTLPGrpcServer otlpGrpcServer() {
public OTLPMetricsHandler otlpMetricsHandler() {
return new ConsoleOTLPMetricsHandler();
}

@Bean
public OtlpConfig otlpConfig() {
return new OtlpConfig();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -44,9 +45,11 @@ public class OTLPGrpcServer {
private OTLPTraceHandler otlpTraceHandler;
@Autowired(required = false)
private OTLPLogsHandler otlpLogsHandler;
private List<Server> servers = new ArrayList<>();
@Autowired
private CommonThreadPools commonThreadPools;
private OtlpConfig otlpConfig;

private List<Server> servers = new ArrayList<>();
private ExecutorService es;

@PostConstruct
public void start() throws IOException {
Expand All @@ -56,8 +59,12 @@ public void start() throws IOException {
}

private Server start(int port) throws IOException {
// otlp uses its own thread pool
int cpu = CommonThreadPools.cpu();
es = CommonThreadPools.executor(cpu * 2, cpu * 2, 0, 4096, "otlp-server-%d");

ServerBuilder<?> sb = ServerBuilder.forPort(port) //
.executor(commonThreadPools.getRpcServer()) //
.executor(es) //
.maxInboundMessageSize(100 * 1024 * 1024); //

if (otlpMetricsHandler != null) {
Expand All @@ -75,6 +82,11 @@ public void export(ExportMetricsServiceRequest r,
@Override
public void export(ExportTraceServiceRequest request,
StreamObserver<ExportTraceServiceResponse> o) {
if (!otlpConfig.getTrace().isEnabled()) {
log.warn("[otlp] trace is disabled, discard request");
o.onNext(ExportTraceServiceResponse.getDefaultInstance());
return;
}
otlpTraceHandler.export(request, o);
}
});
Expand Down Expand Up @@ -105,5 +117,8 @@ public void stop() {
for (Server server : servers) {
server.shutdown();
}
if (es != null) {
es.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -53,6 +54,8 @@ public class OTLPWebController {
private OTLPTraceHandler otlpTraceHandler;
@Autowired(required = false)
private OTLPLogsHandler otlpLogsHandler;
@Autowired
private OtlpConfig otlpConfig;

@PostMapping(value = "/metrics", consumes = {JSON, PROTOBUF})
public ResponseEntity<?> metrics(HttpServletRequest httpRequest)
Expand All @@ -75,6 +78,13 @@ public ResponseEntity<?> traces(HttpServletRequest httpRequest)
return ResponseEntity.internalServerError().body("traces unsupported");
}

if (!otlpConfig.getTrace().isEnabled()) {
String str = JsonFormat.printer().print(ExportTraceServiceResponse.getDefaultInstance());
return ResponseEntity.status(HttpStatus.OK) //
.contentType(MediaType.APPLICATION_JSON) //
.body(str); //
}

return handle("traces", //
httpRequest, //
ExportTraceServiceRequest.parser(), //
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.otlp.server;

import org.springframework.boot.context.properties.bind.Binder;

import com.xzchaoo.commons.basic.config.spring.AbstractConfig;

import lombok.Data;
import lombok.Getter;

/**
* <p>
* created at 2024/4/8
*
* @author xzchaoo
*/
@Getter
public class OtlpConfig extends AbstractConfig {
private volatile Trace trace = new Trace();

@Override
protected void refresh(Binder binder) {
binder.bind("otlp.trace", Trace.class).ifBound(x -> trace = x);
}

@Data
public static class Trace {
/**
* If false, the processing of trace data will be skipped.
*/
private boolean enabled = true;
}

}
Loading