Skip to content

Commit

Permalink
fix: alert task assign (#872)
Browse files Browse the repository at this point in the history
  • Loading branch information
masaimu authored Jul 2, 2024
1 parent b7249a8 commit 3df55b7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -54,6 +55,7 @@ public class CacheAlertTask {
protected final AtomicInteger aiPageNum = new AtomicInteger();
protected final AtomicInteger pqlPageSize = new AtomicInteger();
protected final AtomicInteger pqlPageNum = new AtomicInteger();
protected final AtomicBoolean enable = new AtomicBoolean(true);

@Resource
protected AlarmRuleMapper alarmRuleDOMapper;
Expand All @@ -77,6 +79,10 @@ public void start() {

private void getAlarmTaskCache() {
try {
if (!enable.get()) {
LOGGER.warn("alert cache task has been closed.");
return;
}
loadLogMetric();
LOGGER.info("complete to loadLogMetric, logPatternCache size {} logSampleCache size {}",
logPatternCache.size(), logSampleCache.size());
Expand Down Expand Up @@ -289,4 +295,8 @@ public void setPqlPageSize(int size) {
/**
 * Stores the given value in the {@code pqlPageNum} counter.
 *
 * @param num the page number to record for the "pql" rule type
 */
public void setPqlPageNum(int num) {
pqlPageNum.set(num);
}

/**
 * Switches the alert cache task on or off by updating the {@code enable} flag.
 *
 * <p>
 * While the flag is {@code false}, {@code getAlarmTaskCache()} logs a warning and returns before
 * loading anything. The coordinator's {@code calculateSelectRange} clears the flag when the real
 * member count is at most half of the total, and sets it again otherwise.
 *
 * @param enable {@code true} to let the cache task run, {@code false} to make it skip its work
 */
public void setEnable(boolean enable) {
// "this." is required here: the parameter shadows the AtomicBoolean field of the same name.
this.enable.set(enable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.reflect.TypeToken;
import io.holoinsight.server.common.AddressUtil;
import io.holoinsight.server.common.J;
import io.holoinsight.server.common.Pair;
import io.holoinsight.server.home.alert.service.task.CacheAlertTask;
import io.holoinsight.server.home.alert.service.task.coordinator.server.NettyServer;
import io.holoinsight.server.home.biz.common.MetaDictUtil;
Expand Down Expand Up @@ -54,22 +55,30 @@ public class CoordinatorService {
*
* @return
*/
public int getOrder() {
public Pair<Integer /* order */, Integer /* total */> getOrder() {
long minuteBefore = System.currentTimeMillis() - 60_000L;
QueryWrapper<Cluster> condition = new QueryWrapper<>();
condition.eq("role", this.role);
condition.ge("last_heartbeat_time", minuteBefore);

int total = -1;
int order = -1;
List<Cluster> clusters = this.clusterMapper.selectList(condition);

if (CollectionUtils.isEmpty(clusters)) {
return new Pair<>(order, total);
} else {
total = clusters.size();
}

clusters.sort(Comparator.comparing(Cluster::getIp));
String myIp = AddressUtil.getLocalHostIPV4();
log.info("get order by my ip {} in {}", myIp, J.toJson(clusters));
this.otherMembers = new ArrayList<>();
int order = -1;

List<String> blackList = getBlackServerList();
if (!CollectionUtils.isEmpty(blackList) && blackList.contains(myIp)) {
return order;
return new Pair<>(order, total);
}
for (int i = 0; i < clusters.size(); i++) {
Cluster cluster = clusters.get(i);
Expand All @@ -79,7 +88,7 @@ public int getOrder() {
this.otherMembers.add(cluster.getIp());
}
}
return order;
return new Pair<>(order, total);
}

private List<String> getBlackServerList() {
Expand All @@ -91,7 +100,9 @@ private List<String> getBlackServerList() {
}

public void spread(long heartbeat) {
int order = getOrder();
Pair<Integer /* order */, Integer /* total */> pair = getOrder();
int order = pair.getLeft();
int total = pair.getRight();
if (order < 0) {
log.info("fail to get order, give up allocating task.");
return;
Expand All @@ -108,11 +119,18 @@ public void spread(long heartbeat) {
if (realOrder < 0) {
return;
}
calculateSelectRange(realOrder);
calculateSelectRange(realOrder, total);
}

protected void calculateSelectRange(int realOrder) {
protected void calculateSelectRange(int realOrder, int total) {
double realSize = orderMap.getRealSize().doubleValue();
if (total > 2 && realSize <= ((double) total / 2)) {
log.warn("TASK_ASSIGN_CRITICAL,realSize={},total={}", realSize, total);
this.cacheAlertTask.setEnable(false);
return;
} else {
this.cacheAlertTask.setEnable(true);
}
double ruleSize = this.cacheAlertTask.ruleSize("rule", (byte) 1).doubleValue();
double aiSize = this.cacheAlertTask.ruleSize("ai", (byte) 1).doubleValue();
double pqlSize = this.cacheAlertTask.ruleSize("pql", (byte) 1).doubleValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ public void testCalculateSelectRange() {
}

private void order(CoordinatorService service) {
service.calculateSelectRange(0);
service.calculateSelectRange(1);
service.calculateSelectRange(2);
service.calculateSelectRange(0, 3);
service.calculateSelectRange(1, 3);
service.calculateSelectRange(2, 3);
Mockito.verify(service.cacheAlertTask, Mockito.times(3))
.setRulePageNum(rulePageNumArgument.capture());
List<Integer> rulePageNums = rulePageNumArgument.getAllValues();
Expand Down

0 comments on commit 3df55b7

Please sign in to comment.