From 556e69be7047abf699f79d5d5db7aaff5e3d68be Mon Sep 17 00:00:00 2001 From: "saimu.msm" Date: Tue, 2 Jul 2024 12:53:46 +0800 Subject: [PATCH] fix alert task assign --- .../alert/service/task/CacheAlertTask.java | 10 ++++++ .../task/coordinator/CoordinatorService.java | 32 +++++++++++++++---- .../coordinator/CoordinatorServiceTest.java | 6 ++-- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/CacheAlertTask.java b/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/CacheAlertTask.java index d5e4c0db2..2aa70f653 100644 --- a/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/CacheAlertTask.java +++ b/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/CacheAlertTask.java @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -54,6 +55,7 @@ public class CacheAlertTask { protected final AtomicInteger aiPageNum = new AtomicInteger(); protected final AtomicInteger pqlPageSize = new AtomicInteger(); protected final AtomicInteger pqlPageNum = new AtomicInteger(); + protected final AtomicBoolean enable = new AtomicBoolean(true); @Resource protected AlarmRuleMapper alarmRuleDOMapper; @@ -77,6 +79,10 @@ public void start() { private void getAlarmTaskCache() { try { + if (!enable.get()) { + LOGGER.warn("alert cache task has been closed."); + return; + } loadLogMetric(); LOGGER.info("complete to loadLogMetric, logPatternCache size {} logSampleCache size {}", logPatternCache.size(), logSampleCache.size()); @@ -289,4 +295,8 @@ public void setPqlPageSize(int size) { public void setPqlPageNum(int num) { this.pqlPageNum.set(num); } + + public void setEnable(boolean enable) { + this.enable.set(enable); + } } diff --git a/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorService.java b/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorService.java index 2b71b724b..66612eea2 100644 --- a/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorService.java +++ b/server/home/home-alert/src/main/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorService.java @@ -7,6 +7,7 @@ import com.google.common.reflect.TypeToken; import io.holoinsight.server.common.AddressUtil; import io.holoinsight.server.common.J; +import io.holoinsight.server.common.Pair; import io.holoinsight.server.home.alert.service.task.CacheAlertTask; import io.holoinsight.server.home.alert.service.task.coordinator.server.NettyServer; import io.holoinsight.server.home.biz.common.MetaDictUtil; @@ -54,22 +55,30 @@ public class CoordinatorService { * * @return */ - public int getOrder() { + public Pair getOrder() { long minuteBefore = System.currentTimeMillis() - 60_000L; QueryWrapper condition = new QueryWrapper<>(); condition.eq("role", this.role); condition.ge("last_heartbeat_time", minuteBefore); + int total = -1; + int order = -1; List clusters = this.clusterMapper.selectList(condition); + if (CollectionUtils.isEmpty(clusters)) { + return new Pair<>(order, total); + } else { + total = clusters.size(); + } + clusters.sort(Comparator.comparing(Cluster::getIp)); String myIp = AddressUtil.getLocalHostIPV4(); log.info("get order by my ip {} in {}", myIp, J.toJson(clusters)); this.otherMembers = new ArrayList<>(); - int order = -1; + List blackList = getBlackServerList(); if (!CollectionUtils.isEmpty(blackList) && blackList.contains(myIp)) { - return order; + return new Pair<>(order, total); } for (int i = 0; i < clusters.size(); i++) { Cluster cluster = clusters.get(i); @@ -79,7 +88,7 @@ public int getOrder() { this.otherMembers.add(cluster.getIp()); } } - return order; + return new Pair<>(order, total); } private List getBlackServerList() { @@ -91,7 +100,9 @@ private List getBlackServerList() { } public void spread(long heartbeat) { - int order = getOrder(); + Pair pair = getOrder(); + int order = pair.getLeft(); + int total = pair.getRight(); if (order < 0) { log.info("fail to get order, give up allocating task."); return; @@ -108,11 +119,18 @@ public void spread(long heartbeat) { if (realOrder < 0) { return; } - calculateSelectRange(realOrder); + calculateSelectRange(realOrder, total); } - protected void calculateSelectRange(int realOrder) { + protected void calculateSelectRange(int realOrder, int total) { double realSize = orderMap.getRealSize().doubleValue(); + if (total > 2 && realSize <= ((double) total / 2)) { + log.warn("TASK_ASSIGN_CRITICAL,realSize={},total={}", realSize, total); + this.cacheAlertTask.setEnable(false); + return; + } else { + this.cacheAlertTask.setEnable(true); + } double ruleSize = this.cacheAlertTask.ruleSize("rule", (byte) 1).doubleValue(); double aiSize = this.cacheAlertTask.ruleSize("ai", (byte) 1).doubleValue(); double pqlSize = this.cacheAlertTask.ruleSize("pql", (byte) 1).doubleValue(); diff --git a/server/home/home-alert/src/test/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorServiceTest.java b/server/home/home-alert/src/test/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorServiceTest.java index 09205eca0..a235d7bc3 100644 --- a/server/home/home-alert/src/test/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorServiceTest.java +++ b/server/home/home-alert/src/test/java/io/holoinsight/server/home/alert/service/task/coordinator/CoordinatorServiceTest.java @@ -54,9 +54,9 @@ public void testCalculateSelectRange() { } private void order(CoordinatorService service) { - service.calculateSelectRange(0); - service.calculateSelectRange(1); - service.calculateSelectRange(2); + service.calculateSelectRange(0, 3); + service.calculateSelectRange(1, 3); + service.calculateSelectRange(2, 3); Mockito.verify(service.cacheAlertTask, Mockito.times(3)) .setRulePageNum(rulePageNumArgument.capture()); List rulePageNums = rulePageNumArgument.getAllValues();