Skip to content

Commit

Permalink
refactor: optimize metric metering switch (#784)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsy1001de authored Jan 25, 2024
1 parent 7e8a3a8 commit d12ed21
Show file tree
Hide file tree
Showing 17 changed files with 198 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class CeresdbDetailsStorage implements DetailsStorage {
@Autowired
private CeresdbxClientManager ceresdbxClientManager;
@Autowired
private MetricMeterService meterService;
private MetricMeterService metricMeterService;
private Cache<String, Boolean> cache = //
CacheBuilder.newBuilder() //
.expireAfterWrite(Duration.ofHours(1)) //
Expand All @@ -57,7 +57,7 @@ public CeresdbDetailsStorage(String forceTenant) {

@Override
public Mono<Void> write(String tenant, Table table) {
meterService.meter(tenant, table);
metricMeterService.meter(tenant, table);

String forceTenant = StringUtils.firstNonEmpty(this.forceTenant, tenant);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class CeresdbxMetricStorage implements MetricStorage {
private PqlQueryService pqlQueryService;

@Autowired(required = false)
private MetricMeterService meterService;
private MetricMeterService metricMeterService;

public CeresdbxMetricStorage(CeresdbxClientManager ceresdbxClientManager) {
this.ceresdbxClientManager = ceresdbxClientManager;
Expand Down Expand Up @@ -111,8 +111,8 @@ public Mono<Void> write(WriteMetricsParam writeMetricsParam) {
dps = 0;
}
}
if (meterService != null && !writeMetricsParam.isFree()) {
meterService.meter(writeMetricsParam);
if (metricMeterService != null && !writeMetricsParam.isFree()) {
metricMeterService.meter(writeMetricsParam);
}
return Mono.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package io.holoinsight.server.extension.ceresdbx;

import io.holoinsight.server.common.dao.mapper.TenantOpsMapper;
import io.holoinsight.server.extension.MetricMeterService;
import io.holoinsight.server.extension.MetricStorage;
import io.holoinsight.server.extension.promql.PqlQueryService;
import io.holoinsight.server.extension.promql.RemotePqlConfiguration;
Expand Down Expand Up @@ -33,4 +34,8 @@ public MetricStorage ceresdbxMetricStorage(CeresdbxClientManager ceresdbxClientM
return new CeresdbxMetricStorage(ceresdbxClientManager, pqlQueryService);
}

@Bean
public MetricMeterService metricMeterService() {
return new MetricMeterServiceImpl();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022 Holoinsight Project Authors. Licensed under Apache-2.0.
*/

package io.holoinsight.server.extension.ceresdbx;

import io.holoinsight.server.extension.MetricMeterService;
import io.holoinsight.server.extension.model.Table;
import io.holoinsight.server.extension.model.WriteMetricsParam;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;

/**
* @author jsy1001de
* @version 1.0: MetricMeterServiceImpl.java, Date: 2024-01-25 Time: 20:04
*/

@Slf4j
public class MetricMeterServiceImpl implements MetricMeterService {
@Override
public void meter(WriteMetricsParam writeMetricsParam) {

}

@Override
public void meter(String tenant, Table table) {

}

@Override
public void meter(Map<String, String> tags, String tenant, String metricName) {

}

@Override
public Map<String, String> keyGen(String tenant, String name, Map<String, String> tags) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface MetricMeterService {
* @param tags
*/
void meter(Map<String, String> tags, String tenant, String metricName);

Map<String, String> keyGen(String tenant, String name, Map<String, String> tags);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
package io.holoinsight.server.gateway.core.grpc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.holoinsight.server.common.ctl.MonitorProductCode;
import io.holoinsight.server.common.ctl.ProductCtlService;
import io.holoinsight.server.extension.MetricMeterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

Expand All @@ -17,6 +22,7 @@
import io.holoinsight.server.gateway.grpc.DataNode;
import io.holoinsight.server.gateway.grpc.WriteMetricsRequestV4;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand All @@ -32,6 +38,12 @@ public class DetailsStorageService {
@Autowired
private DetailsStorage detailsStorage;

@Autowired
private ProductCtlService productCtlService;

@Autowired
private MetricMeterService metricMeterService;

public Mono<Void> write(AuthInfo authInfo, WriteMetricsRequestV4 req) {
List<Mono<Void>> monos = new ArrayList<>();

Expand Down Expand Up @@ -64,6 +76,11 @@ public Mono<Void> write(AuthInfo authInfo, WriteMetricsRequestV4 req) {
fieldValues.add(dn.getValue());
}
row.setFieldValues(fieldValues);

// check
if (!checkClosed(authInfo.getTenant(), table.getName(), header, pbrow.getTagValuesList())) {
continue;
}
rows.add(row);
}
table.setRows(rows);
Expand All @@ -74,4 +91,30 @@ public Mono<Void> write(AuthInfo authInfo, WriteMetricsRequestV4 req) {

return Flux.merge(monos).ignoreElements();
}

private Boolean checkClosed(String tenant, String metric, Header header, List<String> tagValues) {
try {
Map<String, String> dataTags = new HashMap<>();
if (!CollectionUtils.isEmpty(header.getTagKeys())) {
for (int i = 0; i < header.getTagKeys().size(); i++) {
dataTags.put(header.getTagKeys().get(i), tagValues.get(i));
}
if (CollectionUtils.isEmpty(dataTags)) {
return Boolean.TRUE;
}
Map<String, String> meterTags = metricMeterService.keyGen(tenant, metric, dataTags);
if (CollectionUtils.isEmpty(meterTags)) {
return Boolean.TRUE;
}

if (!productCtlService.isMetricInWhiteList(metric)
&& productCtlService.productClosed(MonitorProductCode.METRIC, meterTags)) {
return Boolean.FALSE;
}
}
} catch (Exception e) {
log.warn("detail metric meter fail" + e.getMessage(), e);
}
return Boolean.TRUE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,13 @@
*/
package io.holoinsight.server.gateway.core.grpc;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.google.common.collect.Maps;
import com.google.protobuf.Empty;
import com.xzchaoo.commons.stat.StringsKey;

import io.grpc.stub.StreamObserver;
import io.holoinsight.server.common.TrafficTracer;
import io.holoinsight.server.common.auth.ApikeyAuthService;
import io.holoinsight.server.common.auth.AuthInfo;
import io.holoinsight.server.common.ctl.MonitorProductCode;
import io.holoinsight.server.common.ctl.ProductCtlService;
import io.holoinsight.server.common.service.MetaDataDictValueService;
import io.holoinsight.server.extension.MetricStorage;
import io.holoinsight.server.extension.model.WriteMetricsParam;
import io.holoinsight.server.gateway.core.utils.StatUtils;
Expand All @@ -36,9 +22,18 @@
import io.holoinsight.server.gateway.grpc.WriteMetricsRequestV4;
import io.holoinsight.server.gateway.grpc.WriteMetricsResponse;
import io.holoinsight.server.gateway.grpc.common.CommonResponseHeader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* <p>
* created at 2022/2/25
Expand All @@ -56,12 +51,6 @@ public class GatewayGrpcServiceImpl extends GatewayServiceGrpc.GatewayServiceImp
@Autowired
private ApikeyAuthService apikeyAuthService;

@Autowired
private ProductCtlService productCtlService;

@Autowired
private MetaDataDictValueService metaDataDictValueService;

@Autowired
private GatewayHookManager gatewayHookManager;

Expand Down Expand Up @@ -143,10 +132,10 @@ private WriteMetricsParam convertToWriteMetricsParam(AuthInfo authInfo,
List<WriteMetricsParam.Point> points = new ArrayList<>(request.getPointCount());
for (Point p : request.getPointList()) {

if (!productCtlService.isMetricInWhiteList(p.getMetricName())
&& productCtlService.productClosed(MonitorProductCode.METRIC, p.getTagsMap())) {
continue;
}
// if (!productCtlService.isMetricInWhiteList(p.getMetricName())
// && productCtlService.productClosed(MonitorProductCode.METRIC, p.getTagsMap())) {
// continue;
// }

p = gatewayHookManager.processV1(authInfo, p);
if (p == null) {
Expand Down Expand Up @@ -198,10 +187,10 @@ private WriteMetricsParam convertToWriteMetricsParam(AuthInfo authInfo,
for (int i = 0; i < header.getTagKeysCount(); i++) {
tags.put(header.getTagKeys(i), row.getTagValues(i));
}
if (!productCtlService.isMetricInWhiteList(header.getMetricName())
&& productCtlService.productClosed(MonitorProductCode.METRIC, tags)) {
continue;
}
// if (!productCtlService.isMetricInWhiteList(header.getMetricName())
// && productCtlService.productClosed(MonitorProductCode.METRIC, tags)) {
// continue;
// }
wmpp.setTimeStamp(row.getTimestamp());
wmpp.setTags(tags);
for (DataNode dataNode : row.getValueValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ private void tryQueryLogSample(List<AlertNotify> alertNotifyList) {
List<String> logs = new ArrayList<>();
Long alertTime = alertNotify.getAlarmTime();
for (QueryProto.QueryRequest queryRequest : queryRequests) {
QueryProto.QueryResponse response = this.queryClientService.queryData(queryRequest);
QueryProto.QueryResponse response =
this.queryClientService.queryData(queryRequest, "LOG_ALERT");
if (response != null && !CollectionUtils.isEmpty(response.getResultsList())) {
LOGGER.debug("{} log sample result {} request {}", alertNotify.getTraceId(),
J.toJson(response.getResultsList()), J.toJson(queryRequest));
Expand Down Expand Up @@ -226,7 +227,8 @@ private void tryQueryLogAnalysis(List<AlertNotify> alertNotifyList) {
List<String> logs = new ArrayList<>();
Long alertTime = alertNotify.getAlarmTime();
for (QueryProto.QueryRequest queryRequest : queryRequests) {
QueryProto.QueryResponse response = this.queryClientService.queryData(queryRequest);
QueryProto.QueryResponse response =
this.queryClientService.queryData(queryRequest, "LOG_ALERT");
if (response != null && !CollectionUtils.isEmpty(response.getResultsList())) {
LOGGER.debug("{} log analysis result {} request {}", alertNotify.getTraceId(),
J.toJson(response.getResultsList()), J.toJson(queryRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private CheckResult doCheck(Record record) {

QueryProto.QueryRequest request =
ruleAlarmLoadData.buildRequest(record.period, record.tenant, record.trigger);
QueryProto.QueryResponse response = queryClientService.queryData(request);
QueryProto.QueryResponse response = queryClientService.queryData(request, "ALERT");

Map<Long, Double> points = new HashMap<>();
if (response != null && !CollectionUtils.isEmpty(response.getResultsList())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private QueryProto.QueryResponse getData(InspectConfig inspectConfig, Trigger tr

request = QueryProto.QueryRequest.newBuilder().setTenant(inspectConfig.getTenant())
.setQuery(trigger.getQuery()).addAllDatasources(datasources).build();
response = queryClientService.queryData(request);
response = queryClientService.queryData(request, "AI_ALERT");
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} QueryData Success Request:{} Response:{}", inspectConfig.getTraceId(),
G.get().toJson(request), G.get().toJson(response));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ public List<DataResult> queryDataResult(ComputeTaskPackage computeTask,
try {
request = buildRequest(computeTask.getTimestamp(), inspectConfig.getTenant(), trigger);
LOGGER.debug("{} alert query request {}", inspectConfig.getTraceId(), request.toString());
response = queryClientService.queryData(request);
response = queryClientService.queryData(request, "ALERT");
dataResults = merge(dataResults, response);
long deltaTimestamp = getDeltaTimestamp(trigger.getPeriodType());
if (deltaTimestamp > 0) {
deltaRequest = buildRequest(computeTask.getTimestamp() - deltaTimestamp,
inspectConfig.getTenant(), trigger);
deltaResponse = queryClientService.queryData(deltaRequest);
deltaResponse = queryClientService.queryData(deltaRequest, "ALERT");
dataResults = merge(dataResults, deltaResponse);
}
} catch (Exception exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,14 +427,14 @@ private QueryProto.QueryTraceRequest convertRequest(QueryTraceRequest request) {
return requestBuilder.build();
}

public QueryProto.QueryResponse queryData(QueryProto.QueryRequest request) {
public QueryProto.QueryResponse queryData(QueryProto.QueryRequest request, String source) {

QueryProto.QueryRequest queryRequest = this.requestContextAdapter.requestAdapte(request);
long start = System.currentTimeMillis();
QueryProto.QueryResponse response =
queryServiceBlockingStub.withMaxInboundMessageSize(4194304 * 100).queryData(queryRequest);
int pointSize = getPointSizeFromResp(response);
log.info("HOME_QUERY_STAT from[ALERT] invoke[1], cost[{}], pointSize[{}]",
log.info("HOME_QUERY_STAT from[{}] invoke[1], cost[{}], pointSize[{}]", source,
System.currentTimeMillis() - start, pointSize);
return response;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,10 @@ public class QueryResponse {
*/
private List<Result> completes;

private String message;

private Boolean success = true;

private String errorCode;

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MetaDictKey {
public static final String LOG_TIME_LAYOUT = "log_time_layout";
public static final String TOKEN_URL_WRITE_LIST = "token_url_write_list";
public static final String TOKEN_URL_NO_AUTH = "token_url_no_auth";
public static final String RESOURCE_KEYS = "resource_keys";
public static final String IS_APM_MATERIALIZED = "is_apm_materialized";
public static final String DISPLAY_MENU_APM = "display_menu_apm";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@

import com.google.gson.reflect.TypeToken;
import io.holoinsight.server.common.J;
import io.holoinsight.server.common.dao.entity.MetaDataDictValue;
import io.holoinsight.server.common.service.SuperCacheService;
import io.holoinsight.server.home.common.service.SpringContext;
import io.holoinsight.server.home.common.util.scope.IdentityType;
import io.holoinsight.server.common.dao.entity.MetaDataDictValue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -150,12 +151,11 @@ public static List<String> getTokenUrlNoAuth() {
return value;
}

public static Boolean isLogMeteringOpen() {
Boolean value = MetaDictUtil.getValue(MetaDictType.GLOBAL_CONFIG, MetaDictKey.METERING_LOG_OPEN,
new TypeToken<Boolean>() {});
if (null == value) {
return false;
}
public static List<String> getResourceKeys() {
List<String> value = MetaDictUtil.getValue(MetaDictType.GLOBAL_CONFIG,
MetaDictKey.RESOURCE_KEYS, new TypeToken<List<String>>() {});
if (CollectionUtils.isEmpty(value))
return Collections.singletonList("tenant");
return value;
}

Expand Down
Loading

0 comments on commit d12ed21

Please sign in to comment.