keys = redisTemplate.keys(beClearKey);
@@ -65,8 +131,7 @@ private void reset() {
*/
private String getCacheKey(String namespace) {
return CommonUtil.generateRedisKey(cacheName,
- WebPluginUtils.traceTenantCode(), WebPluginUtils.traceEnvCode(), namespace);
-
+ WebPluginUtils.traceTenantCode(), WebPluginUtils.traceEnvCode(), namespace);
}
/**
@@ -84,7 +149,7 @@ protected T queryValue(String userAppKey, String envCode, String namespace) {
/**
* 新版本貌似用上面的方法替代了
- *
+ *
* TODO 具体实现 - 张天赐修改,为了编译通过
*
* @param namespace -
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/cache/agentimpl/ShadowKafkaClusterConfigAgentCache.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/cache/agentimpl/ShadowKafkaClusterConfigAgentCache.java
index 6d9bebecd2..70c708a0cc 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/cache/agentimpl/ShadowKafkaClusterConfigAgentCache.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/cache/agentimpl/ShadowKafkaClusterConfigAgentCache.java
@@ -17,11 +17,9 @@
@Component
public class ShadowKafkaClusterConfigAgentCache extends AbstractAgentConfigCache> {
- public static final String CACHE_NAME = "t:a:c:shadow:es";
-
+ public static final String CACHE_NAME = "t:a:c:shadow:kafka";
@Autowired
private DsService dsService;
-
public ShadowKafkaClusterConfigAgentCache(@Autowired RedisTemplate redisTemplate) {
super(CACHE_NAME, redisTemplate);
}
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/common/AbstractSceneTask.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/common/AbstractSceneTask.java
index 98f7b768ab..30ba938ca5 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/common/AbstractSceneTask.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/common/AbstractSceneTask.java
@@ -1,11 +1,8 @@
package io.shulie.takin.web.biz.common;
import java.time.LocalDateTime;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -63,7 +60,7 @@ protected int getAllowedTenantThreadMax() {
return allowedTenantThreadMax;
}
allowedTenantThreadMax = ConfigServerHelper.getIntegerValueByKey(
- ConfigServerKeyEnum.PER_TENANT_ALLOW_TASK_THREADS_MAX);
+ ConfigServerKeyEnum.PER_TENANT_ALLOW_TASK_THREADS_MAX);
return allowedTenantThreadMax;
}
@@ -72,7 +69,7 @@ protected void cleanUnAvailableTasks(List taskDtoList) {
if (CollectionUtils.isNotEmpty(taskDtoList)) {
final LocalDateTime now = LocalDateTime.now();
taskDtoList.stream().filter(t -> t.getEndTime() != null && now.compareTo(t.getEndTime()) > 0).forEach(
- t -> removeReportKey(t.getReportId()));
+ t -> removeReportKey(t.getReportId()));
}
} catch (Exception e) {
log.error("清理过期任务时发生错误!", e);
@@ -96,7 +93,7 @@ protected synchronized List runTask(List taskDtoList
int allowedThreadMax = this.getAllowedTenantThreadMax();
//筛选出租户的任务
final Map> listMap = taskDtoList.stream().filter(t ->
- t.getReportId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()
+ t.getReportId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()
).collect(Collectors.groupingBy(SceneTaskDto::getTenantId));
if (org.springframework.util.CollectionUtils.isEmpty(listMap)) {
return taskAlreadyRun;
@@ -111,7 +108,7 @@ protected synchronized List runTask(List taskDtoList
* 取最值。当前租户的任务数和允许的最大线程数
*/
AtomicInteger allowRunningThreads = new AtomicInteger(
- Math.min(allowedThreadMax, tenantTasks.size()));
+ Math.min(allowedThreadMax, tenantTasks.size()));
/**
* 已经运行的任务数
@@ -128,7 +125,7 @@ protected synchronized List runTask(List taskDtoList
* allow running threads calculated by capacity
*/
int permitsThreads = Math.min(allowedThreadMax - oldRunningThreads.get(),
- allowRunningThreads.get());
+ allowRunningThreads.get());
// add new threads to capacity
oldRunningThreads.addAndGet(permitsThreads);
// adjust allow current running threads
@@ -144,4 +141,29 @@ protected synchronized List runTask(List taskDtoList
return taskAlreadyRun;
}
+
+ /**
+ * @param taskDtoList
+ * @param shardingContext
+ */
+ protected void runTask_ext(List taskDtoList, ShardingContext shardingContext) {
+ //筛选出租户的任务
+ final Map> listMap =
+ taskDtoList.stream().filter(t -> t.getReportId() % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()
+ ).collect(Collectors.groupingBy(SceneTaskDto::getTenantId));
+ if (listMap.isEmpty()) {
+ return;
+ }
+ for (Entry> listEntry : listMap.entrySet()) {
+ final List tenantTasks = listEntry.getValue();
+ if (CollectionUtils.isEmpty(tenantTasks)) {
+ continue;
+ }
+ for (int i = 0; i < tenantTasks.size(); i++) {
+ final SceneTaskDto task = tenantTasks.get(i);
+ this.runTaskInTenantIfNecessary(task, task.getReportId());
+ }
+ }
+ }
+
}
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/init/sync/ConfigSynchronizer.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/init/sync/ConfigSynchronizer.java
index 03b9456fdf..4acdf85412 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/init/sync/ConfigSynchronizer.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/init/sync/ConfigSynchronizer.java
@@ -3,7 +3,9 @@
import java.util.List;
import io.shulie.takin.web.biz.service.ApplicationService;
+import io.shulie.takin.web.common.enums.config.ConfigServerKeyEnum;
import io.shulie.takin.web.data.result.application.ApplicationDetailResult;
+import io.shulie.takin.web.data.util.ConfigServerHelper;
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
@@ -32,28 +34,34 @@ public void initSyncAgentConfig() {
if (WebPluginUtils.checkUserPlugin()) {
return;
}
- log.info("项目启动,重新同步信息去配置中心");
- List applications = applicationService.getAllApplications();
- if (CollectionUtils.isEmpty(applications)) {
- return;
+ // 是否需要同步数据到配置中心
+ if (!ConfigServerHelper.getBooleanValueByKey(ConfigServerKeyEnum.TAKIN_ENABLE_SYN_CONFIG)) {
+ log.warn("项目启动,应用相关配置不同步");
} else {
- for (ApplicationDetailResult application : applications) {
- configSyncService.syncGuard(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
- application.getApplicationName());
- sleep();
- configSyncService.syncShadowDB(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
- application.getApplicationName());
- sleep();
- configSyncService.syncAllowList(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
- application.getApplicationName());
- sleep();
- configSyncService.syncShadowJob(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
- application.getApplicationName());
- sleep();
- configSyncService.syncShadowConsumer(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
- application.getApplicationName());
+ log.info("项目启动,重新同步信息去配置中心");
+ List applications = applicationService.getAllApplications();
+ if (CollectionUtils.isEmpty(applications)) {
+ return;
+ } else {
+ for (ApplicationDetailResult application : applications) {
+ configSyncService.syncGuard(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
+ application.getApplicationName());
+ sleep();
+ configSyncService.syncShadowDB(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
+ application.getApplicationName());
+ sleep();
+ configSyncService.syncAllowList(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
+ application.getApplicationName());
+ sleep();
+ configSyncService.syncShadowJob(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
+ application.getApplicationName());
+ sleep();
+ configSyncService.syncShadowConsumer(WebPluginUtils.traceTenantCommonExt(), application.getApplicationId(),
+ application.getApplicationName());
+ }
}
}
+ log.info("项目启动,重新同步信息去配置中心");
configSyncService.syncClusterTestSwitch(WebPluginUtils.traceTenantCommonExt());
sleep();
configSyncService.syncAllowListSwitch(WebPluginUtils.traceTenantCommonExt());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppAccessStatusJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppAccessStatusJob.java
index cd7f1d6349..8e08b912a9 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppAccessStatusJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppAccessStatusJob.java
@@ -28,17 +28,17 @@
*/
@Component
@ElasticSchedulerJob(jobName = "appAccessStatusJob", cron = "0/10 * * * * ?", description = "同步大数据应用状态",
- // 时效转移
- misfire = true,
- // 重新执行
- failover = true)
+ // 时效转移
+ misfire = true,
+ // 重新执行
+ failover = true)
public class AppAccessStatusJob implements SimpleJob {
@Autowired
private ApplicationService applicationService;
@Resource
- @Qualifier("jobThreadPool")
- private ThreadPoolExecutor jobThreadPool;
+ @Qualifier("syncAppStatusThreadPool")
+ private ThreadPoolExecutor syncAppStatusThreadPool;
@Autowired
private DistributedLock distributedLock;
@@ -53,26 +53,26 @@ public void execute(ShardingContext shardingContext) {
List tenantInfoExts = WebPluginUtils.getTenantInfoList();
for (TenantInfoExt ext : tenantInfoExts) {
- if(CollectionUtils.isEmpty(ext.getEnvs())) {
+ if (CollectionUtils.isEmpty(ext.getEnvs())) {
continue;
}
// 根据环境 分线程
for (TenantEnv e : ext.getEnvs()) {
// 开始数据层分片
// 分布式锁
- String lockKey = JobRedisUtils.getJobRedis(ext.getTenantId(),e.getEnvCode(),shardingContext.getJobName());
+ String lockKey = JobRedisUtils.getJobRedis(ext.getTenantId(), e.getEnvCode(), shardingContext.getJobName());
if (distributedLock.checkLock(lockKey)) {
continue;
}
- jobThreadPool.execute(() -> {
- boolean tryLock = distributedLock.tryLock(lockKey, 1L, 1L, TimeUnit.MINUTES);
- if(!tryLock) {
+ syncAppStatusThreadPool.execute(() -> {
+ boolean tryLock = distributedLock.tryLock(lockKey, 0L, 1L, TimeUnit.MINUTES);
+ if (!tryLock) {
return;
}
try {
WebPluginUtils.setTraceTenantContext(
- new TenantCommonExt(ext.getTenantId(),ext.getTenantAppKey(),e.getEnvCode(),
- ext.getTenantCode(), ContextSourceEnum.JOB.getCode()));
+ new TenantCommonExt(ext.getTenantId(), ext.getTenantAppKey(), e.getEnvCode(),
+ ext.getTenantCode(), ContextSourceEnum.JOB.getCode()));
applicationService.syncApplicationAccessStatus();
WebPluginUtils.removeTraceContext();
} finally {
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppRemoteApiFilterJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppRemoteApiFilterJob.java
index 8abeca6b4c..b468ab0ab9 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppRemoteApiFilterJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/AppRemoteApiFilterJob.java
@@ -50,8 +50,8 @@ public class AppRemoteApiFilterJob implements SimpleJob {
@Resource
private ApplicationApiService apiService;
@Resource
- @Qualifier("jobThreadPool")
- private ThreadPoolExecutor jobThreadPool;
+ @Qualifier("appRemoteApiFilterThreadPool")
+ private ThreadPoolExecutor appRemoteApiFilterThreadPool;
@Resource
private DistributedLock distributedLock;
@@ -64,7 +64,7 @@ public void execute(ShardingContext shardingContext) {
} else {
List tenantInfoExtList = WebPluginUtils.getTenantInfoList();
for (TenantInfoExt ext : tenantInfoExtList) {
- if(CollectionUtils.isEmpty(ext.getEnvs())) {
+ if (CollectionUtils.isEmpty(ext.getEnvs())) {
continue;
}
// 根据环境 分线程
@@ -74,15 +74,15 @@ public void execute(ShardingContext shardingContext) {
if (distributedLock.checkLock(lockKey)) {
continue;
}
- jobThreadPool.execute(() -> {
- boolean tryLock = distributedLock.tryLock(lockKey, 1L, 1L, TimeUnit.MINUTES);
+ appRemoteApiFilterThreadPool.execute(() -> {
+ boolean tryLock = distributedLock.tryLock(lockKey, 0L, 1L, TimeUnit.MINUTES);
if (!tryLock) {
return;
}
try {
WebPluginUtils.setTraceTenantContext(
- new TenantCommonExt(ext.getTenantId(), ext.getTenantAppKey(), e.getEnvCode(),
- ext.getTenantCode(), ContextSourceEnum.JOB.getCode()));
+ new TenantCommonExt(ext.getTenantId(), ext.getTenantAppKey(), e.getEnvCode(),
+ ext.getTenantCode(), ContextSourceEnum.JOB.getCode()));
this.appRemoteApiFilter();
WebPluginUtils.removeTraceContext();
} finally {
@@ -121,7 +121,7 @@ private void appRemoteApiFilter() {
}
delList.addAll(appRemoteCallFilterList);
// 唯一
- filterMap.put(apiManage.getApplicationId()+"##" +apiManage.getApi(), appRemoteCallFilterList);
+ filterMap.put(apiManage.getApplicationId() + "##" + apiManage.getApi(), appRemoteCallFilterList);
});
});
@@ -132,7 +132,7 @@ private void appRemoteApiFilter() {
List save = Lists.newArrayList();
filterMap.forEach((k, v) -> {
String[] temp = k.split("##");
- if(temp.length != 2) {
+ if (temp.length != 2) {
return;
}
String interfaceName = temp[1];
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcApplicationSummaryJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcApplicationSummaryJob.java
index f3f300a47a..61745d0892 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcApplicationSummaryJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcApplicationSummaryJob.java
@@ -2,94 +2,91 @@
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import io.shulie.takin.job.annotation.ElasticSchedulerJob;
import io.shulie.takin.web.biz.common.AbstractSceneTask;
import io.shulie.takin.web.biz.service.report.ReportTaskService;
+import io.shulie.takin.web.biz.threadpool.ThreadPoolUtil;
import io.shulie.takin.web.common.pojo.dto.SceneTaskDto;
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
-import org.springframework.util.CollectionUtils;
/**
* @author 无涯
* @date 2021/7/13 23:10
*/
-@Component
-@ElasticSchedulerJob(jobName = "calcApplicationSummaryJob",
- // 分片序列号和参数用等号分隔 不需要参数可以不加
- isSharding = true,
- //shardingItemParameters = "0=0,1=1,2=2",
- cron = "*/10 * * * * ?",
- description = "汇总应用 机器数 风险机器数")
+//@Component
+//@ElasticSchedulerJob(jobName = "calcApplicationSummaryJob",
+// // 分片序列号和参数用等号分隔 不需要参数可以不加
+// isSharding = true,
+// //shardingItemParameters = "0=0,1=1,2=2",
+// cron = "*/10 * * * * ?",
+// description = "汇总应用 机器数 风险机器数")
@Slf4j
public class CalcApplicationSummaryJob extends AbstractSceneTask implements SimpleJob {
@Autowired
private ReportTaskService reportTaskService;
- @Autowired
- @Qualifier("reportSummaryThreadPool")
- private ThreadPoolExecutor reportThreadPool;
-
private static Map runningTasks = new ConcurrentHashMap<>();
private static AtomicInteger EMPTY = new AtomicInteger();
@Override
public void execute(ShardingContext shardingContext) {
+ try {
+ this.execute_ext(shardingContext);
+ } catch (Throwable e) {
+ // 捕捉全部异常,防止任务异常,导致esjob有问题
+ log.error("io.shulie.takin.web.biz.job.CalcApplicationSummaryJob#execute error" + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ public void execute_ext(ShardingContext shardingContext) {
long start = System.currentTimeMillis();
final Boolean openVersion = WebPluginUtils.isOpenVersion();
- while (true) {
- List taskDtoList = getTaskFromRedis();
- if (taskDtoList == null) { break; }
- if (openVersion) {
- for (SceneTaskDto taskDto : taskDtoList) {
- Long reportId = taskDto.getReportId();
- // 开始数据层分片
- if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
- Object task = runningTasks.putIfAbsent(reportId, EMPTY);
- if (task == null) {
- reportThreadPool.execute(() -> {
- try {
- reportTaskService.calcApplicationSummary(reportId);
- } catch (Throwable e) {
- log.error(
+ List taskDtoList = getTaskFromRedis();
+ if (taskDtoList == null) {
+ log.warn("current not running pressure task!!!");
+ return;
+ }
+ if (openVersion) {
+ for (SceneTaskDto taskDto : taskDtoList) {
+ Long reportId = taskDto.getReportId();
+ // 开始数据层分片
+ if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
+ Object task = runningTasks.putIfAbsent(reportId, EMPTY);
+ if (task == null) {
+ ThreadPoolUtil.getReportSummaryThreadPool().execute(() -> {
+ try {
+ reportTaskService.calcApplicationSummary(reportId);
+ } catch (Throwable e) {
+ log.error(
"execute CalcApplicationSummaryJob occured error. reportId= {},errorMsg={}",
reportId, e.getMessage(), e);
- } finally {
- runningTasks.remove(reportId);
- }
- });
- }
+ } finally {
+ runningTasks.remove(reportId);
+ }
+ });
}
}
- } else {
- this.runTask(taskDtoList,shardingContext);
}
+ } else {
+ this.runTask_ext(taskDtoList, shardingContext);
}
-
log.debug("calcApplicationSummaryJob 执行时间:{}", System.currentTimeMillis() - start);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
}
@Override
protected void runTaskInTenantIfNecessary(SceneTaskDto tenantTask, Long reportId) {
//将任务放入线程池
- reportThreadPool.execute(() -> {
+ ThreadPoolUtil.getReportSummaryThreadPool().execute(() -> {
try {
WebPluginUtils.setTraceTenantContext(tenantTask);
reportTaskService.calcApplicationSummary(tenantTask.getReportId());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcTpsTargetJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcTpsTargetJob.java
index f02e062d8c..3c93608969 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcTpsTargetJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/CalcTpsTargetJob.java
@@ -3,7 +3,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import com.dangdang.ddframe.job.api.ShardingContext;
@@ -11,24 +10,25 @@
import io.shulie.takin.job.annotation.ElasticSchedulerJob;
import io.shulie.takin.web.biz.common.AbstractSceneTask;
import io.shulie.takin.web.biz.service.report.ReportTaskService;
+import io.shulie.takin.web.biz.threadpool.ThreadPoolUtil;
import io.shulie.takin.web.common.pojo.dto.SceneTaskDto;
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* @author 无涯
* @date 2021/7/13 23:10
*/
-@Component
-@ElasticSchedulerJob(jobName = "calcTpsTargetJob",
- // 分片序列号和参数用等号分隔 不需要参数可以不加
- //shardingItemParameters = "0=0,1=1,2=2",
- isSharding = true,
- cron = "*/10 * * * * ?",
- description = "获取tps指标图")
+//@Component
+//@ElasticSchedulerJob(jobName = "calcTpsTargetJob",
+// // 分片序列号和参数用等号分隔 不需要参数可以不加
+// //shardingItemParameters = "0=0,1=1,2=2",
+// isSharding = true,
+// cron = "*/10 * * * * ?",
+// description = "获取tps指标图")
@Slf4j
public class CalcTpsTargetJob extends AbstractSceneTask implements SimpleJob {
@@ -36,55 +36,58 @@ public class CalcTpsTargetJob extends AbstractSceneTask implements SimpleJob {
private ReportTaskService reportTaskService;
@Autowired
- @Qualifier("reportTpsThreadPool")
- private ThreadPoolExecutor reportThreadPool;
+ private ThreadPoolUtil threadPoolUtil;
private static Map runningTasks = new ConcurrentHashMap<>();
private static AtomicInteger EMPTY = new AtomicInteger();
@Override
public void execute(ShardingContext shardingContext) {
+ try {
+ this.execute_ext(shardingContext);
+ } catch (Throwable e) {
+ // 捕捉全部异常,防止任务异常,导致esjob有问题
+ log.error("io.shulie.takin.web.biz.job.CalcTpsTargetJob#execute error" + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ public void execute_ext(ShardingContext shardingContext) {
long start = System.currentTimeMillis();
final Boolean openVersion = WebPluginUtils.isOpenVersion();
- while (true) {
- List taskDtoList = getTaskFromRedis();
- if (taskDtoList == null) { break; }
- if (openVersion) {
- for (SceneTaskDto taskDto : taskDtoList) {
- Long reportId = taskDto.getReportId();
- // 开始数据层分片
- if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
- Object task = runningTasks.putIfAbsent(reportId, EMPTY);
- if (task == null) {
- reportThreadPool.execute(() -> {
- try {
- reportTaskService.calcTpsTarget(reportId);
- } catch (Throwable e) {
- log.error("execute CalcTpsTargetJob occured error. reportId={}", reportId, e);
- } finally {
- runningTasks.remove(reportId);
- }
-
- });
- }
+ List taskDtoList = getTaskFromRedis();
+ if (taskDtoList == null) {
+ log.warn("current not running pressure task!!!");
+ return;
+ }
+ if (openVersion) {
+ for (SceneTaskDto taskDto : taskDtoList) {
+ Long reportId = taskDto.getReportId();
+ // 开始数据层分片
+ if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
+ Object task = runningTasks.putIfAbsent(reportId, EMPTY);
+ if (task == null) {
+ ThreadPoolUtil.getReportTpsThreadPool().execute(() -> {
+ try {
+ reportTaskService.calcTpsTarget(reportId);
+ } catch (Throwable e) {
+ log.error("execute CalcTpsTargetJob occured error. reportId={}", reportId, e);
+ } finally {
+ runningTasks.remove(reportId);
+ }
+ });
}
}
- } else {
- this.runTask(taskDtoList,shardingContext);
}
+ } else {
+ this.runTask_ext(taskDtoList, shardingContext);
}
log.debug("calcTpsTargetJob 执行时间:{}", System.currentTimeMillis() - start);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
}
@Override
protected void runTaskInTenantIfNecessary(SceneTaskDto tenantTask, Long reportId) {
//将任务放入线程池
- reportThreadPool.execute(() -> {
+ threadPoolUtil.getReportTpsThreadPool().execute(() -> {
try {
WebPluginUtils.setTraceTenantContext(tenantTask);
reportTaskService.calcTpsTarget(tenantTask.getReportId());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/FinishReportJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/FinishReportJob.java
index 141e75bcea..23237f6ee4 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/FinishReportJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/FinishReportJob.java
@@ -3,7 +3,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import com.dangdang.ddframe.job.api.ShardingContext;
@@ -11,11 +10,12 @@
import io.shulie.takin.job.annotation.ElasticSchedulerJob;
import io.shulie.takin.web.biz.common.AbstractSceneTask;
import io.shulie.takin.web.biz.service.report.ReportTaskService;
+import io.shulie.takin.web.biz.threadpool.ThreadPoolUtil;
import io.shulie.takin.web.common.pojo.dto.SceneTaskDto;
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
@@ -24,69 +24,70 @@
*/
@Component
@ElasticSchedulerJob(jobName = "finishReportJob",
- // 分片序列号和参数用等号分隔 不需要参数可以不加
- //shardingItemParameters = "0=0,1=1,2=2",
- isSharding = true,
- cron = "*/10 * * * * ?",
- description = "压测报告状态,汇总报告")
+ // 分片序列号和参数用等号分隔 不需要参数可以不加
+ //shardingItemParameters = "0=0,1=1,2=2",
+ isSharding = true,
+ cron = "*/10 * * * * ?",
+ description = "压测报告状态,汇总报告")
@Slf4j
public class FinishReportJob extends AbstractSceneTask implements SimpleJob {
@Autowired
private ReportTaskService reportTaskService;
- @Autowired
- @Qualifier("reportFinishThreadPool")
- private ThreadPoolExecutor reportThreadPool;
-
private static Map runningTasks = new ConcurrentHashMap<>();
private static AtomicInteger EMPTY = new AtomicInteger();
@Override
public void execute(ShardingContext shardingContext) {
+ try {
+ this.execute_ext(shardingContext);
+ } catch (Throwable e) {
+ // 捕捉全部异常,防止任务异常,导致esjob有问题
+ log.error("io.shulie.takin.web.biz.job.FinishReportJob#execute error" + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ public void execute_ext(ShardingContext shardingContext) {
long start = System.currentTimeMillis();
final Boolean openVersion = WebPluginUtils.isOpenVersion();
//任务开始
- while (true){
- List taskDtoList = getTaskFromRedis();
- if (taskDtoList == null) { break; }
- if(openVersion) {
- for (SceneTaskDto taskDto : taskDtoList) {
- Long reportId = taskDto.getReportId();
- // 私有化 + 开源 根据 报告id进行分片
- // 开始数据层分片
- if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
- Object task = runningTasks.putIfAbsent(reportId, EMPTY);
- if (task == null) {
- reportThreadPool.execute(() -> {
- try {
- reportTaskService.finishReport(reportId,taskDto);
- } catch (Throwable e) {
- log.error("execute FinishReportJob occured error. reportId={}", reportId, e);
- } finally {
- runningTasks.remove(reportId);
- }
- });
- }
+ List taskDtoList = getTaskFromRedis();
+ if (taskDtoList == null) {
+ log.warn("current not running pressure task!!!");
+ return;
+ }
+ if (openVersion) {
+ for (SceneTaskDto taskDto : taskDtoList) {
+ Long reportId = taskDto.getReportId();
+ // 私有化 + 开源 根据 报告id进行分片
+ // 开始数据层分片
+ if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
+ Object task = runningTasks.putIfAbsent(reportId, EMPTY);
+ if (task == null) {
+ ThreadPoolUtil.getReportFinishThreadPool().execute(() -> {
+ try {
+ reportTaskService.finishReport(reportId, taskDto);
+ } catch (Throwable e) {
+ log.error("execute FinishReportJob occured error. reportId={}", reportId, e);
+ } finally {
+ runningTasks.remove(reportId);
+ }
+ });
}
}
- this.cleanUnAvailableTasks(taskDtoList);
- }else {
- final List taskAlreadyRun = this.runTask(taskDtoList, shardingContext);
- this.cleanUnAvailableTasks(taskAlreadyRun);
}
+ this.cleanUnAvailableTasks(taskDtoList);
+ } else {
+ this.runTask_ext(taskDtoList, shardingContext);
+ this.cleanUnAvailableTasks(taskDtoList);
}
log.debug("finishReport 执行时间:{}", System.currentTimeMillis() - start);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
}
@Override
- protected void runTaskInTenantIfNecessary( SceneTaskDto tenantTask, Long reportId) {
+ protected void runTaskInTenantIfNecessary(SceneTaskDto tenantTask, Long reportId) {
//将任务放入线程池
- reportThreadPool.execute(() -> {
+ ThreadPoolUtil.getReportFinishThreadPool().execute(() -> {
try {
WebPluginUtils.setTraceTenantContext(tenantTask);
reportTaskService.finishReport(reportId, tenantTask);
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/SyncMachineDataJob.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/SyncMachineDataJob.java
index 9f4fe04298..ea50b65108 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/SyncMachineDataJob.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/job/SyncMachineDataJob.java
@@ -3,90 +3,92 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
+import com.google.common.collect.Maps;
import io.shulie.takin.job.annotation.ElasticSchedulerJob;
import io.shulie.takin.web.biz.common.AbstractSceneTask;
import io.shulie.takin.web.biz.service.report.ReportTaskService;
+import io.shulie.takin.web.biz.threadpool.ThreadPoolUtil;
import io.shulie.takin.web.common.pojo.dto.SceneTaskDto;
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
* @author 无涯
* @date 2021/7/13 23:10
*/
-@Component
-@ElasticSchedulerJob(jobName = "syncMachineDataJob",
- // 分片序列号和参数用等号分隔 不需要参数可以不加
- isSharding = true,
- //shardingItemParameters = "0=0,1=1,2=2",
- cron = "*/10 * * * * ?",
- description = "同步应用基础信息")
+//@Component
+//@ElasticSchedulerJob(jobName = "syncMachineDataJob",
+// // 分片序列号和参数用等号分隔 不需要参数可以不加
+// isSharding = true,
+// //shardingItemParameters = "0=0,1=1,2=2",
+// cron = "*/10 * * * * ?",
+// description = "同步应用基础信息")
@Slf4j
public class SyncMachineDataJob extends AbstractSceneTask implements SimpleJob {
@Autowired
private ReportTaskService reportTaskService;
- @Autowired
- @Qualifier("reportMachineThreadPool")
- private ThreadPoolExecutor reportThreadPool;
-
-
private static Map runningTasks = new ConcurrentHashMap<>();
private static AtomicInteger EMPTY = new AtomicInteger();
+ private Map syncMacheineMap = Maps.newHashMap();
+
@Override
public void execute(ShardingContext shardingContext) {
+ try {
+ this.execute_ext(shardingContext);
+ } catch (Throwable e) {
+ // 捕捉全部异常,防止任务异常,导致esjob有问题
+ log.error("io.shulie.takin.web.biz.job.SyncMachineDataJob#execute error" + ExceptionUtils.getStackTrace(e));
+ }
+ }
+
+ public void execute_ext(ShardingContext shardingContext) {
long start = System.currentTimeMillis();
final Boolean openVersion = WebPluginUtils.isOpenVersion();
- while (true) {
- List taskDtoList = getTaskFromRedis();
- if (taskDtoList == null) { break; }
+ List taskDtoList = getTaskFromRedis();
+ if (taskDtoList == null) {
+ log.warn("current not running pressure task!!!");
+ return;
+ }
- if (openVersion){
- for (SceneTaskDto taskDto : taskDtoList) {
- Long reportId = taskDto.getReportId();
- if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
- Object task = runningTasks.putIfAbsent(reportId, EMPTY);
- if (task == null) {
- reportThreadPool.execute(() -> {
- try {
- reportTaskService.syncMachineData(reportId);
- } catch (Throwable e) {
- log.error("execute SyncMachineDataJob occured error. reportId= {}", reportId, e);
- } finally {
- runningTasks.remove(reportId);
- }
- });
- }
+ if (openVersion) {
+ for (SceneTaskDto taskDto : taskDtoList) {
+ Long reportId = taskDto.getReportId();
+ if (reportId % shardingContext.getShardingTotalCount() == shardingContext.getShardingItem()) {
+ Object task = runningTasks.putIfAbsent(reportId, EMPTY);
+ if (task == null) {
+ ThreadPoolUtil.getSyncMachinePool().execute(() -> {
+ try {
+ reportTaskService.syncMachineData(reportId);
+ } catch (Throwable e) {
+ log.error("execute SyncMachineDataJob occured error. reportId= {}", reportId, e);
+ } finally {
+ runningTasks.remove(reportId);
+ }
+ });
}
-
}
- }else {
- this.runTask(taskDtoList,shardingContext);
}
+ } else {
+ this.runTask_ext(taskDtoList, shardingContext);
}
-
log.debug("syncMachineData 执行时间:{}", System.currentTimeMillis() - start);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
}
@Override
protected void runTaskInTenantIfNecessary(SceneTaskDto tenantTask, Long reportId) {
//将任务放入线程池
- reportThreadPool.execute(() -> {
+ ThreadPoolUtil.getSyncMachinePool().execute(() -> {
try {
WebPluginUtils.setTraceTenantContext(tenantTask);
reportTaskService.syncMachineData(tenantTask.getReportId());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/mq/consumer/impl/middleware/AgentPushMiddlewareAndCompareConsumer.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/mq/consumer/impl/middleware/AgentPushMiddlewareAndCompareConsumer.java
index 84db709517..4cdde8e008 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/mq/consumer/impl/middleware/AgentPushMiddlewareAndCompareConsumer.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/mq/consumer/impl/middleware/AgentPushMiddlewareAndCompareConsumer.java
@@ -18,6 +18,7 @@
import io.shulie.takin.web.ext.util.WebPluginUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
@@ -39,6 +40,10 @@ public class AgentPushMiddlewareAndCompareConsumer implements MessageListener {
@Autowired
private ApplicationMiddlewareDAO applicationMiddlewareDAO;
+ //默认不处理中间件信息
+ @Value("${takin.enable.middlewareFlag:false}")
+ private boolean middlewareFlag;
+
@Autowired
@Lazy
private ApplicationMiddlewareService applicationMiddlewareService;
@@ -46,6 +51,9 @@ public class AgentPushMiddlewareAndCompareConsumer implements MessageListener {
@Transactional(rollbackFor = Throwable.class)
@Override
public void onMessage(Message message, byte[] pattern) {
+ if (!middlewareFlag) {
+ return;
+ }
String messageBody = new String(message.getBody());
if (StringUtils.isEmpty(messageBody)) {
return;
@@ -53,7 +61,7 @@ public void onMessage(Message message, byte[] pattern) {
messageBody = messageBody.substring(1, messageBody.length() - 1).replace("\\", "");
MqApplicationMiddlewareCompareDTO mqApplicationMiddlewareCompareDTO = JsonUtil.json2Bean(messageBody,
- MqApplicationMiddlewareCompareDTO.class);
+ MqApplicationMiddlewareCompareDTO.class);
if (mqApplicationMiddlewareCompareDTO == null) {
return;
}
@@ -69,15 +77,15 @@ public void onMessage(Message message, byte[] pattern) {
try {
TenantCommonExt tenantCommonExt = new TenantCommonExt(mqApplicationMiddlewareCompareDTO.getTenantId(),
- null, mqApplicationMiddlewareCompareDTO.getEnvCode(),
- null, ContextSourceEnum.JOB.getCode());
+ null, mqApplicationMiddlewareCompareDTO.getEnvCode(),
+ null, ContextSourceEnum.JOB.getCode());
WebPluginUtils.setTraceTenantContext(tenantCommonExt);
// 根据 applicationId 查询应用中间件
log.info("应用中间件上报 --> 异步消息处理 --> 应用中间件查询");
PageUtils.clearPageHelper();
List applicationMiddlewareList =
- applicationMiddlewareDAO.listByApplicationId(applicationId);
+ applicationMiddlewareDAO.listByApplicationId(applicationId);
if (applicationMiddlewareList.isEmpty()) {
return;
}
@@ -85,7 +93,7 @@ public void onMessage(Message message, byte[] pattern) {
// 比对
log.info("应用中间件上报 --> 异步消息处理 --> 应用中间件比对");
List updateParamList =
- applicationMiddlewareService.doCompare(applicationMiddlewareList);
+ applicationMiddlewareService.doCompare(applicationMiddlewareList);
log.info("应用中间件上报 --> 异步消息处理 --> 应用中间件更新");
applicationMiddlewareDAO.updateBatchById(updateParamList);
@@ -103,7 +111,7 @@ public void onMessage(Message message, byte[] pattern) {
* @return 相应数据
*/
private MqApplicationMiddlewareCompareDTO getMqApplicationMiddlewareCompareDTO(
- Message message) {
+ Message message) {
String messageBody = new String(message.getBody());
if (StringUtils.isEmpty(messageBody)) {
return null;
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/ConfCenterService.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/ConfCenterService.java
index ef0cb9b7ad..1ff7578fef 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/ConfCenterService.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/ConfCenterService.java
@@ -99,6 +99,7 @@
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -150,6 +151,10 @@ public class ConfCenterService extends CommonService {
public static final String APPLICATION_CACHE_PREFIX = "application:cache";
+ // 是否默认初始化白名单
+ @Value("${takin.enable.initWhiteList:false}")
+ private boolean initWhiteList;
+
@PostConstruct
public void init() {
number = ConfigServerHelper.getWrapperIntegerValueByKey(ConfigServerKeyEnum.TAKIN_WHITE_LIST_NUMBER_LIMIT);
@@ -244,6 +249,10 @@ private void addApplicationToDataBuild(ApplicationCreateParam tApplicationMnt) {
@PostConstruct
public void initWhiteList() {
+ if (!initWhiteList) {
+ log.info("不初始化白名单到文件");
+ return;
+ }
writeWhiteListFile();
}
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationErrorServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationErrorServiceImpl.java
index 5315ece1cc..ed07bb067b 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationErrorServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationErrorServiceImpl.java
@@ -18,6 +18,7 @@
import com.pamirs.takin.common.util.DateUtils;
import com.pamirs.takin.entity.domain.dto.NodeUploadDataDTO;
import com.pamirs.takin.entity.domain.entity.ExceptionInfo;
+import io.shulie.takin.utils.string.StringUtil;
import io.shulie.takin.web.biz.pojo.input.application.ApplicationErrorQueryInput;
import io.shulie.takin.web.biz.pojo.output.application.ApplicationErrorOutput;
import io.shulie.takin.web.biz.pojo.output.application.ApplicationExceptionOutput;
@@ -65,15 +66,15 @@ public List list(ApplicationErrorQueryInput queryRequest
// 应用节点相关错误信息
ApplicationErrorOutput nodeErrorResponse =
- this.getNodeErrorResponse(tApplicationMnt.getApplicationName(), tApplicationMnt.getNodeNum());
+ this.getNodeErrorResponse(tApplicationMnt.getApplicationName(), tApplicationMnt.getNodeNum());
if (nodeErrorResponse != null) {
responseList.add(nodeErrorResponse);
}
//redisKey改造
String appUniqueKey = CommonUtil.generateRedisKeyWithSeparator(Separator.Separator3,
- WebPluginUtils.traceTenantAppKey(), WebPluginUtils.traceEnvCode(),
- queryRequest.getApplicationId() + ApplicationServiceImpl.PRADARNODE_KEYSET);
+ WebPluginUtils.traceTenantAppKey(), WebPluginUtils.traceEnvCode(),
+ queryRequest.getApplicationId() + ApplicationServiceImpl.PRADARNODE_KEYSET);
Set keys = redisTemplate.opsForSet().members(appUniqueKey);
if (keys == null || keys.size() == 0) {
return responseList;
@@ -92,9 +93,9 @@ public List list(ApplicationErrorQueryInput queryRequest
return this.processErrorList(responseList);
}
- private ApplicationDetailResult ensureApplicationExist(ApplicationErrorQueryInput queryRequest) {
+ public ApplicationDetailResult ensureApplicationExist(ApplicationErrorQueryInput queryRequest) {
Response applicationMntResponse = applicationService.getApplicationInfoForError(
- String.valueOf(queryRequest.getApplicationId()));
+ String.valueOf(queryRequest.getApplicationId()));
ApplicationDetailResult tApplicationMnt = applicationMntResponse.getData();
if (Objects.isNull(tApplicationMnt)) {
throw new TakinWebException(TakinWebExceptionEnum.APPLICATION_MANAGE_VALIDATE_ERROR, "应用不存在");
@@ -103,11 +104,11 @@ private ApplicationDetailResult ensureApplicationExist(ApplicationErrorQueryInpu
}
private void putNodeExceptionIfNeeded(List responseList,
- ApplicationDetailResult tApplicationMnt) {
+ ApplicationDetailResult tApplicationMnt) {
Integer totalNodeCount = tApplicationMnt.getNodeNum();
Integer onlineNodeCount = 0;
List applicationResultList = applicationDAO.getApplicationByName(
- Collections.singletonList(tApplicationMnt.getApplicationName()));
+ Collections.singletonList(tApplicationMnt.getApplicationName()));
if (CollectionUtils.isEmpty(applicationResultList)) {
log.error("AMDB中应用信息查询结果为空");
} else {
@@ -119,16 +120,16 @@ private void putNodeExceptionIfNeeded(List responseList,
}
if (!totalNodeCount.equals(onlineNodeCount)) {
responseList.add(new ApplicationErrorOutput()
- .setExceptionId("-")
- .setAgentIdList(Collections.singletonList("-"))
- .setDescription("在线节点数 与 配置的节点总数 不一致")
- .setTime(DateUtils.getNowDateStr())
- .setDetail("设置节点数:" + totalNodeCount + ",在线节点数:" + onlineNodeCount));
+ .setExceptionId("-")
+ .setAgentIdList(Collections.singletonList("-"))
+ .setDescription("在线节点数 与 配置的节点总数 不一致")
+ .setTime(DateUtils.getNowDateStr())
+ .setDetail("设置节点数:" + totalNodeCount + ",在线节点数:" + onlineNodeCount));
}
}
private void convertNodeUploadDataList(List responseList,
- List nodeUploadDataDTOList) {
+ List nodeUploadDataDTOList) {
nodeUploadDataDTOList.parallelStream().forEach(n -> {
NodeUploadDataDTO nodeUploadDataDTO = JSONObject.parseObject(n, NodeUploadDataDTO.class);
Map exceptionMap = nodeUploadDataDTO.getSwitchErrorMap();
@@ -143,13 +144,16 @@ private void convertNodeUploadDataList(List responseList
log.error("异常转换失败:错误信息: {},异常内容{}", message, e.getMessage());
}
ApplicationErrorOutput applicationErrorResponse
- = new ApplicationErrorOutput()
- .setExceptionId(exceptionInfo != null ? exceptionInfo.getErrorCode() : "web-异常原文显示")
- .setAgentIdList(Collections.singletonList(nodeUploadDataDTO.getAgentId()))
- .setDescription(exceptionInfo != null ? exceptionInfo.getMessage() : message)
- .setDetail(exceptionInfo != null ? exceptionInfo.getDetail() : message)
- .setTime(nodeUploadDataDTO.getExceptionTime());
- responseList.add(applicationErrorResponse);
+ = new ApplicationErrorOutput()
+ .setExceptionId(exceptionInfo != null ? exceptionInfo.getErrorCode() : "web-异常原文显示")
+ .setAgentIdList(Collections.singletonList(nodeUploadDataDTO.getAgentId()))
+ .setDescription(exceptionInfo != null ? exceptionInfo.getMessage() : message)
+ .setDetail(exceptionInfo != null ? exceptionInfo.getDetail() : message)
+ .setTime(nodeUploadDataDTO.getExceptionTime());
+ if (!StringUtil.equals("探针接入异常", applicationErrorResponse.getDetail())
+ || !StringUtil.equals("探针接入异常", applicationErrorResponse.getDescription())) {
+ responseList.add(applicationErrorResponse);
+ }
}
}
}
@@ -173,8 +177,8 @@ public List getAppException(List appNames) {
}
//redisKey改造
String appUniqueKey = CommonUtil.generateRedisKeyWithSeparator(Separator.Separator3,
- WebPluginUtils.traceTenantAppKey(), WebPluginUtils.traceTenantCode(),
- app.getAppId() + ApplicationServiceImpl.PRADAR_SEPERATE_FLAG);
+ WebPluginUtils.traceTenantAppKey(), WebPluginUtils.traceTenantCode(),
+ app.getAppId() + ApplicationServiceImpl.PRADAR_SEPERATE_FLAG);
Set keys = redisTemplate.keys(appUniqueKey + "*");
if (keys != null) {
for (String nodeKey : keys) {
@@ -191,7 +195,7 @@ public List getAppException(List appNames) {
if (message.contains("errorCode")) {
try {
ExceptionInfo exceptionInfo = JSONObject.parseObject(message,
- ExceptionInfo.class);
+ ExceptionInfo.class);
ApplicationExceptionOutput output = new ApplicationExceptionOutput();
output.setApplicationName(app.getAppName());
output.setAgentIds(Arrays.asList(nodeUploadDataDTO.getAgentId()));
@@ -223,13 +227,13 @@ public List getAppException(List appNames) {
* @param totalNodeCount 节点数量
* @return 节点错误
*/
- private ApplicationErrorOutput getNodeErrorResponse(String applicationName, Integer totalNodeCount) {
+ public ApplicationErrorOutput getNodeErrorResponse(String applicationName, Integer totalNodeCount) {
List applicationResultList = applicationDAO.getApplicationByName(
- Collections.singletonList(applicationName));
+ Collections.singletonList(applicationName));
ApplicationErrorOutput applicationErrorResponse = null;
if (CollectionUtils.isEmpty(applicationResultList)
- || !totalNodeCount.equals(applicationResultList.get(0).getInstanceInfo().getInstanceOnlineAmount())) {
+ || !totalNodeCount.equals(applicationResultList.get(0).getInstanceInfo().getInstanceOnlineAmount())) {
applicationErrorResponse = new ApplicationErrorOutput();
applicationErrorResponse.setExceptionId("-");
applicationErrorResponse.setAgentIdList(Collections.singletonList("-"));
@@ -255,14 +259,14 @@ private ApplicationErrorOutput getNodeErrorResponse(String applicationName, Inte
private List processErrorList(List responseList) {
// 按照时间倒序输出
List sortedList = responseList.parallelStream()
- .filter(t -> t != null && CharSequenceUtil.isNotBlank(t.getTime()))
- .sorted((a1, a2) -> a2.getTime().compareTo(a1.getTime()))
- .collect(Collectors.toList());
+ .filter(t -> t != null && CharSequenceUtil.isNotBlank(t.getTime()))
+ .sorted((a1, a2) -> a2.getTime().compareTo(a1.getTime()))
+ .collect(Collectors.toList());
List noTimeList = responseList.parallelStream()
- // 无时间的
- .filter(response -> response != null && CharSequenceUtil.isBlank(response.getTime()))
- .collect(Collectors.toList());
+ // 无时间的
+ .filter(response -> response != null && CharSequenceUtil.isBlank(response.getTime()))
+ .collect(Collectors.toList());
if (sortedList.isEmpty()) {
return noTimeList;
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationMiddlewareServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationMiddlewareServiceImpl.java
index 32f2281434..af459df30d 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationMiddlewareServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/application/impl/ApplicationMiddlewareServiceImpl.java
@@ -42,6 +42,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -70,9 +71,13 @@ public class ApplicationMiddlewareServiceImpl implements ApplicationMiddlewareSe
@Autowired
private MiddlewareJarService middlewareJarService;
+ //默认不处理中间件信息
+ @Value("${takin.enable.middlewareFlag:false}")
+ private boolean middlewareFlag;
+
@Override
public PagingList page(
- ListApplicationMiddlewareRequest listApplicationMiddlewareRequest) {
+ ListApplicationMiddlewareRequest listApplicationMiddlewareRequest) {
PageApplicationMiddlewareParam pageApplicationMiddlewareParam = new PageApplicationMiddlewareParam();
BeanUtils.copyProperties(listApplicationMiddlewareRequest, pageApplicationMiddlewareParam);
@@ -90,7 +95,7 @@ public PagingList page(
// 状态转换
ApplicationMiddlewareStatusEnum applicationMiddlewareStatusEnum =
- ApplicationMiddlewareStatusEnum.getByCode(result.getStatus());
+ ApplicationMiddlewareStatusEnum.getByCode(result.getStatus());
if (applicationMiddlewareStatusEnum != null) {
response.setStatusDesc(applicationMiddlewareStatusEnum.getDesc());
}
@@ -104,7 +109,7 @@ public PagingList page(
public ApplicationMiddlewareCountResponse countSome(Long applicationId) {
PageUtils.clearPageHelper();
List statusMapCountResultList = applicationMiddlewareDAO
- .listCountByApplicationIdAndStatusAndGroupByStatus(applicationId, null);
+ .listCountByApplicationIdAndStatusAndGroupByStatus(applicationId, null);
if (statusMapCountResultList.isEmpty()) {
return new ApplicationMiddlewareCountResponse();
@@ -112,8 +117,8 @@ public ApplicationMiddlewareCountResponse countSome(Long applicationId) {
// 状态统计转为 状态 -> 统计个数 map
Map statusAboutCount = statusMapCountResultList.stream()
- .collect(Collectors.toMap(ApplicationMiddlewareStatusAboutCountResult::getStatus,
- ApplicationMiddlewareStatusAboutCountResult::getCount, (v1, v2) -> v2));
+ .collect(Collectors.toMap(ApplicationMiddlewareStatusAboutCountResult::getStatus,
+ ApplicationMiddlewareStatusAboutCountResult::getCount, (v1, v2) -> v2));
ApplicationMiddlewareCountResponse response = new ApplicationMiddlewareCountResponse();
response.setTotalCount(applicationMiddlewareDAO.countByApplicationIdAndStatus(applicationId, null));
@@ -122,7 +127,7 @@ public ApplicationMiddlewareCountResponse countSome(Long applicationId) {
response.setNotSupportedCount(statusAboutCount.get(ApplicationMiddlewareStatusEnum.NOT_SUPPORTED.getCode()));
response.setNoneCount(statusAboutCount.get(ApplicationMiddlewareStatusEnum.NONE.getCode()));
response.setNoSupportRequiredCount(
- statusAboutCount.get(ApplicationMiddlewareStatusEnum.NO_SUPPORT_REQUIRED.getCode()));
+ statusAboutCount.get(ApplicationMiddlewareStatusEnum.NO_SUPPORT_REQUIRED.getCode()));
return response;
}
@@ -156,7 +161,7 @@ public void compare(Long applicationId) {
public List doCompare(List results) {
// 转 dto
List compareApplicationMiddlewareList = DataTransformUtil.list2list(results,
- CompareApplicationMiddlewareDTO.class);
+ CompareApplicationMiddlewareDTO.class);
// 比对
middlewareJarService.appCompare(compareApplicationMiddlewareList);
@@ -168,6 +173,9 @@ public List doCompare(List middlewareList = pushMiddlewareRequest.getMiddlewareList();
if (middlewareList.isEmpty()) {
return;
@@ -195,9 +203,9 @@ public void pushMiddlewareList(PushMiddlewareRequest pushMiddlewareRequest) {
// 新的中间件插入
log.info("应用中间件上报 --> 插入上报中间件");
List createApplicationMiddlewareParamList =
- this.listCreateApplicationMiddlewareParam(middlewareList, application);
+ this.listCreateApplicationMiddlewareParam(middlewareList, application);
this.isPushError(!applicationMiddlewareDAO.insertBatch(createApplicationMiddlewareParamList),
- "应用中间件报错失败!");
+ "应用中间件报错失败!");
} catch (Exception e) {
// 发生错误, 解锁
@@ -221,20 +229,20 @@ public void pushMiddlewareList(PushMiddlewareRequest pushMiddlewareRequest) {
@Override
public Map> getApplicationNameAboutStatusCountMap(
- List applicationIds) {
+ List applicationIds) {
List statusList = Arrays.asList(ApplicationMiddlewareStatusEnum.NONE.getCode(),
- ApplicationMiddlewareStatusEnum.UNKNOWN.getCode(),
- ApplicationMiddlewareStatusEnum.NOT_SUPPORTED.getCode());
+ ApplicationMiddlewareStatusEnum.UNKNOWN.getCode(),
+ ApplicationMiddlewareStatusEnum.NOT_SUPPORTED.getCode());
List results = applicationMiddlewareDAO
- .listStatusCountByAndGroupByApplicationNameListAndStatus(applicationIds, statusList);
+ .listStatusCountByAndGroupByApplicationNameListAndStatus(applicationIds, statusList);
if (results.isEmpty()) {
return Collections.emptyMap();
}
return results.stream()
- .collect(Collectors.groupingBy(ApplicationMiddlewareStatusAboutCountResult::getApplicationName,
- Collectors.toMap(ApplicationMiddlewareStatusAboutCountResult::getStatus,
- ApplicationMiddlewareStatusAboutCountResult::getCount)));
+ .collect(Collectors.groupingBy(ApplicationMiddlewareStatusAboutCountResult::getApplicationName,
+ Collectors.toMap(ApplicationMiddlewareStatusAboutCountResult::getStatus,
+ ApplicationMiddlewareStatusAboutCountResult::getCount)));
}
/**
@@ -267,7 +275,7 @@ private void isPushError(boolean condition, String message) {
* @return 待新增的中间件对象列表
*/
private List listCreateApplicationMiddlewareParam(
- List middlewareList, ApplicationDetailResult application) {
+ List middlewareList, ApplicationDetailResult application) {
return middlewareList.stream().map(pushMiddlewareListRequest -> {
CreateApplicationMiddlewareParam createParam = new CreateApplicationMiddlewareParam();
createParam.setApplicationId(application.getApplicationId());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationPluginsConfigServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationPluginsConfigServiceImpl.java
index 1b83c02100..03493f245e 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationPluginsConfigServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationPluginsConfigServiceImpl.java
@@ -7,6 +7,7 @@
import javax.annotation.Resource;
+import cn.hutool.core.util.NumberUtil;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.google.common.collect.Lists;
import io.shulie.takin.common.beans.page.PagingList;
@@ -82,7 +83,7 @@ public PagingList getPageByParam(ApplicationPluginsC
if ("-1".equals(configVO.getConfigValue())) {
configVO.setConfigValueName("与业务key一致");
} else {
- configVO.setConfigValueName(record.getConfigValue() + "小时");
+ configVO.setConfigValueName(record.getConfigValue() + "分钟");
}
//精度丢失问题
configVO.setApplicationId(record.getApplicationId() + "");
@@ -204,7 +205,7 @@ public Boolean update(ApplicationPluginsConfigParam param) {
OperationLogContextHolder.operationType(OpTypes.UPDATE);
OperationLogContextHolder.addVars(Vars.APPLICATION_ID,oldEntity.getApplicationId().toString());
OperationLogContextHolder.addVars(Vars.APP_PLUGIN_KEY,oldEntity.getConfigItem());
- OperationLogContextHolder.addVars(Vars.APP_PLUGIN_VALUE,oldEntity.getConfigValue().equals("-1")?"与业务key一致":oldEntity.getConfigValue()+" h");
+ OperationLogContextHolder.addVars(Vars.APP_PLUGIN_VALUE,oldEntity.getConfigValue().equals("-1")?"与业务key一致":oldEntity.getConfigValue()+" min");
return true;
}
@@ -216,9 +217,18 @@ public List getListByParam(ApplicationPluginsConfigP
if (Objects.isNull(param.getConfigKey())) {
throw new TakinWebException(ExceptionCode.POD_NUM_EMPTY, "configKey为空");
}
+ ApplicationDetailResult application = applicationDAO.getApplicationByTenantIdAndName(param.getApplicationName());
+ param.setApplicationId(application.getApplicationId());
List list = applicationPluginsConfigDAO.findList(param);
if (list != null && !list.isEmpty()) {
- return CopyUtils.copyFieldsList(list, ApplicationPluginsConfigVO.class);
+ List vos = CopyUtils.copyFieldsList(list, ApplicationPluginsConfigVO.class);
+ vos.forEach(x -> {
+ if(!"-1".equals(x.getConfigValue())){
+ x.setConfigValue(NumberUtil.div(x.getConfigValue(),"60",3).toString());
+ }
+ });
+// vos.forEach(x -> x.setConfigValue(NumberUtil.div(x.getConfigValue(),"60",3).toString()));
+ return vos;
}
return null;
}
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationServiceImpl.java
index 5a497878e5..dc8a30ef85 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ApplicationServiceImpl.java
@@ -43,6 +43,7 @@
import io.shulie.takin.web.biz.pojo.input.application.*;
import io.shulie.takin.web.biz.pojo.input.whitelist.WhitelistImportFromExcelInput;
import io.shulie.takin.web.biz.pojo.openapi.response.application.ApplicationListResponse;
+import io.shulie.takin.web.biz.pojo.output.application.ApplicationErrorOutput;
import io.shulie.takin.web.biz.pojo.request.activity.ActivityCreateRequest;
import io.shulie.takin.web.biz.pojo.request.application.ApplicationListByUpgradeRequest;
import io.shulie.takin.web.biz.pojo.request.application.ApplicationNodeOperateProbeRequest;
@@ -55,7 +56,9 @@
import io.shulie.takin.web.biz.pojo.response.application.ShadowServerConfigurationResponse;
import io.shulie.takin.web.biz.pojo.vo.application.ApplicationDsManageExportVO;
import io.shulie.takin.web.biz.service.*;
+import io.shulie.takin.web.biz.service.application.ApplicationErrorService;
import io.shulie.takin.web.biz.service.application.ApplicationNodeService;
+import io.shulie.takin.web.biz.service.application.impl.ApplicationErrorServiceImpl;
import io.shulie.takin.web.biz.service.dsManage.DsService;
import io.shulie.takin.web.biz.service.linkmanage.LinkGuardService;
import io.shulie.takin.web.biz.service.linkmanage.WhiteListService;
@@ -119,6 +122,7 @@
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.math.BigDecimal;
@@ -127,6 +131,7 @@
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -364,50 +369,57 @@ public Response> getApplicationList(ApplicationQueryRequest
@Override
public Long getAccessErrorNum() {
- List results = applicationDAO.getDashboardAppData();
- // 通过amdb查询状态
- if (results == null || results.size() == 0) {
- return 0L;
- }
- //取应用节点数信息
- List appNameList = results.stream().map(ApplicationDetailResult::getApplicationName).collect(
- Collectors.toList());
- List applicationResultList = applicationDAO.getApplicationByName(appNameList);
- if (CollectionUtil.isEmpty(applicationResultList)) {
- return (long) results.size();
- }
- Map> appResultMap = applicationResultList.stream()
- .collect(Collectors.groupingBy(ApplicationResult::getAppName));
- //取应用节点版本信息
- ApplicationNodeQueryParam queryParam = new ApplicationNodeQueryParam();
- queryParam.setCurrent(0);
- queryParam.setPageSize(99999);
- queryParam.setApplicationNames(appNameList);
- PagingList applicationNodes = applicationNodeDAO.pageNodes(queryParam);
- if (CollectionUtil.isEmpty(applicationResultList)) {
- return (long) results.size();
- }
-
- List applicationNodeResultList = applicationNodes.getList();
- Map> applicationNodeResultMap = applicationNodeResultList
- .stream().collect(Collectors.groupingBy(ApplicationNodeResult::getAppName));
- return results.stream().filter(result -> {
- List nodeResults = applicationNodeResultMap.get(result.getApplicationName());
- List appResults = appResultMap.get(result.getApplicationName());
- if (CollectionUtils.isEmpty(nodeResults) || CollectionUtils.isEmpty(appResults)) {
- return true;
- }
- if (!appResults.get(0).getInstanceInfo().getInstanceOnlineAmount().equals(result.getNodeNum())
- || nodeResults.stream().map(ApplicationNodeResult::getAgentVersion).distinct().count() > 1) {
- return true;
- }
- // 自身异常
- if (AppAccessStatusEnum.EXCEPTION.getCode().equals(result.getAccessStatus())) {
- return true;
- }
- return false;
- }).count();
- }
+ ApplicationQueryRequestV2 requestV2 = new ApplicationQueryRequestV2();
+ requestV2.setAccessStatus(3);
+ return this.pageApplication(requestV2).getTotal();
+ }
+
+// @Override
+// public Long getAccessErrorNum() {
+// List results = applicationDAO.getDashboardAppData();
+// // 通过amdb查询状态
+// if (results == null || results.size() == 0) {
+// return 0L;
+// }
+// //取应用节点数信息
+// List appNameList = results.stream().map(ApplicationDetailResult::getApplicationName).collect(
+// Collectors.toList());
+// List applicationResultList = applicationDAO.getApplicationByName(appNameList);
+// if (CollectionUtil.isEmpty(applicationResultList)) {
+// return (long) results.size();
+// }
+// Map> appResultMap = applicationResultList.stream()
+// .collect(Collectors.groupingBy(ApplicationResult::getAppName));
+// //取应用节点版本信息
+// ApplicationNodeQueryParam queryParam = new ApplicationNodeQueryParam();
+// queryParam.setCurrent(0);
+// queryParam.setPageSize(99999);
+// queryParam.setApplicationNames(appNameList);
+// PagingList applicationNodes = applicationNodeDAO.pageNodes(queryParam);
+// if (CollectionUtil.isEmpty(applicationResultList)) {
+// return (long) results.size();
+// }
+//
+// List applicationNodeResultList = applicationNodes.getList();
+// Map> applicationNodeResultMap = applicationNodeResultList
+// .stream().collect(Collectors.groupingBy(ApplicationNodeResult::getAppName));
+// return results.stream().filter(result -> {
+// List nodeResults = applicationNodeResultMap.get(result.getApplicationName());
+// List appResults = appResultMap.get(result.getApplicationName());
+// if (CollectionUtils.isEmpty(nodeResults) || CollectionUtils.isEmpty(appResults)) {
+// return true;
+// }
+// if (!appResults.get(0).getInstanceInfo().getInstanceOnlineAmount().equals(result.getNodeNum())
+// || nodeResults.stream().map(ApplicationNodeResult::getAgentVersion).distinct().count() > 1) {
+// return true;
+// }
+// // 自身异常
+// if (AppAccessStatusEnum.EXCEPTION.getCode().equals(result.getAccessStatus())) {
+// return true;
+// }
+// return false;
+// }).count();
+// }
@Override
public List getApplicationListVo(ApplicationQueryRequest queryParam) {
@@ -487,7 +499,6 @@ public Response getApplicationInfo(String id) {
if (tApplicationMnt == null) {
return Response.success(new ApplicationVo());
}
-
// 取应用节点数信息
List applicationResultList = applicationDAO.getApplicationByName(
Collections.singletonList(tApplicationMnt.getApplicationName()));
@@ -666,7 +677,7 @@ public void uploadAppStatus(NodeUploadDataDTO param) {
}
@Override
- public synchronized void syncApplicationAccessStatus() {
+ public void syncApplicationAccessStatus() {
try {
// 应用分页大小
int pageSize = 20;
@@ -675,19 +686,16 @@ public synchronized void syncApplicationAccessStatus() {
PageBaseDTO pageBaseDTO = new PageBaseDTO();
pageBaseDTO.setPageSize(pageSize);
-
do {
// 分页查询数据库应用列表
List applicationList = applicationDAO.pageFromSync(pageBaseDTO);
if (applicationList.isEmpty()) {
return;
}
-
// 下一页
pageBaseDTO.setCurrent(pageBaseDTO.getCurrent() + 1);
// 赋值查询出的应用数量
applicationNumber = applicationList.size();
-
// 收集应用名称
List appNames = applicationList.stream()
.map(ApplicationListResult::getApplicationName)
@@ -705,6 +713,7 @@ public synchronized void syncApplicationAccessStatus() {
// 正常的应用
Set normalApplicationIdSet = new HashSet<>(20);
+ Map errorInfo = Maps.newHashMap();
// 遍历比对
for (ApplicationListResult application : applicationList) {
String applicationName = application.getApplicationName();
@@ -721,6 +730,8 @@ public synchronized void syncApplicationAccessStatus() {
|| !Objects.equals(amdbApplication.getInstanceInfo().getInstanceOnlineAmount(), nodeNum)) {
// amdbApplicationMap 不存在, map.get 不存在, 或者节点数不一致
errorApplicationIdSet.add(applicationId);
+ errorInfo.put(applicationId, "节点数不一致");
+
} else if (!amdbApplicationMap.isEmpty()
&& (amdbApplication = amdbApplicationMap.get(applicationName)) != null
@@ -743,7 +754,7 @@ public synchronized void syncApplicationAccessStatus() {
}
- this.syncApplicationAccessStatus(applicationList,errorApplicationIdSet);
+ this.syncApplicationAccessStatus(applicationList, errorApplicationIdSet, errorInfo);
} while (applicationNumber == pageSize);
// 先执行一遍, 然后如果分页应用数量等于pageSize, 那么查询下一页
@@ -753,18 +764,28 @@ public synchronized void syncApplicationAccessStatus() {
log.debug("定时同步应用状态完成!");
}
- private void syncApplicationAccessStatus(List applicationList,Set errorApplicationIdSet) {
+ private void syncApplicationAccessStatus(List applicationList
+ , Set errorApplicationIdSet, Map errorInfo) {
if (CollectionUtils.isNotEmpty(applicationList)) {
- applicationList.forEach(app -> {
+ for (ApplicationListResult app : applicationList) {
Map result = applicationDAO.getStatus(app.getApplicationName());
long n = (long) result.get("n");
if (n != 0 || (errorApplicationIdSet.contains(app.getApplicationId()))) {
String e = (String) result.get("e");
+
if (StringUtils.isBlank(e)) {
- String a = (String)result.get("a");
+ String a = (String) result.get("a");
+ if (StringUtils.isEmpty(a)) {
+ if (!io.shulie.takin.utils.string.StringUtil
+ .isEmpty(errorInfo.get(app.getApplicationId()))) {
+ //节点不一致
+ applicationDAO.updateStatus(app.getApplicationId(), e);
+ }
+ continue;
+ }
e = "探针接入异常";
if (StringUtils.isNotEmpty(a)) {
- e += ",agentId为"+a;
+ e += ",agentId为" + a;
}
}
applicationDAO.updateStatus(app.getApplicationId(), e);
@@ -782,8 +803,10 @@ private void syncApplicationAccessStatus(List application
param.setSwitchErrorMap(map);
uploadAccessStatus(param);
} else {
- applicationDAO.updateStatus(app.getApplicationId());}
- });
+ applicationDAO.updateStatus(app.getApplicationId());
+ }
+
+ }
}
}
@@ -1186,8 +1209,8 @@ public String getApplicationNameByApplicationId(Long applicationId) {
@Override
public void uninstallAllAgent(List appIds) {
try {
- appIds = this.filterAppIds(appIds,AgentConstants.UNINSTALL);
- if (CollectionUtils.isEmpty(appIds)){
+ appIds = this.filterAppIds(appIds, AgentConstants.UNINSTALL);
+ if (CollectionUtils.isEmpty(appIds)) {
log.info("所有需要卸载的应用都被过滤掉了");
return;
}
@@ -1295,6 +1318,9 @@ public List getAllTenantApp(List commo
return applicationDAO.getAllTenantApp(commonExtList);
}
+ @Resource
+ ApplicationErrorServiceImpl applicationErrorService;
+
@Override
public PagingList pageApplication(ApplicationQueryRequestV2 request) {
QueryApplicationParam queryApplicationParam = BeanUtil.copyProperties(request, QueryApplicationParam.class);
@@ -1312,6 +1338,12 @@ public PagingList pageApplication(ApplicationQueryReq
List responseList = records.stream().map(result -> {
ApplicationListResponseV2 response = BeanUtil.copyProperties(result, ApplicationListResponseV2.class);
response.setId(result.getApplicationId().toString());
+
+ // 跟应用详情再对比下,同步下状态
+ Response vo = this.getApplicationInfo(response.getId());
+ if (vo.getSuccess() && vo.getData() != null) {
+ response.setAccessStatus(vo.getData().getAccessStatus());
+ }
return response;
}).collect(Collectors.toList());
return PagingList.of(responseList, applicationListResultPage.getTotal());
@@ -1340,7 +1372,7 @@ public PagingList listApplicationByUpgrade(App
@Override
public Response operateCheck(List appIds, String operate) {
- if (CollectionUtils.isEmpty(appIds) || StringUtil.isEmpty(operate)){
+ if (CollectionUtils.isEmpty(appIds) || StringUtil.isEmpty(operate)) {
return Response.fail("参数异常");
}
@@ -1356,25 +1388,25 @@ public Response operateCheck(List appIds, String operate) {
List appNames = applicationList.stream().map(ApplicationDetailResult::getApplicationName).collect(
Collectors.toList());
List applicationNodeProbeResults = applicationNodeProbeDAO.listByAppNameAndOperate(ApplicationNodeProbeOperateEnum.UNINSTALL.getCode(), appNames);
- long count = applicationNodeProbeResults == null ? 0 : applicationNodeProbeResults.stream().map(ApplicationNodeProbeResult::getApplicationName).distinct().count();
- if (AgentConstants.UNINSTALL.equals(operate)){
- if (count > 0){
+ long count = applicationNodeProbeResults == null ? 0 : applicationNodeProbeResults.stream().map(ApplicationNodeProbeResult::getApplicationName).distinct().count();
+ if (AgentConstants.UNINSTALL.equals(operate)) {
+ if (count > 0) {
//构建返回数据
List distinct = applicationNodeProbeResults.stream().map(ApplicationNodeProbeResult::getApplicationName).distinct().collect(Collectors.toList());
StringBuilder sb = new StringBuilder();
distinct.forEach(s -> {
sb.append(s).append("\n");
});
- return Response.success(String.format("已选择%d个应用,%d个应用已处于卸载状态\n应用名称为:",appIds.size(),count) + sb);
- }else {
- return Response.success(String.format("已选择%d个应用,点击继续卸载",appIds.size()));
+ return Response.success(String.format("已选择%d个应用,%d个应用已处于卸载状态\n应用名称为:", appIds.size(), count) + sb);
+ } else {
+ return Response.success(String.format("已选择%d个应用,点击继续卸载", appIds.size()));
}
}
- if (AgentConstants.RESUME.equals(operate)){
- if (appIds.size() > count){
+ if (AgentConstants.RESUME.equals(operate)) {
+ if (appIds.size() > count) {
//构建返回数据
List result = appNames;
- if (count != 0){
+ if (count != 0) {
List distinct = applicationNodeProbeResults.stream().map(ApplicationNodeProbeResult::getApplicationName).distinct().collect(Collectors.toList());
result = result.stream().filter(o -> !distinct.contains(o)).collect(Collectors.toList());
}
@@ -1382,9 +1414,9 @@ public Response operateCheck(List appIds, String operate) {
result.forEach(s -> {
sb.append(s).append("\n");
});
- return Response.success(String.format("已选择%d个应用,%d个应用处于非卸载状态\n应用名称为:",appIds.size(), appIds.size() - count) + sb);
- }else {
- return Response.success(String.format("已选择%d个应用,点击继续",appIds.size()));
+ return Response.success(String.format("已选择%d个应用,%d个应用处于非卸载状态\n应用名称为:", appIds.size(), appIds.size() - count) + sb);
+ } else {
+ return Response.success(String.format("已选择%d个应用,点击继续", appIds.size()));
}
}
return Response.fail("上传的状态当前不支持校验");
@@ -1392,7 +1424,7 @@ public Response operateCheck(List appIds, String operate) {
@Override
public List filterAppIds(List appIds, String operate) {
- if (CollectionUtils.isEmpty(appIds) || StringUtil.isEmpty(operate)){
+ if (CollectionUtils.isEmpty(appIds) || StringUtil.isEmpty(operate)) {
return null;
}
@@ -1412,12 +1444,12 @@ public List filterAppIds(List appIds, String operate) {
List uninstallAppNames = applicationNodeProbeResults.stream().map(ApplicationNodeProbeResult::getApplicationName)
.collect(Collectors.toList());
//需要卸载的数据,需要不存在卸载的数据
- if (AgentConstants.UNINSTALL.equals(operate)){
+ if (AgentConstants.UNINSTALL.equals(operate)) {
return applicationList.stream().filter(o -> !uninstallAppNames.contains(o.getApplicationName())).map(o ->
o.getApplicationId().toString()).collect(Collectors.toList());
}
//需要恢复的数据,需要是已经卸载的数据
- if (AgentConstants.RESUME.equals(operate)){
+ if (AgentConstants.RESUME.equals(operate)) {
return applicationList.stream().filter(o -> uninstallAppNames.contains(o.getApplicationName())).map(o ->
o.getApplicationId().toString()).collect(Collectors.toList());
}
@@ -2448,8 +2480,8 @@ public Response uploadMiddlewareStatus(Map reques
@Override
public void resumeAllAgent(List appIds) {
try {
- appIds = this.filterAppIds(appIds,AgentConstants.RESUME);
- if (CollectionUtils.isEmpty(appIds)){
+ appIds = this.filterAppIds(appIds, AgentConstants.RESUME);
+ if (CollectionUtils.isEmpty(appIds)) {
log.info("所有需要恢复的应用都被过滤掉了");
return;
}
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ShadowConsumerServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ShadowConsumerServiceImpl.java
index 6381118ef4..ee6e18eb8d 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ShadowConsumerServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/impl/ShadowConsumerServiceImpl.java
@@ -1,12 +1,7 @@
package io.shulie.takin.web.biz.service.impl;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Objects;
-import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Resource;
@@ -137,7 +132,7 @@ public PagingList pageMqConsumers(ShadowConsumerQueryInput
ApplicationDetailResult application = applicationDAO.getApplicationById(request.getApplicationId());
if (application == null) {
throw new TakinWebException(TakinWebExceptionEnum.APPLICATION_MANAGE_VALIDATE_ERROR,
- String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
+ String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
if (StringUtils.isNotBlank(request.getTopicGroup())) {
@@ -148,7 +143,7 @@ public PagingList pageMqConsumers(ShadowConsumerQueryInput
}
if (request.getEnabled() != null) {
lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getStatus,
- request.getEnabled() ? ShadowConsumerConstants.ENABLE : ShadowConsumerConstants.DISABLE);
+ request.getEnabled() ? ShadowConsumerConstants.ENABLE : ShadowConsumerConstants.DISABLE);
}
if (CollectionUtils.isNotEmpty(WebPluginUtils.getQueryAllowUserIdList())) {
lambdaQueryWrapper.in(ShadowMqConsumerEntity::getUserId, WebPluginUtils.getQueryAllowUserIdList());
@@ -167,72 +162,72 @@ public PagingList pageMqConsumers(ShadowConsumerQueryInput
}
private List filterResult(ShadowConsumerQueryInput request,
- List totalResult) {
+ List totalResult) {
if (request.getEnabled() != null) {
if (request.getEnabled()) {
totalResult = totalResult.stream().filter(ShadowConsumerOutput::getEnabled).collect(
- Collectors.toList());
+ Collectors.toList());
} else {
totalResult = totalResult.stream().filter(e -> !e.getEnabled()).collect(Collectors.toList());
}
}
if (StringUtils.isNotBlank(request.getTopicGroup())) {
totalResult = totalResult.stream().filter(e -> e.getTopicGroup().contains(request.getTopicGroup())).collect(
- Collectors.toList());
+ Collectors.toList());
}
if (request.getType() != null) {
totalResult = totalResult.stream().filter(e -> e.getType().equals(request.getType())).collect(
- Collectors.toList());
+ Collectors.toList());
}
return totalResult;
}
private List mergeResult(List amdbResult,
- List dbResult) {
+ List dbResult) {
Map amdbMap = new HashMap<>();
Map entityMap = mqConfigTemplateDAO.selectToMapWithNameKey();
if (CollectionUtils.isNotEmpty(amdbResult)) {
amdbMap = amdbResult.stream()
- .filter(item -> entityMap.containsKey(item.getType()))
- .map(e -> {
- ShadowConsumerOutput response = new ShadowConsumerOutput();
- response.setUnionId(
- MD5Util.getMD5(e.getApplicationName() + "#" + e.getTopicGroup() + "#" + e.getType()));
- response.setType(e.getType());
- response.setTopicGroup(e.getTopicGroup());
- response.setEnabled(e.getStatus() == ShadowConsumerConstants.ENABLE);
- response.setGmtCreate(e.getCreateTime());
- response.setGmtUpdate(e.getUpdateTime());
- response.setCanRemove(false);
- response.setCanEnableDisable(false);
- response.setIsManual(false);
- response.setShadowconsumerEnable(String.valueOf(e.getStatus()));
- return response;
- })
- .collect(Collectors.toMap(ShadowConsumerOutput::getUnionId, e -> e, (oV, nV) -> nV));
+ .filter(item -> entityMap.containsKey(item.getType()))
+ .map(e -> {
+ ShadowConsumerOutput response = new ShadowConsumerOutput();
+ response.setUnionId(
+ MD5Util.getMD5(e.getApplicationName() + "#" + e.getTopicGroup() + "#" + e.getType()));
+ response.setType(e.getType());
+ response.setTopicGroup(e.getTopicGroup());
+ response.setEnabled(e.getStatus() == ShadowConsumerConstants.ENABLE);
+ response.setGmtCreate(e.getCreateTime());
+ response.setGmtUpdate(e.getUpdateTime());
+ response.setCanRemove(false);
+ response.setCanEnableDisable(false);
+ response.setIsManual(false);
+ response.setShadowconsumerEnable(String.valueOf(e.getStatus()));
+ return response;
+ })
+ .collect(Collectors.toMap(ShadowConsumerOutput::getUnionId, e -> e, (oV, nV) -> nV));
}
Map dbMap = new HashMap<>(dbResult.size());
if (CollectionUtils.isNotEmpty(dbResult)) {
dbMap = dbResult.stream()
- .filter(item -> entityMap.containsKey(item.getType()))
- .map(e -> {
- ShadowConsumerOutput response = new ShadowConsumerOutput();
- response.setId(e.getId());
- response.setUnionId(
- MD5Util.getMD5(e.getApplicationName() + "#" + e.getTopicGroup() + "#" + e.getType()));
- response.setType(e.getType());
- response.setTopicGroup(e.getTopicGroup());
- response.setEnabled(e.getStatus() == ShadowConsumerConstants.ENABLE);
- response.setGmtCreate(e.getCreateTime());
- response.setGmtUpdate(e.getUpdateTime());
- response.setUserId(e.getUserId());
- response.setIsManual(e.getManualTag() == 1);
- response.setCanRemove(response.getIsManual());
- response.setShadowconsumerEnable(String.valueOf(e.getStatus()));
- WebPluginUtils.fillQueryResponse(response);
- return response;
- })
- .collect(Collectors.toMap(ShadowConsumerOutput::getUnionId, e -> e, (oV, nV) -> nV));
+ .filter(item -> entityMap.containsKey(item.getType()))
+ .map(e -> {
+ ShadowConsumerOutput response = new ShadowConsumerOutput();
+ response.setId(e.getId());
+ response.setUnionId(
+ MD5Util.getMD5(e.getApplicationName() + "#" + e.getTopicGroup() + "#" + e.getType()));
+ response.setType(e.getType());
+ response.setTopicGroup(e.getTopicGroup());
+ response.setEnabled(e.getStatus() == ShadowConsumerConstants.ENABLE);
+ response.setGmtCreate(e.getCreateTime());
+ response.setGmtUpdate(e.getUpdateTime());
+ response.setUserId(e.getUserId());
+ response.setIsManual(e.getManualTag() == 1);
+ response.setCanRemove(response.getIsManual());
+ response.setShadowconsumerEnable(String.valueOf(e.getStatus()));
+ WebPluginUtils.fillQueryResponse(response);
+ return response;
+ })
+ .collect(Collectors.toMap(ShadowConsumerOutput::getUnionId, e -> e, (oV, nV) -> nV));
}
// 原:在amdb自动梳理的基础上,补充数据库里面的记录,有的话用数据的记录
// 现:在db的基础上,补充amdb自动梳理的数据。
@@ -248,39 +243,51 @@ private List mergeResult(List amdb
}
private List queryAmdbDefaultEntrances(ShadowConsumerQueryInput request,
- String applicationName) {
+ String applicationName) {
List mqTopicGroups = applicationEntranceClient.getMqTopicGroups(applicationName);
if (CollectionUtils.isEmpty(mqTopicGroups)) {
return Lists.newArrayList();
}
if (Objects.nonNull(request.getType())) {
mqTopicGroups = mqTopicGroups.stream()
- .filter(dto -> dto.getMiddlewareName().equals(request.getType()))
- .collect(Collectors.toList());
+ .filter(dto -> dto.getMiddlewareName().equals(request.getType()))
+ .collect(Collectors.toList());
}
return mqTopicGroups.stream()
- .map(mqTopicGroup -> {
- ShadowMqConsumerOutput shadowMqConsumerOutput = new ShadowMqConsumerOutput();
- shadowMqConsumerOutput.setTopicGroup(
- mqTopicGroup.getServiceName() + "#" + mqTopicGroup.getMethodName());
- shadowMqConsumerOutput.setType(
- MiddlewareTypeGroupEnum.getMiddlewareGroupType(mqTopicGroup.getMiddlewareName()).getType());
- shadowMqConsumerOutput.setApplicationId(request.getApplicationId());
- shadowMqConsumerOutput.setApplicationName(applicationName);
- shadowMqConsumerOutput.setStatus(ShadowConsumerConstants.DISABLE);
- shadowMqConsumerOutput.setDeleted(ShadowConsumerConstants.LIVED);
- // 补充数据
- WebPluginUtils.fillUserData(shadowMqConsumerOutput);
- return shadowMqConsumerOutput;
- }).collect(Collectors.toList());
+ .map(mqTopicGroup -> {
+ ShadowMqConsumerOutput shadowMqConsumerOutput = new ShadowMqConsumerOutput();
+ shadowMqConsumerOutput.setTopicGroup(
+ mqTopicGroup.getServiceName() + "#" + mqTopicGroup.getMethodName());
+ shadowMqConsumerOutput.setType(
+ MiddlewareTypeGroupEnum.getMiddlewareGroupType(mqTopicGroup.getMiddlewareName()).getType());
+ shadowMqConsumerOutput.setApplicationId(request.getApplicationId());
+ shadowMqConsumerOutput.setApplicationName(applicationName);
+ shadowMqConsumerOutput.setStatus(ShadowConsumerConstants.DISABLE);
+ shadowMqConsumerOutput.setDeleted(ShadowConsumerConstants.LIVED);
+ // 补充数据
+ WebPluginUtils.fillUserData(shadowMqConsumerOutput);
+ return shadowMqConsumerOutput;
+ }).collect(Collectors.toList());
}
private PagingList splitPage(
- ShadowConsumerQueryInput request,
- List responses) {
+ ShadowConsumerQueryInput request,
+ List responses) {
responses.sort((o1, o2) -> {
+ boolean o1InValid = o1.getGmtCreate() == null;
+ boolean o2Invalid = o2.getGmtCreate() == null;
+ boolean bothInvalid = o1InValid && o2Invalid;
+ if (bothInvalid) {
+ return 0;
+ }
+ if (o1InValid) {
+ return -1;
+ }
+ if (o2Invalid) {
+ return 1;
+ }
if (o1.getGmtCreate() != null && o2.getGmtCreate() != null) {
int firstSort = -o1.getGmtCreate().compareTo(o2.getGmtCreate());
if (firstSort == 0) {
@@ -312,10 +319,10 @@ public void createMqConsumers(ShadowConsumerCreateInput request) {
throw new RuntimeException(String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
List exists = getExists(request.getTopicGroup(), request.getApplicationId(),
- request.getType());
+ request.getType());
if (CollectionUtils.isNotEmpty(exists)) {
throw new RuntimeException(
- String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
+ String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
}
OperationLogContextHolder.operationType(OpTypes.CREATE);
OperationLogContextHolder.addVars(Vars.CONSUMER_TYPE, request.getType());
@@ -349,12 +356,12 @@ public void updateMqConsumers(ShadowConsumerUpdateInput request) {
throw new RuntimeException(String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
List exists = getExists(request.getTopicGroup(), request.getApplicationId(),
- request.getType());
+ request.getType());
// 同名的自己不算
exists = exists.stream().filter(item -> !item.getId().equals(request.getId())).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(exists)) {
throw new RuntimeException(
- String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
+ String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
}
OperationLogContextHolder.operationType(OpTypes.UPDATE);
OperationLogContextHolder.addVars(Vars.CONSUMER_TYPE, request.getType());
@@ -371,7 +378,7 @@ public void updateMqConsumers(ShadowConsumerUpdateInput request) {
@Override
public void importUpdateMqConsumers(ShadowConsumerUpdateInput request) {
if (!request.getTopicGroup().contains("#")) {
- return;
+ return;
}
String[] split = request.getTopicGroup().split("#");
if (split.length != 2) {
@@ -382,7 +389,7 @@ public void importUpdateMqConsumers(ShadowConsumerUpdateInput request) {
return;
}
ShadowMqConsumerEntity updateEntity = new ShadowMqConsumerEntity();
- BeanUtils.copyProperties(request,updateEntity);
+ BeanUtils.copyProperties(request, updateEntity);
updateEntity.setTopicGroup(request.getTopicGroup());
updateEntity.setType(request.getType());
updateEntity.setStatus(request.getStatus());
@@ -452,12 +459,12 @@ public void operateMqConsumers(ShadowConsumersOperateInput requests) {
} else {
OperationLogContextHolder.operationType(OpTypes.DISABLE);
List ids = requests.getRequests().stream().map(ShadowConsumerOperateInput::getId).collect(
- Collectors.toList());
+ Collectors.toList());
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.in(ShadowMqConsumerEntity::getId, ids);
lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getDeleted, ShadowConsumerConstants.LIVED);
List shadowMqConsumerEntities = shadowMqConsumerMapper.selectList(
- lambdaQueryWrapper);
+ lambdaQueryWrapper);
if (CollectionUtils.isNotEmpty(shadowMqConsumerEntities)) {
for (ShadowMqConsumerEntity shadowMqConsumerEntity : shadowMqConsumerEntities) {
ShadowMqConsumerEntity updateEntity = new ShadowMqConsumerEntity();
@@ -473,23 +480,25 @@ public void operateMqConsumers(ShadowConsumersOperateInput requests) {
@Override
public List agentSelect(String appName) {
+ ApplicationDetailResult application = applicationDAO.getApplicationByTenantIdAndName(appName);
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getDeleted, ShadowConsumerConstants.LIVED);
lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getStatus, ShadowConsumerConstants.ENABLE);
lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getApplicationName, appName);
+ lambdaQueryWrapper.eq(ShadowMqConsumerEntity::getApplicationId, application.getApplicationId());
List entities = shadowMqConsumerMapper.selectList(lambdaQueryWrapper);
if (CollectionUtils.isEmpty(entities)) {
return Lists.newArrayList();
}
Map> collect = entities.stream()
- .filter(t -> {
- if (StringUtils.isNotBlank(t.getTopicGroup())) {
- String[] topicGroup = t.getTopicGroup().trim().split("#");
- return topicGroup.length == 2;
- } else {
- return false;
- }
- }).collect(Collectors.groupingBy(ShadowMqConsumerEntity::getType));
+ .filter(t -> {
+ if (StringUtils.isNotBlank(t.getTopicGroup())) {
+ String[] topicGroup = t.getTopicGroup().trim().split("#");
+ return topicGroup.length == 2;
+ } else {
+ return false;
+ }
+ }).collect(Collectors.groupingBy(ShadowMqConsumerEntity::getType));
if (MapUtils.isEmpty(collect)) {
return Lists.newArrayList();
}
@@ -526,7 +535,7 @@ public int allocationUser(ShadowConsumerUpdateUserInput request) {
LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(ShadowMqConsumerEntity::getApplicationId, request.getApplicationId());
List shadowMqConsumerEntityList = shadowMqConsumerMapper.selectList(
- queryWrapper);
+ queryWrapper);
if (CollectionUtils.isNotEmpty(shadowMqConsumerEntityList)) {
for (ShadowMqConsumerEntity entity : shadowMqConsumerEntityList) {
entity.setUserId(request.getUserId());
@@ -571,7 +580,7 @@ public List queryMqSupportProgramme(String engName) {
@Override
@Transactional(rollbackFor = Throwable.class)
public void updateMqConsumersV2(ShadowConsumerUpdateInput request) {
- request.setTopicGroup(StringUtil.isEmpty(request.getTopicGroup()) ?"":request.getTopicGroup().trim());
+ request.setTopicGroup(StringUtil.isEmpty(request.getTopicGroup()) ? "" : request.getTopicGroup().trim());
if (Objects.isNull(request.getId())) {
this.createMqConsumersV2(request, false);
} else {
@@ -587,12 +596,12 @@ public void updateMqConsumersV2(ShadowConsumerUpdateInput request) {
throw new RuntimeException(String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
List exists = getExists(request.getTopicGroup(), request.getApplicationId(),
- request.getType());
+ request.getType());
// 同名的自己不算
exists = exists.stream().filter(item -> !item.getId().equals(request.getId())).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(exists)) {
throw new RuntimeException(
- String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
+ String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
}
OperationLogContextHolder.operationType(OpTypes.UPDATE);
OperationLogContextHolder.addVars(Vars.CONSUMER_TYPE, request.getType());
@@ -623,10 +632,10 @@ public void createMqConsumersV2(ShadowConsumerCreateInput request, Boolean manua
throw new RuntimeException(String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
List exists = getExists(request.getTopicGroup(), request.getApplicationId(),
- request.getType());
+ request.getType());
if (CollectionUtils.isNotEmpty(exists)) {
throw new RuntimeException(
- String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
+ String.format("类型为[%s],对应的[%s]已存在", request.getType(), request.getTopicGroup()));
}
OperationLogContextHolder.operationType(OpTypes.CREATE);
OperationLogContextHolder.addVars(Vars.CONSUMER_TYPE, request.getType());
@@ -651,7 +660,7 @@ public PagingList pageMqConsumersV2(ShadowConsumerQueryInp
ShadowConsumerQueryInput queryInput = Convert.convert(ShadowConsumerQueryInput.class, request);
if (application == null) {
throw new TakinWebException(TakinWebExceptionEnum.APPLICATION_MANAGE_VALIDATE_ERROR,
- String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
+ String.format("应用id:%s对应的应用不存在", request.getApplicationId()));
}
LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>();
if (StringUtils.isNotBlank(request.getTopicGroup())) {
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/AppRemoteCallServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/AppRemoteCallServiceImpl.java
index 5a8e6b9561..1bb3851ef8 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/AppRemoteCallServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/AppRemoteCallServiceImpl.java
@@ -95,6 +95,7 @@
import org.apache.commons.lang3.StringUtils;
import org.mockito.internal.util.collections.Sets;
import org.springframework.beans.BeanUtils;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
/**
@@ -131,6 +132,9 @@ public class AppRemoteCallServiceImpl implements AppRemoteCallService {
@Resource
private ThreadPoolExecutor queryAsyncThreadPool;
+ @Value("${takin.job.app.limit:50}")
+ private int appSize = 0;
+
@PostConstruct
public void init() {
criticaValue = ConfigServerHelper.getWrapperIntegerValueByKey(
@@ -545,18 +549,17 @@ public void syncAmdb() {
}
List voList = dictionaryDataDAO.getDictByCode("REMOTE_CALL_TYPE");
- int size = 50;
// size个轮询一次
- if (results.size() > size) {
+ if (results.size() > appSize) {
int i = 1;
boolean loop = true;
do {
List subList;
//批量处理
- if (results.size() > i * size) {
- subList = results.subList((i - 1) * size, i * size);
+ if (results.size() > i * appSize) {
+ subList = results.subList((i - 1) * appSize, i * appSize);
} else {
- subList = results.subList((i - 1) * size, results.size());
+ subList = results.subList((i - 1) * appSize, results.size());
loop = false;
}
i++;
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/ApplicationApiServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/ApplicationApiServiceImpl.java
index 3fe2649083..09141bf539 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/ApplicationApiServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/ApplicationApiServiceImpl.java
@@ -179,8 +179,10 @@ public Response pullApi(String appName) {
@Override
public Map> pullApiV1(String appName) {
+ ApplicationDetailResult application = applicationDAO.getApplicationByTenantIdAndName(appName);
ApplicationApiParam apiParam = new ApplicationApiParam();
apiParam.setAppName(appName);
+ apiParam.setAppId(application.getApplicationId());
List all = applicationApiDAO.querySimpleWithTenant(apiParam);
if (CollectionUtils.isEmpty(all)) {
return null;
@@ -275,6 +277,12 @@ public Response create(ApiCreateVo vo) {
DictionaryCache.getObjectByParam(HTTP_METHOD_TYPE, Integer.parseInt(vo.getMethod())).getLabel());
createParam.setApi(vo.getApi());
createParam.setApplicationName(vo.getApplicationName());
+ ApplicationDetailResult applicationDetailResult = applicationDAO.getApplicationByTenantIdAndName(vo.getApplicationName());
+ if (applicationDetailResult == null) {
+ throw new TakinWebException(TakinWebExceptionEnum.AGENT_REGISTER_API,
+ String.format("应用不存在, 应用名称: %s", vo.getApplicationName()));
+ }
+ createParam.setApplicationId(applicationDetailResult.getApplicationId());
createParam.setIsDeleted((byte)0);
createParam.setUpdateTime(new Date());
createParam.setCreateTime(new Date());
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/LinkGuardServiceImpl.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/LinkGuardServiceImpl.java
index 3ba0e5712a..b615ec0e55 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/LinkGuardServiceImpl.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/LinkGuardServiceImpl.java
@@ -184,7 +184,8 @@ public Response> selectByExample(LinkGuardQueryParam param) {
@Override
public List agentSelect(String appName) {
- List results = linkGuardDAO.selectByAppNameUnderCurrentUser(appName);
+ ApplicationDetailResult application = applicationDAO.getApplicationByTenantIdAndName(appName);
+ List results = linkGuardDAO.selectByAppNameUnderCurrentUser(application.getApplicationId());
return results.stream().map(item -> {
LinkGuardVo target = new LinkGuardVo();
BeanUtils.copyProperties(item, target);
diff --git a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/WhiteListFileService.java b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/WhiteListFileService.java
index 2a76dda0ac..4f716d66c8 100644
--- a/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/WhiteListFileService.java
+++ b/takin-web-biz-service/src/main/java/io/shulie/takin/web/biz/service/linkmanage/impl/WhiteListFileService.java
@@ -79,13 +79,21 @@ public class WhiteListFileService {
@Autowired
private WhiteListService whiteListService;
+ // 是否默认初始化白名单
+ @Value("${takin.enable.initWhiteList:false}")
+ private boolean initWhiteList;
+
@PostConstruct
public void init() {
+ if (!initWhiteList) {
+ log.info("不初始化白名单到文件");
+ return;
+ }
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
- 0, 1,
- 0, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<>(1),
- r -> new Thread(r, "初始化白名单"), new CallerRunsPolicy());
+ 0, 1,
+ 0, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(1),
+ r -> new Thread(r, "初始化白名单"), new CallerRunsPolicy());
threadPoolExecutor.submit(() -> {
log.info("开始初始化白名单");
// 老版本 agent 新版本agent 已转到远程调用模块
@@ -173,10 +181,10 @@ public Map queryBlackWhiteList(String appName, TenantCommonExt e
Map> whitelistMap;
boolean isCheckDuplicateName = Boolean.parseBoolean(
- ConfigServerHelper.getValueByKey(ConfigServerKeyEnum.TAKIN_WHITE_LIST_DUPLICATE_NAME_CHECK));
+ ConfigServerHelper.getValueByKey(ConfigServerKeyEnum.TAKIN_WHITE_LIST_DUPLICATE_NAME_CHECK));
if (isCheckDuplicateName) {
List armdString = agentWhiteLists.stream().map(AgentWhiteList::getInterfaceName).collect(
- Collectors.toList());
+ Collectors.toList());
existWhite = whiteListService.getExistWhite(armdString, Lists.newArrayList());
// todo 这里再获取一次,感觉很多余,但是不改上面的逻辑,所有这里数据再次从新获取,之后可以重构下
WhitelistSearchParam param = new WhitelistSearchParam();
@@ -184,7 +192,7 @@ public Map queryBlackWhiteList(String appName, TenantCommonExt e
param.setUseYn(1);
List results = whiteListDAO.getList(param);
whitelistMap = results.stream().collect(
- Collectors.groupingBy(e -> e.getInterfaceName() + "@@" + e.getType()));
+ Collectors.groupingBy(e -> e.getInterfaceName() + "@@" + e.getType()));
} else {
// 获取所有白名单,是否有全局属性
WhitelistSearchParam param = new WhitelistSearchParam();
@@ -193,7 +201,7 @@ public Map queryBlackWhiteList(String appName, TenantCommonExt e
param.setUseYn(1);
List results = whiteListDAO.getList(param);
whitelistMap = results.stream()
- .collect(Collectors.groupingBy(e -> WhitelistUtil.buildWhiteId(e.getType(), e.getInterfaceName())));
+ .collect(Collectors.groupingBy(e -> WhitelistUtil.buildWhiteId(e.getType(), e.getInterfaceName())));
}
// 获取所有生效效应,是否有局部应用
@@ -203,7 +211,7 @@ public Map queryBlackWhiteList(String appName, TenantCommonExt e
searchParam.setWlistIds(ids);
List appResults = whitelistEffectiveAppDao.getList(searchParam);
Map> appResultsMap = appResults.stream()
- .collect(Collectors.groupingBy(e -> WhitelistUtil.buildWhiteId(e.getType(), e.getInterfaceName())));
+ .collect(Collectors.groupingBy(e -> WhitelistUtil.buildWhiteId(e.getType(), e.getInterfaceName())));
List