diff --git a/server/agg/agg-boot/src/main/java/io/holoinsight/server/agg/v1/executor/executor/MetricStorageOutput.java b/server/agg/agg-boot/src/main/java/io/holoinsight/server/agg/v1/executor/executor/MetricStorageOutput.java index b578f4cd0..b475cc540 100644 --- a/server/agg/agg-boot/src/main/java/io/holoinsight/server/agg/v1/executor/executor/MetricStorageOutput.java +++ b/server/agg/agg-boot/src/main/java/io/holoinsight/server/agg/v1/executor/executor/MetricStorageOutput.java @@ -4,10 +4,12 @@ package io.holoinsight.server.agg.v1.executor.executor; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.text.StringSubstitutor; import org.springframework.beans.factory.annotation.Autowired; @@ -16,8 +18,10 @@ import com.alibaba.fastjson.serializer.SerializerFeature; import io.holoinsight.server.agg.v1.core.Utils; +import io.holoinsight.server.agg.v1.core.conf.OutputField; import io.holoinsight.server.agg.v1.executor.output.AggStringLookup; import io.holoinsight.server.agg.v1.executor.output.MergedCompleteness; +import io.holoinsight.server.agg.v1.executor.output.PercentileFinalValues; import io.holoinsight.server.agg.v1.executor.output.XOutput; import io.holoinsight.server.extension.MetricStorage; import io.holoinsight.server.extension.model.WriteMetricsParam; @@ -55,6 +59,8 @@ public void write(Batch batch) { String ts = Utils.formatTimeShort(batch.window.getTimestamp()); + Map fieldMap = convertToMap(batch.oi.getFields()); + boolean debug = false; int discard = 0; @@ -62,6 +68,7 @@ public void write(Batch batch) { for (Group g : batch.groups) { Map finalFields = g.getFinalFields(); for (Map.Entry e : finalFields.entrySet()) { + OutputField outputField = fieldMap.get(e.getKey()); Object value = e.getValue(); if (value == null) { continue; @@ -76,8 +83,29 @@ public void write(Batch batch) { } usedMetricNames.add(metricName); - try { + if (outputField != null && OutputField.PERCENTILE.equals(outputField.getType())) { + PercentileFinalValues pfv = + JSON.parseObject((String) value, PercentileFinalValues.class); + for (PercentileFinalValues.PercentileFinalValue v : pfv.getValues()) { + + + WriteMetricsParam.Point p = new WriteMetricsParam.Point(); + p.setMetricName(metricName); + p.setTimeStamp(batch.window.getTimestamp()); + + // add extra tag 'percent' + Map tags = new HashMap<>(g.getTags().asMap()); + tags.put("percent", Integer.toString((int) (100 * v.getRank()))); + p.setTags(tags); + + p.setValue(v.getQuantile()); + + points.add(p); + } + continue; + } + if (value instanceof Number) { } else if (value instanceof String) { } else { @@ -100,8 +128,10 @@ public void write(Batch batch) { batch.key, ts, metricName, g.getTags(), value); } points.add(p); - } catch (IllegalArgumentException ex) { + } catch (Exception ex) { discard++; + log.error("[output] [MetricStorage] agg=[{}] ts=[{}] metric=[{}] error", // + batch.key, ts, metricName, ex); } } } @@ -150,4 +180,8 @@ public void write(Batch batch) { } } + + private static Map convertToMap(List list) { + return list.stream().collect(Collectors.toMap(OutputField::getName, x -> x, (a, b) -> a)); + } } diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggFunc.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggFunc.java index dbeefd2cd..88dfa4825 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggFunc.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggFunc.java @@ -32,6 +32,7 @@ public class AggFunc { public static final int TYPE_TOPN = 8; public static final int TYPE_AVG_MERGE = 9; public static final int TYPE_HLL = 10; + public static final int TYPE_PERCENTILE = 11; private static final Map TYPE_TO_INT = new HashMap<>(); @@ -48,6 +49,7 @@ public class AggFunc { TYPE_TO_INT.put("TOPN", 8); TYPE_TO_INT.put("AVG_MERGE", 9); TYPE_TO_INT.put("HLL", 10); + TYPE_TO_INT.put("PERCENTILE", 11); } private String type; diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggTask.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggTask.java index ad0d13dd7..944c84d47 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggTask.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/AggTask.java @@ -62,10 +62,11 @@ public class AggTask { */ private FillZero fillZero = new FillZero(); + private Extension extension = new Extension(); + public AggTask() {} public boolean hasGroupBy() { return groupBy != null && !groupBy.isEmpty(); } - } diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java new file mode 100644 index 000000000..a8dca59a1 --- /dev/null +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/Extension.java @@ -0,0 +1,21 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.agg.v1.core.conf; + +import lombok.Data; + +/** + * Agg Task Extension + *

+ * created at 2024/2/20 + * + * @author xzchaoo + */ +@Data +public class Extension { + /** + * Whether to enter debug mode + */ + private boolean debug; +} diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/OutputField.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/OutputField.java index c0c8bdf9b..2aae011fe 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/OutputField.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/conf/OutputField.java @@ -8,6 +8,7 @@ import com.googlecode.aviator.AviatorEvaluator; import com.googlecode.aviator.Expression; +import lombok.AccessLevel; import lombok.Data; import lombok.Getter; import lombok.Setter; @@ -20,6 +21,9 @@ */ @Data public class OutputField { + public static final String DEFAULT = ""; + public static final String PERCENTILE = "PERCENTILE"; + @Nonnull private String name; /** @@ -28,10 +32,21 @@ public class OutputField { @Nonnull private String expression; - @Getter - @Setter + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) private transient Expression compiledExpression; + /** + * Final data type + *

    + *
  • null or "" or "DEFAULT": data of number and string types are supported, and data of other + * types will be uniformly converted to string.
  • + *
  • PERCENTILE: percentile requires the output result to be null, or `Map` + * where Key is the string format of the percentile, such as "99" "95"
  • + *
+ */ + private String type = DEFAULT; + public OutputField() {} public OutputField(String name, String expression) { diff --git a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/data/InDataNodeDataAccessor.java b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/data/InDataNodeDataAccessor.java index d2d5bc4a5..24826b105 100644 --- a/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/data/InDataNodeDataAccessor.java +++ b/server/agg/agg-core/src/main/java/io/holoinsight/server/agg/v1/core/data/InDataNodeDataAccessor.java @@ -22,6 +22,8 @@ public class InDataNodeDataAccessor implements DataAccessor { public void replace(AggProtos.InDataNode in) { this.in = in; + this.fieldName = null; + this.bf = null; } @Override @@ -76,6 +78,7 @@ public String getStringField(String name) { public void bindFieldName(String fieldName) { if (!isSingleValue()) { this.fieldName = fieldName; + this.bf = null; } } @@ -119,6 +122,10 @@ public String getStringField() { private void ensureBf() { if (bf == null) { bf = in.getFieldsOrDefault(fieldName, null); + // if bf is null, we have to treat it as a default value + if (bf == null) { + bf = AggProtos.BasicField.getDefaultInstance(); + } } } } diff --git a/server/agg/agg-dispatcher/pom.xml b/server/agg/agg-dispatcher/pom.xml index 13f699ffb..55b5fca01 100644 --- a/server/agg/agg-dispatcher/pom.xml +++ b/server/agg/agg-dispatcher/pom.xml @@ -28,6 +28,11 @@ registry-core ${project.version} + + io.holoinsight.server + apm-engine + ${project.version} + \ No newline at end of file diff --git a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java index 3bf3fa1d2..66462ff98 100644 --- a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java +++ b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcher.java @@ -21,6 +21,7 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import com.xzchaoo.commons.stat.StringsKey; import io.holoinsight.server.agg.v1.core.AggProperties; @@ -31,8 +32,10 @@ import io.holoinsight.server.agg.v1.core.data.AggValuesSerdes; import io.holoinsight.server.agg.v1.core.kafka.KafkaProducerHealthChecker; import io.holoinsight.server.agg.v1.pb.AggProtos; +import io.holoinsight.server.apm.engine.model.SpanDO; import io.holoinsight.server.common.auth.AuthInfo; import io.holoinsight.server.common.threadpool.CommonThreadPools; +import io.holoinsight.server.extension.model.Record; import io.holoinsight.server.extension.model.Row; import io.holoinsight.server.extension.model.Table; import io.holoinsight.server.gateway.core.utils.StatUtils; @@ -322,12 +325,128 @@ private boolean send(AggTaskKey key, AggProtos.AggTaskValue value) { count += value.getDataTable().getRowCount(); } log.error("[agg] [{}] write kafka error, metric=[{}] size=[{}]", // - key, value.getMetric(), count); + key, value.getMetric(), count, exception); } }); return true; } + public void dispatchRecords(AuthInfo ai, String name, List records) { + List aggTasks = storage.getByMetric(name); + if (aggTasks.isEmpty()) { + return; + } + + StringBuilder sb = new StringBuilder(); + for (AggTask aggTask : aggTasks) { + String partition = buildPartition(sb, ai, aggTask); + AggTaskKey aggTaskKey = new AggTaskKey(ai.getTenant(), aggTask.getAggId(), partition); + + AggProtos.AggTaskValue.Builder aggTaskValue = + AggProtos.AggTaskValue.newBuilder().setMetric(name); + + for (Record record : records) { + AggProtos.InDataNode.Builder b = AggProtos.InDataNode.newBuilder() // + .setType(2) // + .setTimestamp(record.getTimestamp()); + + record.getTags().forEach((k, v) -> { + if (v != null) { + b.putTags(k, v); + } + }); + + record.getFields().forEach((k, v) -> { + if (v instanceof Number) { + putField(b, k, ((Number) v).doubleValue()); + } else if (v instanceof String) { + putField(b, k, v.toString()); + } + }); + + aggTaskValue.addInDataNodes(b); // + } + + AggProtos.AggTaskValue taskValue = aggTaskValue.build(); + + if (send(aggTaskKey, taskValue)) { + if (aggTask.getExtension().isDebug()) { + log.info("send to kafka {} {}", aggTaskKey.getAggId(), taskValue.getInDataNodesCount()); + } + StatUtils.KAFKA_SEND.add(StringsKey.of("v1"), + new long[] {1, taskValue.getInDataNodesCount()}); + } + } + } + + public void dispatchSpans(AuthInfo ai, List spans) { + if (CollectionUtils.isEmpty(spans)) { + return; + } + + String name = "_span"; + List aggTasks = storage.getByMetric(name); + if (aggTasks.isEmpty()) { + return; + } + + StringBuilder sb = new StringBuilder(); + for (AggTask aggTask : aggTasks) { + String partition = buildPartition(sb, ai, aggTask); + AggTaskKey aggTaskKey = new AggTaskKey(ai.getTenant(), aggTask.getAggId(), partition); + + AggProtos.AggTaskValue.Builder aggTaskValue = + AggProtos.AggTaskValue.newBuilder().setMetric(name); + + for (SpanDO span : spans) { + AggProtos.InDataNode.Builder b = AggProtos.InDataNode.newBuilder() // + .setType(2) // + .setTimestamp(span.getEndTime()); + + maybePutTag(b, "trace_id", span.getTraceId()); + maybePutTag(b, "kind", span.getKind()); + maybePutTag(b, "name", span.getName()); + maybePutTag(b, "trace_status", Integer.toString(span.getTraceStatus())); + + putField(b, "latency", span.getLatency()); + for (Map.Entry e : span.getTags().entrySet()) { + Object value = e.getValue(); + if (value instanceof String) { + maybePutTag(b, e.getKey(), (String) e.getValue()); + } + } + + aggTaskValue.addInDataNodes(b); // + } + + AggProtos.AggTaskValue taskValue = aggTaskValue.build(); + + if (send(aggTaskKey, taskValue)) { + if (aggTask.getExtension().isDebug()) { + log.info("send to kafka {} {}", aggTaskKey.getAggId(), taskValue.getInDataNodesCount()); + } + StatUtils.KAFKA_SEND.add(StringsKey.of("v1"), + new long[] {1, taskValue.getInDataNodesCount()}); + } + } + + } + + private static void maybePutTag(AggProtos.InDataNode.Builder b, String key, String value) { + if (value != null) { + b.putTags(key, value); + } + } + + private static void putField(AggProtos.InDataNode.Builder b, String key, String value) { + b.putFields(key, AggProtos.BasicField.newBuilder().setType(1) + .setBytesValue(ByteString.copyFromUtf8(value)).build()); + } + + private static void putField(AggProtos.InDataNode.Builder b, String key, double value) { + b.putFields(key, AggProtos.BasicField.newBuilder().setType(0).setFloat64Value(value).build()); + } + @NotNull private static String buildPartition(StringBuilder sb, AuthInfo authInfo, AggTask aggTask) { if (sb == null) { diff --git a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcherAutoConfiguration.java b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcherAutoConfiguration.java index 8570834ac..2af1260aa 100644 --- a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcherAutoConfiguration.java +++ b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggDispatcherAutoConfiguration.java @@ -3,6 +3,7 @@ */ package io.holoinsight.server.agg.v1.dispatcher; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -59,4 +60,11 @@ public AggConfig aggConfig() { public AggDispatcherMockDataGenerator aggDispatcherMockDataGenerator() { return new AggDispatcherMockDataGenerator(); } + + @ConditionalOnMissingBean + @ConditionalOnRole("apm") + @Bean + public AggSpanStorageHook aggSpanStorageHook() { + return new AggSpanStorageHook(); + } } diff --git a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggSpanStorageHook.java b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggSpanStorageHook.java new file mode 100644 index 000000000..13568274a --- /dev/null +++ b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggSpanStorageHook.java @@ -0,0 +1,129 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.agg.v1.dispatcher; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; + +import com.google.common.collect.Maps; + +import io.holoinsight.server.apm.engine.model.ServiceErrorDO; +import io.holoinsight.server.apm.engine.model.ServiceRelationDO; +import io.holoinsight.server.apm.engine.model.SpanDO; +import io.holoinsight.server.apm.engine.storage.SpanStorageHook; +import io.holoinsight.server.common.auth.AuthInfo; +import io.holoinsight.server.extension.model.Record; +import lombok.extern.slf4j.Slf4j; + +/** + *

+ * created at 2024/2/5 + * + * @author xzchaoo + */ +@Slf4j +public class AggSpanStorageHook implements SpanStorageHook { + @Autowired + protected AggDispatcher aggDispatcher; + + @Override + public void beforeStorage(List spans) { + if (CollectionUtils.isEmpty(spans)) { + return; + } + Map> byTenant = new HashMap<>(); + for (SpanDO span : spans) { + String tenant = (String) span.getTags().get(SpanDO.resource(SpanDO.TENANT)); + if (StringUtils.isEmpty(tenant)) { + continue; + } + byTenant.computeIfAbsent(tenant, i -> new ArrayList<>()).add(span); + } + + byTenant.forEach((tenant, list) -> { + AuthInfo ai = new AuthInfo(); + ai.setTenant(tenant); + aggDispatcher.dispatchSpans(ai, list); + }); + } + + @Override + public void beforeStorageServiceRelation(List list) { + String name = "_span_service_relation"; + Map> byTenant = new HashMap<>(); + + for (ServiceRelationDO relation : list) { + Record record = new Record(); + record.setTimestamp(relation.getEndTime()); + record.setName(name); + + record.setTags(buildStorageServiceRelationTags(relation)); + + Map fields = Maps.newHashMapWithExpectedSize(1); + fields.put("latency", relation.getLatency()); + record.setFields(fields); + + byTenant.computeIfAbsent(relation.getTenant(), i -> new ArrayList<>()).add(record); + } + + dispatch(name, byTenant); + } + + @Override + public void beforeStorageServiceError(List list) { + String name = "_span_service_error"; + Map> byTenant = new HashMap<>(); + + for (ServiceErrorDO e : list) { + Record record = new Record(); + record.setTimestamp(e.getTimestamp()); + record.setName(name); + + record.setTags(buildStorageServiceErrorTags(e)); + + Map fields = Maps.newHashMapWithExpectedSize(1); + fields.put("latency", e.getLatency()); + record.setFields(fields); + + byTenant.computeIfAbsent(e.getTenant(), i -> new ArrayList<>()).add(record); + } + + dispatch(name, byTenant); + } + + protected Map buildStorageServiceErrorTags(ServiceErrorDO e) { + Map tags = new HashMap<>(); + tags.put("trace_id", e.getTraceId()); + tags.put("resource.service.name", e.getServiceName()); + tags.put("resource.service.instance.name", e.getServiceInstanceName()); + tags.put("endpoint_name", e.getEndpointName()); + tags.put("error_kind", e.getErrorKind()); + tags.put("resource.tenant", e.getTenant()); + return tags; + } + + protected Map buildStorageServiceRelationTags(ServiceRelationDO e) { + Map tags = new HashMap<>(); + tags.put("trace_id", e.getTraceId()); + tags.put("dest_service_name", e.getDestServiceName()); + tags.put("component", e.getComponent()); + tags.put("type", e.getType()); + tags.put("resource.tenant", e.getTenant()); + return tags; + } + + protected void dispatch(String name, Map> byTenant) { + byTenant.forEach((tenant, list) -> { + AuthInfo ai = new AuthInfo(); + ai.setTenant(tenant); + aggDispatcher.dispatchRecords(ai, name, list); + }); + } +} diff --git a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggTaskV1Syncer2.java b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggTaskV1Syncer2.java index 01f244b44..78d0e2f92 100644 --- a/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggTaskV1Syncer2.java +++ b/server/agg/agg-dispatcher/src/main/java/io/holoinsight/server/agg/v1/dispatcher/AggTaskV1Syncer2.java @@ -130,17 +130,17 @@ private void deltaSync0() { case 0: // del aggTasks.remove(d.aggId); - log.error("[aggtask] [sync] remove {}", d.aggId); + log.info("[aggtask] [sync] remove {}", d.aggId); ++del; break; case 1: AggTask aggTask = JSON.parseObject(d.add.get(0).getJson(), AggTask.class); aggTasks.put(d.aggId, aggTask); - log.error("[aggtask] [sync] add [{}/{}]", d.aggId, d.add.get(0).getVersion()); + log.info("[aggtask] [sync] add [{}/{}]", d.aggId, d.add.get(0).getVersion()); ++add; break; default: - log.error("[aggtask] [sync] invalid {}", d); + log.info("[aggtask] [sync] invalid {}", d); // invalid break; } diff --git a/server/agg/agg-executor-api/src/main/java/io/holoinsight/server/agg/v1/executor/output/PercentileFinalValues.java b/server/agg/agg-executor-api/src/main/java/io/holoinsight/server/agg/v1/executor/output/PercentileFinalValues.java new file mode 100644 index 000000000..477c9d34a --- /dev/null +++ b/server/agg/agg-executor-api/src/main/java/io/holoinsight/server/agg/v1/executor/output/PercentileFinalValues.java @@ -0,0 +1,42 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.agg.v1.executor.output; + +import java.util.ArrayList; +import java.util.List; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + *

+ * created at 2024/2/22 + * + * @author xzchaoo + */ +@Data +public class PercentileFinalValues { + private List values; + + public PercentileFinalValues() { + this.values = new ArrayList<>(); + } + + public PercentileFinalValues(int capacity) { + this.values = new ArrayList<>(capacity); + } + + public void add(double rank, double quantile) { + values.add(new PercentileFinalValue(rank, quantile)); + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class PercentileFinalValue { + private double rank; + private double quantile; + } +} diff --git a/server/agg/agg-executor/pom.xml b/server/agg/agg-executor/pom.xml index be247c816..56780284c 100644 --- a/server/agg/agg-executor/pom.xml +++ b/server/agg/agg-executor/pom.xml @@ -59,6 +59,11 @@ meta-facade ${project.version} + + org.apache.datasketches + datasketches-java + 5.0.1 + \ No newline at end of file diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/AggTaskExecutor.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/AggTaskExecutor.java index cd9c53abc..4d8e49ad8 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/AggTaskExecutor.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/AggTaskExecutor.java @@ -38,6 +38,7 @@ import io.holoinsight.server.agg.v1.executor.state.AggTaskState; import io.holoinsight.server.agg.v1.executor.state.AggWindowState; import io.holoinsight.server.agg.v1.pb.AggProtos; +import io.holoinsight.server.common.ProtoJsonUtils; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -105,7 +106,11 @@ public void process(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskValue) if (aggTaskValue.hasDataTable()) { processData2(latestAggTask, aggTaskValue); } else { - processData(latestAggTask, aggTaskValue); + try { + processData(latestAggTask, aggTaskValue); + } catch (Exception e) { + log.error("[agg] [{}] error", key(), e); + } } break; } @@ -188,6 +193,11 @@ private void processData2(XAggTask latestAggTask, AggProtos.AggTaskValue aggTask } private void processData(XAggTask latestAggTask, AggProtos.AggTaskValue aggTaskValue) { + boolean debug = latestAggTask.getInner().getExtension().isDebug(); + if (debug) { + log.info("[agg] [debug] [{}] input={}", key(), ProtoJsonUtils.toJson(aggTaskValue)); + } + long now = System.currentTimeMillis(); AggWindowState lastWindowState = null; InDataNodeDataAccessor da = new InDataNodeDataAccessor(); diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/GroupField.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/GroupField.java index 7edc9dd83..940cf3a03 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/GroupField.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/GroupField.java @@ -12,7 +12,9 @@ import io.holoinsight.server.agg.v1.core.conf.SelectItem; import io.holoinsight.server.agg.v1.core.data.DataAccessor; import io.holoinsight.server.agg.v1.core.data.LogSamples; +import io.holoinsight.server.agg.v1.executor.output.PercentileFinalValues; import io.holoinsight.server.agg.v1.executor.state.LogSamplesState; +import io.holoinsight.server.common.JsonUtils; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -25,6 +27,9 @@ @Slf4j @Data public class GroupField { + private static final double[] DEFAULT_PERCENTILE_RANKS = + {0.01, 0.05, 0.25, 0.50, 0.75, 0.95, 0.99}; + private AggFunc agg; private int input; private int count; @@ -32,6 +37,7 @@ public class GroupField { private LogSamplesState logSamples; private TopnState topn; private HllState hll; + private PercentileState percentile; public GroupField() {} @@ -103,6 +109,12 @@ public void add(DataAccessor da, SelectItem si) { hll.add(v); } break; + case AggFunc.TYPE_PERCENTILE: + if (percentile == null) { + percentile = new PercentileState(); + } + percentile.add(da.getFloat64Field()); + break; } } @@ -144,9 +156,18 @@ public Object getFinalValue() { return 0; } return hll.cardinality(); + case AggFunc.TYPE_PERCENTILE: + if (percentile == null) { + return null; + } + PercentileFinalValues pfv = new PercentileFinalValues(DEFAULT_PERCENTILE_RANKS.length); + double[] quantiles = percentile.getQuantiles(DEFAULT_PERCENTILE_RANKS); + for (int i = 0; i < DEFAULT_PERCENTILE_RANKS.length; i++) { + pfv.add(DEFAULT_PERCENTILE_RANKS[i], quantiles[i]); + } + return JsonUtils.toJson(pfv); default: throw new IllegalStateException("unsupported"); } } - } diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PercentileState.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PercentileState.java new file mode 100644 index 000000000..6b08bf490 --- /dev/null +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/PercentileState.java @@ -0,0 +1,47 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.agg.v1.executor.executor; + +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.quantiles.DoublesSketch; +import org.apache.datasketches.quantiles.UpdateDoublesSketch; + +import com.esotericsoftware.kryo.DefaultSerializer; + +import io.holoinsight.server.agg.v1.executor.executor.kryo.PercentileStateSerializer; + +/** + *

+ * created at 2024/2/21 + * + * @author xzchaoo + */ +@DefaultSerializer(PercentileStateSerializer.class) +public class PercentileState { + private UpdateDoublesSketch dk; + + public PercentileState() { + dk = DoublesSketch.builder().build(); + } + + public PercentileState(byte[] bytes) { + dk = UpdateDoublesSketch.heapify(Memory.wrap(bytes)); + } + + public void add(double value) { + dk.update(value); + } + + public double getQuantile(double rank) { + return dk.getQuantile(rank); + } + + public double[] getQuantiles(double[] ranks) { + return dk.getQuantiles(ranks); + } + + public byte[] toByteArray(boolean compact) { + return dk.toByteArray(compact); + } +} diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/kryo/PercentileStateSerializer.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/kryo/PercentileStateSerializer.java new file mode 100644 index 000000000..8e429e903 --- /dev/null +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/executor/kryo/PercentileStateSerializer.java @@ -0,0 +1,34 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.agg.v1.executor.executor.kryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import io.holoinsight.server.agg.v1.executor.executor.PercentileState; + +/** + * Kryo serializer for PercentileState + *

+ * created at 2024/2/21 + * + * @author xzchaoo + */ +public class PercentileStateSerializer extends Serializer { + @Override + public void write(Kryo kryo, Output output, PercentileState obj) { + byte[] bytes = obj.toByteArray(false); + output.writeVarInt(bytes.length, true); + output.writeBytes(bytes); + } + + @Override + public PercentileState read(Kryo kryo, Input input, Class type) { + int length = input.readVarInt(true); + byte[] bytes = input.readBytes(length); + return new PercentileState(bytes); + } +} diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/AsyncOutput.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/AsyncOutput.java index d86e29b01..01848052d 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/AsyncOutput.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/AsyncOutput.java @@ -46,19 +46,15 @@ public class AsyncOutput { private static final int MAX_REQUEST_SIZE = 10 * 1024 * 1024; private static final Duration POLL_TIMEOUT = Duration.ofSeconds(2); private final CountDownLatch cdl = new CountDownLatch(1); - + private final XConsoleOutput consoleOutput = new XConsoleOutput(); private KafkaConsumer consumer; private KafkaProducer producer; private volatile boolean stopped; - @Autowired private AggProperties aggProperties; - private String outputTopic; - @Autowired(required = false) private List outputs = new ArrayList<>(); - private transient Map outputMap = new HashMap<>(); public AsyncOutput() {} @@ -190,8 +186,10 @@ private void write0(XOutput.Batch batch) { } public void write(XOutput.Batch batch) { - if (aggProperties.isDebugOutput()) { - new XConsoleOutput().write(batch); + if (aggProperties.isDebugOutput() || XConsoleOutput.TYPE.equals(batch.getOi().getType())) { + // write to console of current instance directly + // avoid logging on another machine + consoleOutput.write(batch); return; } diff --git a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/XConsoleOutput.java b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/XConsoleOutput.java index 9dd01bef4..74bce2174 100644 --- a/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/XConsoleOutput.java +++ b/server/agg/agg-executor/src/main/java/io/holoinsight/server/agg/v1/executor/output/XConsoleOutput.java @@ -7,11 +7,10 @@ import org.apache.commons.text.StringSubstitutor; -import com.alibaba.fastjson.JSON; - import io.holoinsight.server.agg.v1.core.Utils; import io.holoinsight.server.agg.v1.core.conf.OutputItem; import io.holoinsight.server.agg.v1.core.data.AggTaskKey; +import io.holoinsight.server.common.JsonUtils; import lombok.extern.slf4j.Slf4j; /** @@ -49,8 +48,8 @@ public void write(Batch batch) { key, // oi.getName(), // Utils.formatTimeShort(w.getTimestamp()), // - g.getTags(), // - JSON.toJSONString(finalFields)); // + JsonUtils.toJson(g.getTags()), // + JsonUtils.toJson(finalFields)); // } else { for (Map.Entry e : finalFields.entrySet()) { String metricName; @@ -64,7 +63,8 @@ public void write(Batch batch) { key, // oi.getName(), // Utils.formatTimeShort(w.getTimestamp()), // - metricName, g.getTags(), // + metricName, // + JsonUtils.toJson(g.getTags()), // e.getValue()); // } } diff --git a/server/apm/apm-boot/src/main/java/io/holoinsight/server/apm/bootstrap/HoloinsightApmConfiguration.java b/server/apm/apm-boot/src/main/java/io/holoinsight/server/apm/bootstrap/HoloinsightApmConfiguration.java index 03540f085..753d4f68f 100644 --- a/server/apm/apm-boot/src/main/java/io/holoinsight/server/apm/bootstrap/HoloinsightApmConfiguration.java +++ b/server/apm/apm-boot/src/main/java/io/holoinsight/server/apm/bootstrap/HoloinsightApmConfiguration.java @@ -3,9 +3,16 @@ */ package io.holoinsight.server.apm.bootstrap; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Lazy; +import org.springframework.context.annotation.Primary; + +import io.holoinsight.server.apm.core.ApmConfig; import io.holoinsight.server.apm.core.installer.ModelInstallManager; import io.holoinsight.server.apm.engine.elasticsearch.storage.impl.CommonBuilder; import io.holoinsight.server.apm.engine.elasticsearch.storage.impl.SpanMetricEsStorage; +import io.holoinsight.server.apm.engine.storage.SpanStorageHookManager; import io.holoinsight.server.apm.receiver.analysis.RelationAnalysis; import io.holoinsight.server.apm.receiver.analysis.ServiceErrorAnalysis; import io.holoinsight.server.apm.receiver.analysis.SlowSqlAnalysis; @@ -27,10 +34,6 @@ import io.holoinsight.server.apm.server.service.impl.VirtualComponentServiceImpl; import io.holoinsight.server.common.springboot.ConditionalOnFeature; import io.holoinsight.server.common.springboot.ConditionalOnRole; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Lazy; -import org.springframework.context.annotation.Primary; @ConditionalOnRole("apm") @ConditionalOnFeature("trace") @@ -117,6 +120,16 @@ public PublicAttr publicAttr() { return new PublicAttr(); } + @Bean + public ApmConfig apmConfig() { + return new ApmConfig(); + } + + @Bean + public SpanStorageHookManager spanStorageHookManager() { + return new SpanStorageHookManager(); + } + @Bean("spanHandler") @Lazy public SpanHandler spanHandler() throws InterruptedException { diff --git a/server/apm/apm-server/apm-core/pom.xml b/server/apm/apm-server/apm-core/pom.xml index eb85471af..c868c7ddf 100644 --- a/server/apm/apm-server/apm-core/pom.xml +++ b/server/apm/apm-server/apm-core/pom.xml @@ -12,6 +12,11 @@ apm-core + + io.holoinsight.server + common + ${project.version} + io.holoinsight.server apm-common @@ -25,5 +30,9 @@ org.slf4j slf4j-api + + org.springframework.boot + spring-boot + \ No newline at end of file diff --git a/server/apm/apm-server/apm-core/src/main/java/io/holoinsight/server/apm/core/ApmConfig.java b/server/apm/apm-server/apm-core/src/main/java/io/holoinsight/server/apm/core/ApmConfig.java new file mode 100644 index 000000000..e158a6182 --- /dev/null +++ b/server/apm/apm-server/apm-core/src/main/java/io/holoinsight/server/apm/core/ApmConfig.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.apm.core; + +import java.util.HashSet; +import java.util.Set; + +import org.springframework.boot.context.properties.bind.Binder; + +import com.xzchaoo.commons.basic.config.spring.AbstractConfig; + +import lombok.Data; +import lombok.Getter; + +/** + *

+ * created at 2024/2/26 + * + * @author xzchaoo + */ +@Getter +public class ApmConfig extends AbstractConfig { + private volatile Materialize materialize = new Materialize(); + + @Override + protected void refresh(Binder binder) { + binder.bind("apm.materialize", Materialize.class).ifBound(x -> materialize = x); + } + + @Data + public static class Materialize { + private Set blacklist = new HashSet<>(); + } +} diff --git a/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/postcal/MetricsManager.java b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/postcal/MetricsManager.java index f29ada023..bcc85485e 100644 --- a/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/postcal/MetricsManager.java +++ b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/postcal/MetricsManager.java @@ -3,12 +3,17 @@ */ package io.holoinsight.server.apm.engine.postcal; -import com.google.gson.reflect.TypeToken; -import io.holoinsight.server.apm.common.utils.GsonUtils; -import io.holoinsight.server.common.dao.entity.MetaDataDictValue; -import io.holoinsight.server.common.service.SuperCacheService; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.annotation.PostConstruct; + import org.apache.commons.io.IOUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -16,10 +21,14 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import java.io.IOException; -import java.nio.charset.Charset; -import java.util.*; +import com.google.gson.reflect.TypeToken; + +import io.holoinsight.server.apm.common.utils.GsonUtils; +import io.holoinsight.server.apm.core.ApmConfig; +import io.holoinsight.server.common.dao.entity.MetaDataDictValue; +import io.holoinsight.server.common.service.SuperCacheService; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; @Slf4j @Component @@ -31,20 +40,36 @@ public class MetricsManager { @Autowired private SuperCacheService superCacheService; + @Autowired + private ApmConfig apmConfig; + @Getter - private Map metricDefines = new HashMap<>(); + private volatile Map metricDefines = new HashMap<>(); + private Map localMetricDefines = new HashMap<>(); @PostConstruct - private List loadFromResource() throws IOException { - List materializedMetrics; - String metricDefineJson = IOUtils.toString(resource.getInputStream(), Charset.defaultCharset()); - materializedMetrics = GsonUtils.get().fromJson(metricDefineJson, + public void init() throws IOException { + String metricDefineJson; + try (InputStream is = resource.getInputStream()) { + metricDefineJson = IOUtils.toString(is, Charset.defaultCharset()); + } + List materializedMetrics = GsonUtils.get().fromJson(metricDefineJson, new TypeToken>() {}.getType()); log.info("[apm] load metric definitions from resource: {}", materializedMetrics); - materializedMetrics.forEach(materializedMetric -> this.metricDefines - .put(materializedMetric.getName(), materializedMetric)); - return materializedMetrics; + for (MetricDefine md : materializedMetrics) { + localMetricDefines.put(md.getName(), md); + } + Map metricDefines = new HashMap<>(localMetricDefines); + updateMetricDefines(metricDefines); + } + + private void updateMetricDefines(Map metricDefines) { + Set blacklist = apmConfig.getMaterialize().getBlacklist(); + if (!blacklist.isEmpty()) { + metricDefines.keySet().removeIf(metric -> blacklist.stream().anyMatch(metric::startsWith)); + } + this.metricDefines = metricDefines; } /** @@ -53,6 +78,7 @@ private List loadFromResource() throws IOException { */ @Scheduled(initialDelay = 10000L, fixedDelay = 60000L) private void loadFromDB() { + Map metricDefines = new HashMap<>(localMetricDefines); try { MetaDataDictValue metricDefineDictVal = superCacheService.getSc().metaDataDictValueMap .getOrDefault("global_config", new HashMap<>()).get("apm_materialized_metrics"); @@ -60,12 +86,14 @@ private void loadFromDB() { List materializedMetrics = GsonUtils.get().fromJson( metricDefineDictVal.getDictValue(), new TypeToken>() {}.getType()); log.info("[apm] load metric definitions from database: {}", materializedMetrics); - materializedMetrics.forEach(materializedMetric -> this.metricDefines - .put(materializedMetric.getName(), materializedMetric)); + for (MetricDefine md : materializedMetrics) { + metricDefines.put(md.getName(), md); + } } } catch (Exception e) { log.info("[apm] load metric definitions from database fail", e); } + updateMetricDefines(metricDefines); } public List listMetrics() { diff --git a/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHook.java b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHook.java new file mode 100644 index 000000000..a16797f10 --- /dev/null +++ b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHook.java @@ -0,0 +1,24 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.apm.engine.storage; + +import java.util.List; + +import io.holoinsight.server.apm.engine.model.ServiceErrorDO; +import io.holoinsight.server.apm.engine.model.ServiceRelationDO; +import io.holoinsight.server.apm.engine.model.SpanDO; + +/** + *

+ * created at 2024/2/5 + * + * @author xzchaoo + */ +public interface SpanStorageHook { + void beforeStorage(List spans); + + void beforeStorageServiceRelation(List list); + + void beforeStorageServiceError(List list); +} diff --git a/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHookManager.java b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHookManager.java new file mode 100644 index 000000000..d87565586 --- /dev/null +++ b/server/apm/apm-server/apm-engine/src/main/java/io/holoinsight/server/apm/engine/storage/SpanStorageHookManager.java @@ -0,0 +1,68 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.apm.engine.storage; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.beans.factory.annotation.Autowired; + +import io.holoinsight.server.apm.engine.model.ServiceErrorDO; +import io.holoinsight.server.apm.engine.model.ServiceRelationDO; +import io.holoinsight.server.apm.engine.model.SpanDO; +import lombok.extern.slf4j.Slf4j; + +/** + *

+ * created at 2024/2/5 + * + * @author xzchaoo + */ +@Slf4j +public class SpanStorageHookManager { + @Autowired(required = false) + private List hooks = new ArrayList<>(); + + public void beforeStorage(List spans) { + if (spans == null || spans.isEmpty()) { + return; + } + + for (SpanStorageHook hook : hooks) { + try { + hook.beforeStorage(spans); + } catch (Exception e) { + log.error("SpanStorageHook error", e); + } + } + } + + public void beforeStorageServiceRelation(List list) { + if (list == null || list.isEmpty()) { + return; + } + + for (SpanStorageHook hook : hooks) { + try { + hook.beforeStorageServiceRelation(list); + } catch (Exception e) { + log.error("SpanStorageHook error", e); + } + } + } + + public void beforeStorageServiceError(List list) { + if (list == null || list.isEmpty()) { + return; + } + + for (SpanStorageHook hook : hooks) { + try { + hook.beforeStorageServiceError(list); + } catch (Exception e) { + log.error("SpanStorageHook error", e); + } + } + } +} diff --git a/server/apm/apm-server/apm-receiver/src/main/java/io/holoinsight/server/apm/receiver/trace/SpanHandler.java b/server/apm/apm-server/apm-receiver/src/main/java/io/holoinsight/server/apm/receiver/trace/SpanHandler.java index ee522c688..7cc3711e8 100644 --- a/server/apm/apm-server/apm-receiver/src/main/java/io/holoinsight/server/apm/receiver/trace/SpanHandler.java +++ b/server/apm/apm-server/apm-receiver/src/main/java/io/holoinsight/server/apm/receiver/trace/SpanHandler.java @@ -3,6 +3,20 @@ */ package io.holoinsight.server.apm.receiver.trace; +import static io.holoinsight.server.apm.receiver.common.TransformAttr.anyValueToString; +import static io.holoinsight.server.apm.receiver.common.TransformAttr.convertKeyValue; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.time.StopWatch; +import org.springframework.beans.factory.annotation.Autowired; + import io.holoinsight.server.apm.common.constants.Const; import io.holoinsight.server.apm.common.model.specification.otel.Event; import io.holoinsight.server.apm.common.model.specification.otel.KeyValue; @@ -22,6 +36,7 @@ import io.holoinsight.server.apm.engine.model.ServiceRelationDO; import io.holoinsight.server.apm.engine.model.SlowSqlDO; import io.holoinsight.server.apm.engine.model.SpanDO; +import io.holoinsight.server.apm.engine.storage.SpanStorageHookManager; import io.holoinsight.server.apm.receiver.analysis.RelationAnalysis; import io.holoinsight.server.apm.receiver.analysis.ServiceErrorAnalysis; import io.holoinsight.server.apm.receiver.analysis.SlowSqlAnalysis; @@ -38,19 +53,6 @@ import io.holoinsight.server.common.ctl.ProductCtlService; import io.opentelemetry.proto.trace.v1.ScopeSpans; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.time.StopWatch; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static io.holoinsight.server.apm.receiver.common.TransformAttr.anyValueToString; -import static io.holoinsight.server.apm.receiver.common.TransformAttr.convertKeyValue; @Slf4j public class SpanHandler { @@ -77,6 +79,8 @@ public class SpanHandler { private ServiceErrorService serviceErrorService; @Autowired private ProductCtlService productCtlService; + @Autowired + private SpanStorageHookManager spanStorageHookManager; public void handleResourceSpans( List resourceSpansList) { @@ -195,6 +199,7 @@ public void buildRelation(List relationBuilders) throws } private void storageSpan(List spans) throws Exception { + spanStorageHookManager.beforeStorage(spans); traceService.insertSpans(spans); } @@ -204,6 +209,7 @@ public void storageNetworkMapping(List networkAddressMa } public void storageServiceRelation(List relationList) throws Exception { + spanStorageHookManager.beforeStorageServiceRelation(relationList); serviceRelationService.insert(relationList); } @@ -221,6 +227,7 @@ public void storageSlowSql(List slowSqlEsDOList) throws Exception { } public void storageServiceError(List errorInfoDOList) throws Exception { + spanStorageHookManager.beforeStorageServiceError(errorInfoDOList); serviceErrorService.insert(errorInfoDOList); } diff --git a/server/common/common/src/main/java/io/holoinsight/server/common/threadpool/ThreadPoolConfiguration.java b/server/common/common/src/main/java/io/holoinsight/server/common/threadpool/ThreadPoolConfiguration.java index 3618a8b35..7dc5608ff 100644 --- a/server/common/common/src/main/java/io/holoinsight/server/common/threadpool/ThreadPoolConfiguration.java +++ b/server/common/common/src/main/java/io/holoinsight/server/common/threadpool/ThreadPoolConfiguration.java @@ -5,6 +5,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; /** *

@@ -23,4 +25,9 @@ public class ThreadPoolConfiguration { public CommonThreadPools commonThreadPools() { return new CommonThreadPools(); } + + @Bean + public TaskScheduler taskScheduler(CommonThreadPools commonThreadPools) { + return new ConcurrentTaskScheduler(commonThreadPools.getScheduler()); + } } diff --git a/server/extension/extension-storage/src/main/java/io/holoinsight/server/extension/model/Record.java b/server/extension/extension-storage/src/main/java/io/holoinsight/server/extension/model/Record.java new file mode 100644 index 000000000..13f842f8c --- /dev/null +++ b/server/extension/extension-storage/src/main/java/io/holoinsight/server/extension/model/Record.java @@ -0,0 +1,23 @@ +/* + * Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0. + */ +package io.holoinsight.server.extension.model; + +import java.util.Map; + +import lombok.Data; + +/** + *

+ * created at 2024/2/23 + * + * @author xzchaoo + */ +@Data +public class Record { + private long timestamp; + private String name; + private Map tags; + // value must be number of string + private Map fields; +}