Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(apm): support sofatracer #568

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
*/
package io.holoinsight.server.apm.common.constants;

import java.util.ArrayList;
import java.util.List;

public class Const {
public static final String NONE = "0";
public static final String SERVICE_ID_CONNECTOR = ".";
Expand Down Expand Up @@ -49,4 +52,28 @@ public class Const {
// custom tags
public static final String OTLP_SPANLAYER = "spanLayer";

public static final String REAL_TRACE_ID = "realTraceId";
public static final String REAL_SPAN_ID = "realSpanId";
public static final String REAL_PARENT_SPAN_ID = "realParentSpanId";

// Since the otlp protocol traceId requires byte[16],
// the otlp traceId is different from the original traceId collected by the agent,
// and holoinsight needs to store the original traceId collected by the agent
public static final List<String> REAL_TRACE_ID_TAGS = new ArrayList() {
{
add("sw8.trace_id");
add("sofatracer.trace_id");
}
};
public static final List<String> REAL_SPAN_ID_TAGS = new ArrayList() {
{
add("sofatracer.span_id");
}
};
public static final List<String> REAL_PARENT_SPAN_ID_TAGS = new ArrayList() {
{
add("sofatracer.parent_span_id");
}
};

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.apm.common.model.query;

import io.holoinsight.server.apm.common.model.specification.sw.Span;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/**
* @author jiwliu
* @version : Trace.java, v 0.1 2022年09月20日 15:35 xiangwanpeng Exp $
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class TraceTree implements Serializable {

private static final long serialVersionUID = 3436393920339302223L;

private Span span;
private List<TraceTree> children = new ArrayList<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.holoinsight.server.apm.common.model.query.Pagination;
import io.holoinsight.server.apm.common.model.query.QueryOrder;
import io.holoinsight.server.apm.common.model.query.TraceBrief;
import io.holoinsight.server.apm.common.model.query.TraceTree;
import io.holoinsight.server.apm.common.model.specification.OtlpMappings;
import io.holoinsight.server.apm.common.model.specification.otel.Event;
import io.holoinsight.server.apm.common.model.specification.otel.KeyValue;
Expand Down Expand Up @@ -105,6 +106,55 @@ public TraceBrief queryBasicTraces(final String tenant, String serviceName,
public Trace queryTrace(String tenant, long start, long end, String traceId, List<Tag> tags)
throws IOException {
Trace trace = new Trace();
trace.setSpans(querySpan(tenant, start, end, traceId, tags));

List<Span> sortedSpans = new LinkedList<>();
if (CollectionUtils.isNotEmpty(trace.getSpans())) {
List<Span> rootSpans = findRoot(trace.getSpans());
if (CollectionUtils.isNotEmpty(rootSpans)) {
rootSpans.forEach(span -> {
List<Span> childrenSpan = new ArrayList<>();
childrenSpan.add(span);
findChildren(trace.getSpans(), span, childrenSpan);
sortedSpans.addAll(childrenSpan);
});
}
}
trace.getSpans().clear();
sortedSpans.forEach(span -> {
if (StringUtils.isEmpty(span.getParentSpanId())
&& CollectionUtils.isNotEmpty(span.getRefs())) {
span.setParentSpanId(span.getRefs().get(0).getParentSpanId());
}
});
trace.getSpans().addAll(sortedSpans);
return trace;
}

@Override
public List<TraceTree> queryTraceTree(String tenant, long start, long end, String traceId,
List<Tag> tags) throws Exception {
List<TraceTree> result = new ArrayList<>();
List<Span> spans = querySpan(tenant, start, end, traceId, tags);
if (CollectionUtils.isNotEmpty(spans)) {
List<Span> rootSpans = findRoot(spans);
if (CollectionUtils.isNotEmpty(rootSpans)) {
rootSpans.forEach(span -> {
TraceTree root = new TraceTree();
root.setSpan(span);
List<TraceTree> children = new ArrayList<>();
root.setChildren(children);
findChildren1(spans, span, children);
result.add(root);
});
}
}
return result;
}

private List<Span> querySpan(String tenant, long start, long end, String traceId, List<Tag> tags)
throws IOException {
List<Span> spans = new ArrayList<>();

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

Expand All @@ -125,32 +175,11 @@ public Trace queryTrace(String tenant, long start, long end, String traceId, Lis
if (!spanRecords.isEmpty()) {
for (SpanDO spanEsDO : spanRecords) {
if (nonNull(spanEsDO)) {
trace.getSpans().add(buildSpan(spanEsDO));
spans.add(buildSpan(spanEsDO));
}
}
}

List<Span> sortedSpans = new LinkedList<>();
if (CollectionUtils.isNotEmpty(trace.getSpans())) {
List<Span> rootSpans = findRoot(trace.getSpans());
if (CollectionUtils.isNotEmpty(rootSpans)) {
rootSpans.forEach(span -> {
List<Span> childrenSpan = new ArrayList<>();
childrenSpan.add(span);
findChildren(trace.getSpans(), span, childrenSpan);
sortedSpans.addAll(childrenSpan);
});
}
}
trace.getSpans().clear();
sortedSpans.forEach(span -> {
if (StringUtils.isEmpty(span.getParentSpanId())
&& CollectionUtils.isNotEmpty(span.getRefs())) {
span.setParentSpanId(span.getRefs().get(0).getParentSpanId());
}
});
trace.getSpans().addAll(sortedSpans);
return trace;
return spans;
}

private Span buildSpan(SpanDO spanEsDO) {
Expand Down Expand Up @@ -235,24 +264,43 @@ private Span buildSpan(SpanDO spanEsDO) {

private List<Span> findRoot(List<Span> spans) {
List<Span> rootSpans = new ArrayList<>();
spans.forEach(span -> {
ListIterator<Span> iterator = spans.listIterator(spans.size());
while (iterator.hasPrevious()) {
Span span = iterator.previous();
String parentSpanId = span.getParentSpanId();

boolean hasParent = false;
for (Span subSpan : spans) {
if (parentSpanId.equals(subSpan.getSpanId())
|| CollectionUtils.isNotEmpty(span.getRefs())) {
// sofatracer mq/rpc server span(parentSpanId == spanId)
if (subSpan.getSpanId().equals(parentSpanId) && !subSpan.getType().equals(span.getType())) {
hasParent = true;
// if find parent, quick exit
break;
}
}

if (!hasParent) {
span.setRoot(true);
rootSpans.add(span);
// rootSpan.parentSpanId == ""
if (StringUtils.isEmpty(parentSpanId)) {
span.setRoot(true);
rootSpans.add(span);
} else {
// sofatracer may be missing span, supplement the missing span until the root span
Span missingSpan = new Span();
missingSpan.setSpanId(parentSpanId);
missingSpan.setTraceId(span.getTraceId());
missingSpan.setEndpointName("UNKNOWN");
missingSpan.setParentSpanId("");
missingSpan.setType("");
// sofatracer spanId -> parentSpanId: 0.1.1 -> 0.1
if (parentSpanId.contains(".")) {
missingSpan.setParentSpanId(parentSpanId.substring(0, parentSpanId.lastIndexOf(".")));
}
iterator.add(missingSpan);
}
}
});
}

rootSpans.sort(Comparator.comparing(Span::getStartTime));
return rootSpans;
}
Expand All @@ -261,7 +309,26 @@ private void findChildren(List<Span> spans, Span parentSpan, List<Span> children
spans.forEach(span -> {
if (span.getParentSpanId().equals(parentSpan.getSpanId())) {
childrenSpan.add(span);
findChildren(spans, span, childrenSpan);
if (!span.getParentSpanId().equals(span.getSpanId())) {
findChildren(spans, span, childrenSpan);
}
}
});
}

private void findChildren1(List<Span> spans, Span parentSpan, List<TraceTree> children) {
spans.forEach(span -> {
if (span.getParentSpanId().equals(parentSpan.getSpanId())) {
TraceTree child = new TraceTree();
child.setSpan(span);
children.add(child);
List<TraceTree> newChildren = new ArrayList<>();
child.setChildren(newChildren);
// sofatracer mq/rpc server span(parentSpanId == spanId)
// prevent stack overflow
if (!span.getParentSpanId().equals(span.getSpanId())) {
findChildren1(spans, span, newChildren);
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.holoinsight.server.apm.common.model.query.Pagination;
import io.holoinsight.server.apm.common.model.query.QueryOrder;
import io.holoinsight.server.apm.common.model.query.TraceBrief;
import io.holoinsight.server.apm.common.model.query.TraceTree;
import io.holoinsight.server.apm.common.model.specification.sw.Tag;
import io.holoinsight.server.apm.common.model.specification.sw.Trace;
import io.holoinsight.server.apm.common.model.specification.sw.TraceState;
Expand All @@ -27,4 +28,7 @@ TraceBrief queryBasicTraces(final String tenant, final String serviceName,

Trace queryTrace(final String tenant, final long start, final long end, final String traceId,
List<Tag> tags) throws Exception;

List<TraceTree> queryTraceTree(final String tenant, final long start, final long end,
final String traceId, List<Tag> tags) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,20 @@ public List<ServiceErrorDO> analysis(Span span, Map<String, AnyValue> spanAttrMa

public ServiceErrorDO setPublicAttrs(ServiceErrorDO errorInfo, Span span,
Map<String, AnyValue> spanAttrMap, Map<String, AnyValue> resourceAttrMap) {
String realTraceId = resourceAttrMap.containsKey(Const.REAL_TRACE_ID)
? resourceAttrMap.get(Const.REAL_TRACE_ID).getStringValue()
: Hex.encodeHexString(span.getTraceId().toByteArray());
String realSpanId = spanAttrMap.containsKey(Const.REAL_SPAN_ID)
? spanAttrMap.get(Const.REAL_SPAN_ID).getStringValue()
: Hex.encodeHexString(span.getSpanId().toByteArray());
errorInfo.setSpanId(realSpanId);
errorInfo.setTraceId(realTraceId);
errorInfo.setTenant(resourceAttrMap.get(Const.TENANT).getStringValue());
errorInfo.setServiceName(
resourceAttrMap.get(ResourceAttributes.SERVICE_NAME.getKey()).getStringValue());
errorInfo.setEndpointName(span.getName());
errorInfo.setServiceInstanceName(
resourceAttrMap.get(Const.OTLP_RESOURCE_SERVICE_INSTANCE_NAME).getStringValue());
errorInfo.setSpanId(Hex.encodeHexString(span.getSpanId().toByteArray()));
errorInfo.setTraceId(Hex.encodeHexString(span.getTraceId().toByteArray()));
errorInfo.setStartTime(TimeUtils.unixNano2MS(span.getStartTimeUnixNano()));
errorInfo.setTimestamp(TimeUtils.unixNano2MS(span.getEndTimeUnixNano()));
errorInfo.setTimestamp2(TimeUtils.unixNano2MS(span.getEndTimeUnixNano()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public SlowSqlDO setPublicAttrs(SlowSqlDO slowSqlDO, Span span, Map<String, AnyV
slowSqlDO.setStartTime(TimeUtils.unixNano2MS(span.getStartTimeUnixNano()));
slowSqlDO.setTimestamp(TimeUtils.unixNano2MS(span.getEndTimeUnixNano()));
slowSqlDO.setTimestamp2(TimeUtils.unixNano2MS(span.getEndTimeUnixNano()));
slowSqlDO.setTraceId(Hex.encodeHexString(span.getTraceId().toByteArray()));
String realTraceId = resourceAttrMap.containsKey(Const.REAL_TRACE_ID)
? resourceAttrMap.get(Const.REAL_TRACE_ID).getStringValue()
: Hex.encodeHexString(span.getTraceId().toByteArray());
slowSqlDO.setTraceId(realTraceId);
slowSqlDO
.setStatement(spanAttrMap.get(SemanticAttributes.DB_STATEMENT.getKey()).getStringValue());
return slowSqlDO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ public RPCTrafficSourceBuilder setPublicAttrs(RPCTrafficSourceBuilder sourceBuil
sourceBuilder.setTenant(resourceAttrMap.get(Const.TENANT).getStringValue());
long latency = TimeUtils.unixNano2MS(span.getEndTimeUnixNano())
- TimeUtils.unixNano2MS(span.getStartTimeUnixNano());
sourceBuilder.setTraceId(Hex.encodeHexString(span.getTraceId().toByteArray()));

String realTraceId = resourceAttrMap.containsKey(Const.REAL_TRACE_ID)
? resourceAttrMap.get(Const.REAL_TRACE_ID).getStringValue()
: Hex.encodeHexString(span.getTraceId().toByteArray());
sourceBuilder.setTraceId(realTraceId);
sourceBuilder.setStartTime(TimeUtils.unixNano2MS(span.getStartTimeUnixNano()));
sourceBuilder.setEndTime(TimeUtils.unixNano2MS(span.getEndTimeUnixNano()));
sourceBuilder.setTimeBucket(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package io.holoinsight.server.apm.receiver.common;

import io.holoinsight.server.apm.common.constants.Const;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;

Expand All @@ -16,6 +17,14 @@ public static Map<String, AnyValue> attList2Map(List<KeyValue> attributes) {
Map<String, AnyValue> result = new HashMap<>(attributes.size());
for (io.opentelemetry.proto.common.v1.KeyValue attribute : attributes) {
result.put(attribute.getKey(), attribute.getValue());

if (Const.REAL_TRACE_ID_TAGS.contains(attribute.getKey())) {
result.put(Const.REAL_TRACE_ID, attribute.getValue());
} else if (Const.REAL_SPAN_ID_TAGS.contains(attribute.getKey())) {
result.put(Const.REAL_SPAN_ID, attribute.getValue());
} else if (Const.REAL_PARENT_SPAN_ID_TAGS.contains(attribute.getKey())) {
result.put(Const.REAL_PARENT_SPAN_ID, attribute.getValue());
}
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package io.holoinsight.server.apm.receiver.trace;

import io.holoinsight.server.apm.common.constants.Const;
import io.holoinsight.server.apm.common.model.specification.otel.Event;
import io.holoinsight.server.apm.common.model.specification.otel.KeyValue;
import io.holoinsight.server.apm.common.model.specification.otel.Link;
Expand Down Expand Up @@ -128,7 +129,8 @@ public void handleResourceSpans(
}

errorInfoList.addAll(errorAnalysis.analysis(span, spanAttrMap, resourceAttrMap));
spanEsDOList.add(SpanDO.fromSpan(transformSpan(span), otelResource));
spanEsDOList.add(SpanDO.fromSpan(transformSpan(span, resourceAttrMap, spanAttrMap),
otelResource));
});
}
});
Expand Down Expand Up @@ -181,8 +183,10 @@ public void buildRelation(List<RPCTrafficSourceBuilder> relationBuilders) throws
serverRelationList.add(ServiceRelationDO.fromServiceRelation(serviceRelation));

ServiceInstanceRelation serviceInstanceRelation = callingIn.toServiceInstanceRelation();
serverInstanceRelationList
.add(ServiceInstanceRelationDO.fromServiceInstanceRelation(serviceInstanceRelation));
if (serviceInstanceRelation != null) {
serverInstanceRelationList
.add(ServiceInstanceRelationDO.fromServiceInstanceRelation(serviceInstanceRelation));
}

EndpointRelation endpointRelation = callingIn.toEndpointRelation();
if (endpointRelation != null) {
Expand Down Expand Up @@ -241,11 +245,21 @@ private Map<String, String> generateSpanIdEndpointMap(List<ScopeSpans> scopeSpan
return result;
}

protected Span transformSpan(io.opentelemetry.proto.trace.v1.Span span) {
protected Span transformSpan(io.opentelemetry.proto.trace.v1.Span span,
Map<String, AnyValue> resourceAttrMap, Map<String, AnyValue> spanAttrMap) {
String realTraceId = resourceAttrMap.containsKey(Const.REAL_TRACE_ID)
? resourceAttrMap.get(Const.REAL_TRACE_ID).getStringValue()
: Hex.encodeHexString(span.getTraceId().toByteArray());
String realSpanId = spanAttrMap.containsKey(Const.REAL_SPAN_ID)
? spanAttrMap.get(Const.REAL_SPAN_ID).getStringValue()
: Hex.encodeHexString(span.getSpanId().toByteArray());
String realParentSpanId = spanAttrMap.containsKey(Const.REAL_PARENT_SPAN_ID)
? spanAttrMap.get(Const.REAL_PARENT_SPAN_ID).getStringValue()
: Hex.encodeHexString(span.getParentSpanId().toByteArray());
Span otelSpan = new Span();
otelSpan.setTraceId(Hex.encodeHexString(span.getTraceId().toByteArray()));
otelSpan.setSpanId(Hex.encodeHexString(span.getSpanId().toByteArray()));
otelSpan.setParentSpanId(Hex.encodeHexString(span.getParentSpanId().toByteArray()));
otelSpan.setTraceId(realTraceId);
otelSpan.setSpanId(realSpanId);
otelSpan.setParentSpanId(realParentSpanId);
otelSpan.setName(span.getName());
otelSpan.setTraceState(span.getTraceState());
otelSpan.setKind(SpanKind.fromProto(span.getKind()));
Expand Down
Loading
Loading