Skip to content

Commit

Permalink
refactor(home): change size of messages sent by kafka to 2M (#848)
Browse files Browse the repository at this point in the history
  • Loading branch information
jsy1001de authored May 7, 2024
1 parent f68018a commit 75c8f19
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void init() {
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AggValuesSerdes.S.class.getName());
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
properties.put(ProducerConfig.LINGER_MS_CONFIG, 100);
properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 2097152);
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 3000);
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
aggProperties.getProducerCompressionType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.holoinsight.server.common.dao.converter.AlarmSubscribeConverter;
import io.holoinsight.server.common.dao.mapper.AlarmSubscribeMapper;
import io.holoinsight.server.common.dao.entity.AlarmSubscribe;
import io.holoinsight.server.common.dao.entity.dto.AlarmSubscribeDTO;
import io.holoinsight.server.common.dao.entity.dto.AlarmSubscribeInfo;
import io.holoinsight.server.common.dao.mapper.AlarmSubscribeMapper;
import io.holoinsight.server.common.service.AlertSubscribeService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
Expand All @@ -32,11 +32,9 @@
public class AlertSubscribeServiceImpl extends ServiceImpl<AlarmSubscribeMapper, AlarmSubscribe>
implements AlertSubscribeService {

@Resource
@Autowired
private AlarmSubscribeConverter alarmSubscribeConverter;

@Resource
private AlarmSubscribeMapper alarmSubscribeMapper;

public Boolean saveDataBatch(AlarmSubscribeDTO alarmSubscribeDTO, String creator, String tenant,
String workspace) {
Expand All @@ -56,7 +54,9 @@ public Boolean saveDataBatch(AlarmSubscribeDTO alarmSubscribeDTO, String creator
alarmSubscribeInfo.stream().map(AlarmSubscribeInfo::getId).collect(Collectors.toList());
ids.removeAll(updateIds);
// 删除
this.removeBatchByIds(ids);
if (!CollectionUtils.isEmpty(ids)) {
ids.forEach(id -> this.baseMapper.deleteById(id));
}

List<AlarmSubscribe> alarmSubscribeList = new ArrayList<>();
alarmSubscribeDTO.getAlarmSubscribe().forEach(e -> {
Expand Down Expand Up @@ -88,7 +88,7 @@ public AlarmSubscribeDTO queryByUniqueId(QueryWrapper<AlarmSubscribe> queryWrapp
String uniqueId) {
AlarmSubscribeDTO alarmSubscribeDTO = new AlarmSubscribeDTO();
alarmSubscribeDTO.setUniqueId(uniqueId);
List<AlarmSubscribe> list = this.alarmSubscribeMapper.selectList(queryWrapper);
List<AlarmSubscribe> list = this.baseMapper.selectList(queryWrapper);
if (!CollectionUtils.isEmpty(list)) {
alarmSubscribeDTO.setEnvType(list.get(0).getEnvType());
}
Expand All @@ -98,7 +98,7 @@ public AlarmSubscribeDTO queryByUniqueId(QueryWrapper<AlarmSubscribe> queryWrapp

@Override
public List<AlarmSubscribeInfo> queryByMap(QueryWrapper<AlarmSubscribe> queryWrapper) {
List<AlarmSubscribe> list = this.alarmSubscribeMapper.selectList(queryWrapper);
List<AlarmSubscribe> list = this.baseMapper.selectList(queryWrapper);
return alarmSubscribeConverter.dosToDTOs(list);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ public interface GaeaCollectConfigService extends IService<GaeaCollectConfig> {

GaeaCollectConfigDTO findById(Long id);

GaeaCollectConfigDTO findByTableName(String tenant, String workspace, String tableName);

List<GaeaCollectConfigDTO> findByRefId(String refId);

GaeaCollectConfigDTO upsert(GaeaCollectConfigDTO gaeaCollectConfigDTO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@
*/
package io.holoinsight.server.home.biz.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import io.holoinsight.server.common.MD5Hash;
import io.holoinsight.server.home.biz.service.GaeaCollectConfigService;
import io.holoinsight.server.home.dal.converter.GaeaCollectConfigConverter;
import io.holoinsight.server.home.dal.mapper.GaeaCollectConfigMapper;
import io.holoinsight.server.home.dal.model.GaeaCollectConfig;
import io.holoinsight.server.home.dal.model.dto.GaeaCollectConfigDTO;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.mapstruct.factory.Mappers;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand Down Expand Up @@ -39,6 +41,20 @@ public GaeaCollectConfigDTO findById(Long id) {
return gaeaCollectConfigConverter.doToDTO(getById(id));
}

@Override
public GaeaCollectConfigDTO findByTableName(String tenant, String workspace, String tableName) {
QueryWrapper<GaeaCollectConfig> wrapper = new QueryWrapper<>();

wrapper.eq("tenant", tenant);
if (StringUtils.isNotBlank(workspace)) {
wrapper.eq("workspace", workspace);
}
wrapper.eq("table_name", tableName);
wrapper.eq("deleted", 0);
wrapper.last("LIMIT 1");
return gaeaCollectConfigConverter.doToDTO(this.getOne(wrapper));
}

@Override
public List<GaeaCollectConfigDTO> findByRefId(String refId) {
Map<String, Object> columnMap = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ public void checkParameter() {
ParaCheckUtil.checkParaNotNull(alarmSubscribeDTO, "alarmSubscribeDTO");
MonitorScope ms = RequestContext.getContext().ms;
MonitorUser mu = RequestContext.getContext().mu;
if (StringUtils.isNotEmpty(alarmSubscribeDTO.getUniqueId())) {
String uniqueId = alarmSubscribeDTO.getUniqueId();
if (StringUtils.isNotEmpty(uniqueId)) {
ParaCheckUtil.checkParaBoolean(
parameterSecurityService.checkRuleTenantAndWorkspace(alarmSubscribeDTO.getUniqueId(),
tenant(), workspace()),
parameterSecurityService.checkRuleTenantAndWorkspace(uniqueId, tenant(), workspace()),
"uniqueId do not belong to this tenant " + tenant() + " or workspace " + workspace());
}
if (!CollectionUtils.isEmpty(alarmSubscribeDTO.getAlarmSubscribe())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,25 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.reflect.TypeToken;
import io.holoinsight.server.common.J;
import io.holoinsight.server.common.dao.entity.MetricInfo;
import io.holoinsight.server.common.dao.entity.dto.MetricInfoDTO;
import io.holoinsight.server.common.service.RequestContextAdapter;
import io.holoinsight.server.common.MonitorPageRequest;
import io.holoinsight.server.common.RequestContext;
import io.holoinsight.server.common.dao.mapper.AlarmRuleMapper;
import io.holoinsight.server.common.dao.mapper.AlertTemplateMapper;
import io.holoinsight.server.common.dao.emuns.TimeFilterEnum;
import io.holoinsight.server.common.dao.entity.AlarmRule;
import io.holoinsight.server.common.dao.entity.AlertTemplate;
import io.holoinsight.server.common.dao.entity.dto.AlarmRuleDTO;
import io.holoinsight.server.common.dao.entity.dto.AlertRuleExtra;
import io.holoinsight.server.common.dao.entity.dto.AlertSilenceConfig;
import io.holoinsight.server.common.dao.entity.dto.MetricInfoDTO;
import io.holoinsight.server.common.dao.entity.dto.NotificationConfig;
import io.holoinsight.server.common.dao.entity.dto.NotificationTemplate;
import io.holoinsight.server.common.dao.entity.dto.alarm.AlarmRuleConf;
import io.holoinsight.server.common.dao.entity.dto.alarm.TimeFilter;
import io.holoinsight.server.common.dao.emuns.TimeFilterEnum;
import io.holoinsight.server.common.MonitorPageRequest;
import io.holoinsight.server.common.dao.entity.dto.alarm.trigger.CompareConfig;
import io.holoinsight.server.common.dao.entity.dto.alarm.trigger.DataSource;
import io.holoinsight.server.common.dao.entity.dto.alarm.trigger.Trigger;
import io.holoinsight.server.common.dao.mapper.AlarmRuleMapper;
import io.holoinsight.server.common.dao.mapper.AlertTemplateMapper;
import io.holoinsight.server.common.service.RequestContextAdapter;
import io.holoinsight.server.home.web.common.ParaCheckUtil;
import io.holoinsight.server.home.web.security.LevelAuthorizationCheckResult;
import io.holoinsight.server.home.web.security.LevelAuthorizationMetaData;
Expand Down Expand Up @@ -86,13 +85,14 @@ public class AlarmRuleLevelAuthorizationChecker extends AbstractQueryChecker

private static final Set<String> silenceModes =
new HashSet<>(Arrays.asList("default", "gradual", "fixed"));
private static final Set<String> aggregators = new HashSet<>(Arrays.asList("sum", "avg", "mix",
"max", "count", "none", "SUM", "AVG", "MIX", "MAX", "COUNT", "NONE"));
private static final Set<String> aggregators = new HashSet<>(Arrays.asList("sum", "avg", "min",
"max", "count", "none", "SUM", "AVG", "MIN", "MAX", "COUNT", "NONE"));
private static final Set<String> metricTypes =
new HashSet<>(Arrays.asList("app", "cache", "log", "oss", "trace", "system", "metric",
"service", "function", "pg", "mongodb", "db", "miniProgram", "mysql"));
private static final Set<String> products = new HashSet<>(Arrays.asList("JVM", "Function",
"OceanBase", "Tbase", "PortCheck", "System", "MiniProgram", "Spanner", "IoT", "APM"));
private static final Set<String> products = new HashSet<>(
Arrays.asList("JVM", "Function", "OceanBase", "Tbase", "PortCheck", "System", "MiniProgram",
"Spanner", "IoT", "APM", "Mysql", "SLB", "SOFAMQX", "Postgres", "Gateway"));

@Override
public LevelAuthorizationCheckResult check(LevelAuthorizationMetaData levelAuthMetaData,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package io.holoinsight.server.registry.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
Expand All @@ -22,6 +23,7 @@ public class Output {
*/
private String type;
private Gateway gateway;
private SLSGateway sls;

@ToString
@Getter
Expand All @@ -32,4 +34,16 @@ public static class Gateway {
*/
private String metricName;
}

@ToString
@Getter
@Setter
@AllArgsConstructor
public static class SLSGateway {
private String endpoint;
private String project;
private String logstore;
private String ak;
private String sk;
}
}

0 comments on commit 75c8f19

Please sign in to comment.