feat: apm agg
xiangfeng.xzc authored and xzchaoo committed Feb 26, 2024
1 parent 4d58755 commit f0ca11a
Showing 28 changed files with 772 additions and 60 deletions.
@@ -4,10 +4,12 @@
package io.holoinsight.server.agg.v1.executor.executor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.commons.text.StringSubstitutor;
import org.springframework.beans.factory.annotation.Autowired;
@@ -16,8 +18,10 @@
import com.alibaba.fastjson.serializer.SerializerFeature;

import io.holoinsight.server.agg.v1.core.Utils;
import io.holoinsight.server.agg.v1.core.conf.OutputField;
import io.holoinsight.server.agg.v1.executor.output.AggStringLookup;
import io.holoinsight.server.agg.v1.executor.output.MergedCompleteness;
import io.holoinsight.server.agg.v1.executor.output.PercentileFinalValues;
import io.holoinsight.server.agg.v1.executor.output.XOutput;
import io.holoinsight.server.extension.MetricStorage;
import io.holoinsight.server.extension.model.WriteMetricsParam;
@@ -55,13 +59,16 @@ public void write(Batch batch) {

String ts = Utils.formatTimeShort(batch.window.getTimestamp());

Map<String, OutputField> fieldMap = convertToMap(batch.oi.getFields());

boolean debug = false;

int discard = 0;
Set<String> usedMetricNames = new HashSet<>();
for (Group g : batch.groups) {
Map<String, Object> finalFields = g.getFinalFields();
for (Map.Entry<String, Object> e : finalFields.entrySet()) {
OutputField outputField = fieldMap.get(e.getKey());
Object value = e.getValue();
if (value == null) {
continue;
@@ -76,8 +83,29 @@ public void write(Batch batch) {
}

usedMetricNames.add(metricName);

try {
if (outputField != null && OutputField.PERCENTILE.equals(outputField.getType())) {
PercentileFinalValues pfv =
JSON.parseObject((String) value, PercentileFinalValues.class);
for (PercentileFinalValues.PercentileFinalValue v : pfv.getValues()) {


WriteMetricsParam.Point p = new WriteMetricsParam.Point();
p.setMetricName(metricName);
p.setTimeStamp(batch.window.getTimestamp());

// add extra tag 'percent'
Map<String, String> tags = new HashMap<>(g.getTags().asMap());
tags.put("percent", Integer.toString((int) (100 * v.getRank())));
p.setTags(tags);

p.setValue(v.getQuantile());

points.add(p);
}
continue;
}

if (value instanceof Number) {
} else if (value instanceof String) {
} else {
@@ -100,8 +128,10 @@ public void write(Batch batch) {
batch.key, ts, metricName, g.getTags(), value);
}
points.add(p);
} catch (IllegalArgumentException ex) {
} catch (Exception ex) {
discard++;
log.error("[output] [MetricStorage] agg=[{}] ts=[{}] metric=[{}] error", //
batch.key, ts, metricName, ex);
}
}
}
@@ -150,4 +180,8 @@ public void write(Batch batch) {

}
}

private static Map<String, OutputField> convertToMap(List<OutputField> list) {
return list.stream().collect(Collectors.toMap(OutputField::getName, x -> x, (a, b) -> a));
}
}
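The percentile branch above treats a PERCENTILE field value as a JSON-encoded PercentileFinalValues and fans it out into one point per percentile, adding the extra "percent" tag. The following standalone sketch shows that fan-out in isolation; the nested value classes are stand-ins inferred from the getter calls above (getValues/getRank/getQuantile), and the result is simplified to a tags-to-value map instead of WriteMetricsParam.Point.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSON;

public class PercentileFanOutSketch {
  // Minimal stand-ins for the shape used by the writer above.
  public static class PercentileFinalValue {
    private double rank; // e.g. 0.95, 0.99
    private double quantile; // the value at that rank
    public double getRank() { return rank; }
    public void setRank(double rank) { this.rank = rank; }
    public double getQuantile() { return quantile; }
    public void setQuantile(double quantile) { this.quantile = quantile; }
  }

  public static class PercentileFinalValues {
    private List<PercentileFinalValue> values = new ArrayList<>();
    public List<PercentileFinalValue> getValues() { return values; }
    public void setValues(List<PercentileFinalValue> values) { this.values = values; }
  }

  // Expand one serialized percentile field into per-percentile values keyed by tags.
  public static Map<Map<String, String>, Double> fanOut(String json, Map<String, String> groupTags) {
    PercentileFinalValues pfv = JSON.parseObject(json, PercentileFinalValues.class);
    Map<Map<String, String>, Double> points = new HashMap<>();
    for (PercentileFinalValue v : pfv.getValues()) {
      Map<String, String> tags = new HashMap<>(groupTags);
      // Same convention as the writer: rank 0.99 becomes the tag percent=99.
      tags.put("percent", Integer.toString((int) (100 * v.getRank())));
      points.put(tags, v.getQuantile());
    }
    return points;
  }
}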
@@ -32,6 +32,7 @@ public class AggFunc {
public static final int TYPE_TOPN = 8;
public static final int TYPE_AVG_MERGE = 9;
public static final int TYPE_HLL = 10;
public static final int TYPE_PERCENTILE = 11;

private static final Map<String, Integer> TYPE_TO_INT = new HashMap<>();

@@ -48,6 +49,7 @@ public class AggFunc {
TYPE_TO_INT.put("TOPN", 8);
TYPE_TO_INT.put("AVG_MERGE", 9);
TYPE_TO_INT.put("HLL", 10);
TYPE_TO_INT.put("PERCENTILE", 11);
}

private String type;
@@ -62,10 +62,11 @@ public class AggTask {
*/
private FillZero fillZero = new FillZero();

private Extension extension = new Extension();

public AggTask() {}

public boolean hasGroupBy() {
return groupBy != null && !groupBy.isEmpty();
}

}
@@ -0,0 +1,21 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/
package io.holoinsight.server.agg.v1.core.conf;

import lombok.Data;

/**
* Agg Task Extension
* <p>
* created at 2024/2/20
*
* @author xzchaoo
*/
@Data
public class Extension {
/**
* Whether to enter debug mode
*/
private boolean debug;
}
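The new Extension block rides on each AggTask (see the AggTask change above) and for now only carries a debug switch; the dispatcher consults it before logging each Kafka send. A minimal sketch of toggling and reading the flag, assuming AggTask lives in the same conf package and exposes a Lombok-generated getExtension():

import io.holoinsight.server.agg.v1.core.conf.AggTask; // package assumed for illustration

public class ExtensionDebugSketch {
  public static void main(String[] args) {
    AggTask aggTask = new AggTask();
    // @Data on Extension generates isDebug()/setDebug(boolean).
    aggTask.getExtension().setDebug(true);

    if (aggTask.getExtension().isDebug()) {
      System.out.println("per-send debug logging enabled for this agg task");
    }
  }
}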
@@ -8,6 +8,7 @@
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;

import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import lombok.Setter;
@@ -20,6 +21,9 @@
*/
@Data
public class OutputField {
public static final String DEFAULT = "";
public static final String PERCENTILE = "PERCENTILE";

@Nonnull
private String name;
/**
@@ -28,10 +32,21 @@ public class OutputField {
@Nonnull
private String expression;

@Getter
@Setter
@Getter(AccessLevel.NONE)
@Setter(AccessLevel.NONE)
private transient Expression compiledExpression;

/**
* Final data type
* <ul>
* <li>null, "" or "DEFAULT": numeric and string values are supported; values of other types
* are converted to strings.</li>
* <li>PERCENTILE: the output result must be null or a `Map<String, Double>` whose keys are
* percentile strings such as "99" or "95".</li>
* </ul>
*/
private String type = DEFAULT;

public OutputField() {}

public OutputField(String name, String expression) {
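To make the type contract above concrete: a sketch of declaring a PERCENTILE output field and the shape of a compliant expression result. The field name and the aviator expression string are made up for illustration; only the type constant and the Map<String, Double> shape come from the Javadoc and constants above.

import java.util.HashMap;
import java.util.Map;

import io.holoinsight.server.agg.v1.core.conf.OutputField;

public class OutputFieldPercentileExample {
  public static void main(String[] args) {
    // Hypothetical field; the expression syntax here is illustrative only.
    OutputField field = new OutputField("latency_percentile", "percentile(latency)");
    field.setType(OutputField.PERCENTILE); // default is OutputField.DEFAULT ("")

    // Per the Javadoc, a PERCENTILE expression result is null or a Map<String, Double>
    // keyed by the percentile as a string.
    Map<String, Double> result = new HashMap<>();
    result.put("95", 120.0d);
    result.put("99", 250.0d);
    System.out.println(field.getName() + " -> " + result);
  }
}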
@@ -22,6 +22,8 @@ public class InDataNodeDataAccessor {

public void replace(AggProtos.InDataNode in) {
this.in = in;
this.fieldName = null;
this.bf = null;
}

@Override
@@ -76,6 +78,7 @@ public String getStringField(String name) {
public void bindFieldName(String fieldName) {
if (!isSingleValue()) {
this.fieldName = fieldName;
this.bf = null;
}
}

@@ -119,6 +122,10 @@ public String getStringField() {
private void ensureBf() {
if (bf == null) {
bf = in.getFieldsOrDefault(fieldName, null);
// if bf is null, we have to treat it as a default value
if (bf == null) {
bf = AggProtos.BasicField.getDefaultInstance();
}
}
}
}
5 changes: 5 additions & 0 deletions server/agg/agg-dispatcher/pom.xml
@@ -28,6 +28,11 @@
<artifactId>registry-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.holoinsight.server</groupId>
<artifactId>apm-engine</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
@@ -21,6 +21,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.xzchaoo.commons.stat.StringsKey;

import io.holoinsight.server.agg.v1.core.AggProperties;
@@ -31,8 +32,10 @@
import io.holoinsight.server.agg.v1.core.data.AggValuesSerdes;
import io.holoinsight.server.agg.v1.core.kafka.KafkaProducerHealthChecker;
import io.holoinsight.server.agg.v1.pb.AggProtos;
import io.holoinsight.server.apm.engine.model.SpanDO;
import io.holoinsight.server.common.auth.AuthInfo;
import io.holoinsight.server.common.threadpool.CommonThreadPools;
import io.holoinsight.server.extension.model.Record;
import io.holoinsight.server.extension.model.Row;
import io.holoinsight.server.extension.model.Table;
import io.holoinsight.server.gateway.core.utils.StatUtils;
@@ -322,12 +325,128 @@ private boolean send(AggTaskKey key, AggProtos.AggTaskValue value) {
count += value.getDataTable().getRowCount();
}
log.error("[agg] [{}] write kafka error, metric=[{}] size=[{}]", //
key, value.getMetric(), count);
key, value.getMetric(), count, exception);
}
});
return true;
}

public void dispatchRecords(AuthInfo ai, String name, List<Record> records) {
List<AggTask> aggTasks = storage.getByMetric(name);
if (aggTasks.isEmpty()) {
return;
}

StringBuilder sb = new StringBuilder();
for (AggTask aggTask : aggTasks) {
String partition = buildPartition(sb, ai, aggTask);
AggTaskKey aggTaskKey = new AggTaskKey(ai.getTenant(), aggTask.getAggId(), partition);

AggProtos.AggTaskValue.Builder aggTaskValue =
AggProtos.AggTaskValue.newBuilder().setMetric(name);

for (Record record : records) {
AggProtos.InDataNode.Builder b = AggProtos.InDataNode.newBuilder() //
.setType(2) //
.setTimestamp(record.getTimestamp());

record.getTags().forEach((k, v) -> {
if (v != null) {
b.putTags(k, v);
}
});

record.getFields().forEach((k, v) -> {
if (v instanceof Number) {
putField(b, k, ((Number) v).doubleValue());
} else if (v instanceof String) {
putField(b, k, v.toString());
}
});

aggTaskValue.addInDataNodes(b); //
}

AggProtos.AggTaskValue taskValue = aggTaskValue.build();

if (send(aggTaskKey, taskValue)) {
if (aggTask.getExtension().isDebug()) {
log.info("send to kafka {} {}", aggTaskKey.getAggId(), taskValue.getInDataNodesCount());
}
StatUtils.KAFKA_SEND.add(StringsKey.of("v1"),
new long[] {1, taskValue.getInDataNodesCount()});
}
}
}

public void dispatchSpans(AuthInfo ai, List<SpanDO> spans) {
if (CollectionUtils.isEmpty(spans)) {
return;
}

String name = "_span";
List<AggTask> aggTasks = storage.getByMetric(name);
if (aggTasks.isEmpty()) {
return;
}

StringBuilder sb = new StringBuilder();
for (AggTask aggTask : aggTasks) {
String partition = buildPartition(sb, ai, aggTask);
AggTaskKey aggTaskKey = new AggTaskKey(ai.getTenant(), aggTask.getAggId(), partition);

AggProtos.AggTaskValue.Builder aggTaskValue =
AggProtos.AggTaskValue.newBuilder().setMetric(name);

for (SpanDO span : spans) {
AggProtos.InDataNode.Builder b = AggProtos.InDataNode.newBuilder() //
.setType(2) //
.setTimestamp(span.getEndTime());

maybePutTag(b, "trace_id", span.getTraceId());
maybePutTag(b, "kind", span.getKind());
maybePutTag(b, "name", span.getName());
maybePutTag(b, "trace_status", Integer.toString(span.getTraceStatus()));

putField(b, "latency", span.getLatency());
for (Map.Entry<String, Object> e : span.getTags().entrySet()) {
Object value = e.getValue();
if (value instanceof String) {
maybePutTag(b, e.getKey(), (String) e.getValue());
}
}

aggTaskValue.addInDataNodes(b); //
}

AggProtos.AggTaskValue taskValue = aggTaskValue.build();

if (send(aggTaskKey, taskValue)) {
if (aggTask.getExtension().isDebug()) {
log.info("send to kafka {} {}", aggTaskKey.getAggId(), taskValue.getInDataNodesCount());
}
StatUtils.KAFKA_SEND.add(StringsKey.of("v1"),
new long[] {1, taskValue.getInDataNodesCount()});
}
}

}

private static void maybePutTag(AggProtos.InDataNode.Builder b, String key, String value) {
if (value != null) {
b.putTags(key, value);
}
}

private static void putField(AggProtos.InDataNode.Builder b, String key, String value) {
b.putFields(key, AggProtos.BasicField.newBuilder().setType(1)
.setBytesValue(ByteString.copyFromUtf8(value)).build());
}

private static void putField(AggProtos.InDataNode.Builder b, String key, double value) {
b.putFields(key, AggProtos.BasicField.newBuilder().setType(0).setFloat64Value(value).build());
}

@NotNull
private static String buildPartition(StringBuilder sb, AuthInfo authInfo, AggTask aggTask) {
if (sb == null) {
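dispatchRecords and dispatchSpans above both reduce their input to AggProtos.InDataNode messages whose fields are BasicField values: the putField helpers use type 0 for float64 payloads and type 1 for UTF-8 string payloads. A small sketch of building one such node directly, with the tag and field values invented for illustration:

import com.google.protobuf.ByteString;

import io.holoinsight.server.agg.v1.pb.AggProtos;

public class InDataNodeSketch {
  public static void main(String[] args) {
    AggProtos.InDataNode node = AggProtos.InDataNode.newBuilder() //
        .setType(2) // same node type the dispatcher sets for records and spans
        .setTimestamp(System.currentTimeMillis()) //
        .putTags("name", "/api/orders") //
        // type 0: numeric field carried as float64
        .putFields("latency", AggProtos.BasicField.newBuilder() //
            .setType(0).setFloat64Value(12.5d).build()) //
        // type 1: string field carried as UTF-8 bytes
        .putFields("trace_id", AggProtos.BasicField.newBuilder() //
            .setType(1).setBytesValue(ByteString.copyFromUtf8("abc123")).build()) //
        .build();
    System.out.println(node);
  }
}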
@@ -3,6 +3,7 @@
*/
package io.holoinsight.server.agg.v1.dispatcher;

import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
@@ -59,4 +60,11 @@ public AggConfig aggConfig() {
public AggDispatcherMockDataGenerator aggDispatcherMockDataGenerator() {
return new AggDispatcherMockDataGenerator();
}

@ConditionalOnMissingBean
@ConditionalOnRole("apm")
@Bean
public AggSpanStorageHook aggSpanStorageHook() {
return new AggSpanStorageHook();
}
}