Skip to content

Commit

Permalink
refactor: agg dispatcher (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
xzchaoo authored Oct 19, 2023
1 parent 47a49e8 commit 9fae622
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public String type() {

@Override
public void write(Batch batch) {
log.info("xxx");
WriteMetricsParam param = new WriteMetricsParam();
param.setTenant(batch.key.getTenant());
List<WriteMetricsParam.Point> points = new ArrayList<>(batch.groups.size());
Expand Down
124 changes: 0 additions & 124 deletions server/agg/agg-boot/src/main/java/temp.groovy

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ public void dispatch(AuthInfo authInfo, WriteMetricsRequestV4 request) {
ProducerRecord<AggTaskKey, AggProtos.AggTaskValue> record =
new ProducerRecord<>(aggProperties.getTopic(), aggKey, aggTaskValue.build());
kafkaProducer.send(record);
StatUtils.KAFKA_SEND.add(StringsKey.of("v4"), Stats.V_1);
StatUtils.KAFKA_SEND.add(StringsKey.of("v4"),
new long[] {1, aggTaskValue.getInDataNodesCount()});
}
}
}
Expand Down Expand Up @@ -204,7 +205,8 @@ public void dispatch(AuthInfo authInfo, WriteMetricsRequestV1 request) {
ProducerRecord<AggTaskKey, AggProtos.AggTaskValue> record =
new ProducerRecord<>(aggProperties.getTopic(), aggTaskKey, taskValue);
kafkaProducer.send(record);
StatUtils.KAFKA_SEND.add(StringsKey.of("v1"), Stats.V_1);
StatUtils.KAFKA_SEND.add(StringsKey.of("v1"),
new long[] {1, taskValue.getInDataNodesCount()});
}
}
}
Expand Down Expand Up @@ -291,9 +293,14 @@ public void dispatchDetailData(AuthInfo authInfo, GatewayHook.Table table) {
ProducerRecord<AggTaskKey, AggProtos.AggTaskValue> record =
new ProducerRecord<>(aggProperties.getTopic(), aggTaskKey, aggTaskValue);
kafkaProducer.send(record);
StatUtils.KAFKA_SEND.add(StringsKey.of("detail"), new long[] {1, table.getRows().size()});
}
}

public boolean supportsDetail(String name) {
return CollectionUtils.isNotEmpty(this.storage.getByMetric(name));
}

@NotNull
private static String buildPartition(StringBuilder sb, AuthInfo authInfo, AggTask aggTask) {
if (sb == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,9 @@ public void writeMetricsV4(AuthInfo authInfo, WriteMetricsRequestV4 request) {
public void writeDetail(AuthInfo authInfo, Table table) {
aggDispatcher.dispatchDetailData(authInfo, table);
}

@Override
public boolean supportsDetail(String name) {
return aggDispatcher.supportsDetail(name);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public Map<String, List<XSelectItem>> getSelectItem(String metric) {
XSelectItem item = items.get(index);
SelectItem.Elect e = item.getInner().getElect();

if (item.getInner().getAgg().getTypeInt() == AggFunc.TYPE_HLL) {
int aggType = item.getInner().getAgg().getTypeInt();
if (aggType == AggFunc.TYPE_HLL || aggType == AggFunc.TYPE_COUNT) {
electToItemMap.computeIfAbsent(e.getMetric(), i -> new HashMap<>())
.computeIfAbsent(NO_FIELD, i -> new LinkedList<>()) //
.add(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package io.holoinsight.server.gateway.core.grpc;

import java.util.List;
import java.util.Map;

import io.holoinsight.server.common.auth.AuthInfo;
import io.holoinsight.server.gateway.grpc.WriteMetricsRequestV1;
Expand Down Expand Up @@ -35,6 +34,8 @@ public interface GatewayHook {

void writeDetail(AuthInfo authInfo, Table table);

boolean supportsDetail(String name);

@lombok.Data
class Table {
private String name;
Expand All @@ -47,6 +48,13 @@ class Table {
class Header {
private List<String> tagKeys;
private List<String> fieldKeys;

public Header() {}

public Header(List<String> tagKeys, List<String> fieldKeys) {
this.tagKeys = tagKeys;
this.fieldKeys = fieldKeys;
}
}

@lombok.Data
Expand Down

0 comments on commit 9fae622

Please sign in to comment.