diff --git a/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java b/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java index a132b0806..2681d8667 100644 --- a/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java +++ b/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java @@ -694,11 +694,18 @@ private void validateAndUpdateStream(JobConfig jobConfig, Set stream, Li continue; } if (isDownStream) { - otherJob.setUpStream(removeFromStream(jobName, otherJob.getUpStream())); + String upStream = removeFromStreamIfNecessary(jobName, otherJob.getUpStream()); + if (upStream != null) { + otherJob.setUpStream(upStream); + streamChangedJobs.add(otherJob); + } } else { - otherJob.setDownStream(removeFromStream(jobName, otherJob.getDownStream())); + String downStream = removeFromStreamIfNecessary(jobName, otherJob.getDownStream()); + if (downStream != null) { + otherJob.setDownStream(downStream); + streamChangedJobs.add(otherJob); + } } - streamChangedJobs.add(otherJob); } } @@ -710,12 +717,14 @@ private String appendToStream(String jobName, String stream) { return formatStream(streamSet); } - private String removeFromStream(String jobName, String stream) { + private String removeFromStreamIfNecessary(String jobName, String stream) { Set streamSet = parseStreamToList(stream); if (StringUtils.isNotBlank(jobName)) { - streamSet.remove(jobName); + if (streamSet.remove(jobName)) { + return formatStream(streamSet); + } } - return formatStream(streamSet); + return null; } private String formatStream(Set streamSet) { @@ -1121,9 +1130,16 @@ private void createJobConfigToZk(JobConfig jobConfig, Set streamChang // 添加作业根节点和config结点 curatorFrameworkOp.create(JobNodePath.getConfigNodePath(jobName), ""); CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction(); + // 数据库有可能有重复作业的数据,去重,zk无需更新两次 + Collection streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs); // 更新关联作业的上下游 - for (JobConfig streamChangedJob : streamChangedJobs) { + for (JobConfig streamChangedJob : streamChangedJobsNew) { String changedJobName = streamChangedJob.getJobName(); + if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) { + // 数据库存在该作业,但是zk不存在该作业,为垃圾数据 + log.warn("the job({}) config node is not existing in zk", changedJobName); + continue; + } curatorTransactionOp .replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM), streamChangedJob.getUpStream()) @@ -2216,9 +2232,16 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set streamChang } } CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction(); + // 数据库有可能有重复作业的数据,去重,zk无需更新两次 + Collection streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs); // 更新关联作业的上下游 - for (JobConfig streamChangedJob : streamChangedJobs) { + for (JobConfig streamChangedJob : streamChangedJobsNew) { String changedJobName = streamChangedJob.getJobName(); + if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) { + // 数据库存在该作业,但是zk不存在该作业,为垃圾数据 + log.warn("the job({}) config node is not existing in ZK", changedJobName); + continue; + } curatorTransactionOp .replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM), streamChangedJob.getUpStream()) @@ -2297,6 +2320,19 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set streamChang } } + private Collection removeDuplicateByJobName(Set streamChangedJobs) { + Map streamChangedJobsMap = new HashMap<>(); + for (JobConfig streamChangedJob : streamChangedJobs) { + String jobName = streamChangedJob.getJobName(); + if (streamChangedJobsMap.containsKey(jobName)) { + log.warn("the DB have duplicated jobName({})", jobName); + } else { + streamChangedJobsMap.put(jobName, streamChangedJob); + } + } + return streamChangedJobsMap.values(); + } + @Override public List getAllJobNamesFromZK(String namespace) throws SaturnJobConsoleException { CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService