Skip to content

Commit

Permalink
Enhance the election code #141
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaopeng-he committed Apr 27, 2017
1 parent 8e1614d commit dac0e81
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ public void nodeChanged() throws Exception {
@Override
public void run() {
try {
if (isShutdown) return;
log.debug("[{}] msg=Leader host nodeChanged", jobName);
if (isShutdown) {
log.debug("[{}] msg=ElectionListenerManager has been shutdown", jobName);
return;
}
if (!leaderElectionService.hasLeader()) {
log.info("[{}] msg=Elastic job: leader crashed, elect a new leader now.", jobName);
log.info("[{}] msg=Leader crashed, elect a new leader now", jobName);
leaderElectionService.leaderElection();
log.info("[{}] msg=Elastic job: leader election completed.", jobName);
log.info("[{}] msg=Leader election completed", jobName);
} else {
log.debug("[{}] msg=Leader is already existing, unnecessary to election", jobName);
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public void shutdown() {
try { // Release my leader position
if (executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(ElectionNode.LEADER_HOST))) {
getJobNodeStorage().removeJobNodeIfExisted(ElectionNode.LEADER_HOST);
log.info("[{}] msg=I'm {} that was {}'s leader, I have released myself", jobName, executorName, jobName);
log.info("[{}] msg={} that was {}'s leader, released itself", jobName, executorName, jobName);
}
} catch (Throwable t) {
log.error(String.format(SaturnConstant.ERROR_LOG_FORMAT, jobName, "release my leader error"), t);
log.error(t.getMessage(), t);
}
}
}
Expand All @@ -67,29 +67,20 @@ public void leaderElection() {
}

/**
* 判断当前节点是否是主节点.
*
* <p>
* 如果主节点正在选举中而导致取不到主节点, 则阻塞至主节点选举完成再返回.
* </p>
*
* 判断当前节点是否是主节点,如果没有主节点,则选举,直到有主节点
*
* @return 当前节点是否是主节点
*/
public Boolean isLeader() {
while (!isShutdown.get() && !hasLeader()) {
log.info("[{}] msg=Elastic job: {} leader node is electing, waiting for 100 ms at executor '{}'", jobName, jobName, executorName);
BlockUtils.waitingShortTime();
log.info("[{}] msg=No leader, try to election", jobName);
leaderElection();
}
return executorName.equals(getJobNodeStorage().getJobNodeDataDirectly(ElectionNode.LEADER_HOST));
}

/**
* 判断是否已经有主节点.
*
* <p>
* 仅为选举监听使用.
* 程序中其他地方判断是否有主节点应使用{@code isLeader() }方法.
* </p>
* 判断是否已经有主节点
*
* @return 是否已经有主节点
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,14 +197,14 @@ private boolean blockUntilShardingComplatedIfNotLeader() throws JobShuttingDownE
if(!(isNeedSharding() || getJobNodeStorage().isJobNodeExisted(ShardingNode.PROCESSING))) {
return true;
}
log.debug("Elastic job: sleep short time until sharding completed.");
log.debug("[{}] msg=Sleep short time until sharding completed", jobName);
BlockUtils.waitingShortTime();
}
}

private void waitingOtherJobCompleted() {
while (!isShutdown && executionService.hasRunningItems()) {
log.info("Elastic job: sleep short time until other job completed.");
log.info("[{}] msg=Sleep short time until other job completed.", jobName);
BlockUtils.waitingShortTime();
}
}
Expand Down

0 comments on commit dac0e81

Please sign in to comment.