From dac0e8134beb3ed848498dfdb66b707d1300a16c Mon Sep 17 00:00:00 2001 From: hebelala Date: Thu, 27 Apr 2017 14:59:18 +0800 Subject: [PATCH] Enhance the election code #141 --- .../election/ElectionListenerManager.java | 12 +++++++--- .../election/LeaderElectionService.java | 23 ++++++------------- .../internal/sharding/ShardingService.java | 4 ++-- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/election/ElectionListenerManager.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/election/ElectionListenerManager.java index 3a52edbe9..6b983a465 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/election/ElectionListenerManager.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/election/ElectionListenerManager.java @@ -62,11 +62,17 @@ public void nodeChanged() throws Exception { @Override public void run() { try { - if (isShutdown) return; + log.debug("[{}] msg=Leader host nodeChanged", jobName); + if (isShutdown) { + log.debug("[{}] msg=ElectionListenerManager has been shutdown", jobName); + return; + } if (!leaderElectionService.hasLeader()) { - log.info("[{}] msg=Elastic job: leader crashed, elect a new leader now.", jobName); + log.info("[{}] msg=Leader crashed, elect a new leader now", jobName); leaderElectionService.leaderElection(); - log.info("[{}] msg=Elastic job: leader election completed.", jobName); + log.info("[{}] msg=Leader election completed", jobName); + } else { + log.debug("[{}] msg=Leader is already existing, unnecessary to election", jobName); } } catch (Throwable t) { log.error(t.getMessage(), t); diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/election/LeaderElectionService.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/election/LeaderElectionService.java index 3e53c352c..ca667ecd8 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/election/LeaderElectionService.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/election/LeaderElectionService.java @@ -50,10 +50,10 @@ public void shutdown() { try { // Release my leader position if (executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(ElectionNode.LEADER_HOST))) { getJobNodeStorage().removeJobNodeIfExisted(ElectionNode.LEADER_HOST); - log.info("[{}] msg=I'm {} that was {}'s leader, I have released myself", jobName, executorName, jobName); + log.info("[{}] msg={} that was {}'s leader, released itself", jobName, executorName, jobName); } } catch (Throwable t) { - log.error(String.format(SaturnConstant.ERROR_LOG_FORMAT, jobName, "release my leader error"), t); + log.error(t.getMessage(), t); } } } @@ -67,29 +67,20 @@ public void leaderElection() { } /** - * 判断当前节点是否是主节点. - * - *

- * 如果主节点正在选举中而导致取不到主节点, 则阻塞至主节点选举完成再返回. - *

- * + * 判断当前节点是否是主节点,如果没有主节点,则选举,直到有主节点 + * * @return 当前节点是否是主节点 */ public Boolean isLeader() { while (!isShutdown.get() && !hasLeader()) { - log.info("[{}] msg=Elastic job: {} leader node is electing, waiting for 100 ms at executor '{}'", jobName, jobName, executorName); - BlockUtils.waitingShortTime(); + log.info("[{}] msg=No leader, try to election", jobName); + leaderElection(); } return executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(ElectionNode.LEADER_HOST)); } /** - * 判断是否已经有主节点. - * - *

- * 仅为选举监听使用. - * 程序中其他地方判断是否有主节点应使用{@code isLeader() }方法. - *

+ * 判断是否已经有主节点 * * @return 是否已经有主节点 */ diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java index cbb639f9d..87586de4b 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java @@ -197,14 +197,14 @@ private boolean blockUntilShardingComplatedIfNotLeader() throws JobShuttingDownE if(!(isNeedSharding() || getJobNodeStorage().isJobNodeExisted(ShardingNode.PROCESSING))) { return true; } - log.debug("Elastic job: sleep short time until sharding completed."); + log.debug("[{}] msg=Sleep short time until sharding completed", jobName); BlockUtils.waitingShortTime(); } } private void waitingOtherJobCompleted() { while (!isShutdown && executionService.hasRunningItems()) { - log.info("Elastic job: sleep short time until other job completed."); + log.info("[{}] msg=Sleep short time until other job completed.", jobName); BlockUtils.waitingShortTime(); } }