Skip to content

Commit

Permalink
[CELEBORN-932] Fix worker register after gracefaully restart
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Worker will firstly register failed after worker gracefully restart in HA mode, it will be really registered after one heartbeat.
<img width="889" alt="image" src="https://github.com/apache/incubator-celeborn/assets/19429353/371aa0e0-b2e9-4c1f-9e40-276dc1460219">
This is because master here uses same `requestId` to submit request,  causing the second request not be processed correctly, due to Ratis `RetryCache`.
Master logs like below:
(worker gracefully stop)
Master: Receive ReportNodeFailure
(worker start)
Master: Received RegisterWorker request
Master: Received heartbeat from unknown worker
Master: Registered worker

So here improve AbstractMetaManager#updateRegisterWorkerMeta to cover `WorkerRemove` logic. For back compatibility and possible inconsistencies during rolling upgrade, temporarily fix duplicate requestId and keep remove function. And we can try to remove `WorkerRemove` logic in the future version.

### Why are the changes needed?
Ditto

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Cluster test

Closes #1863 from onebox-li/fix-restart-register.

Authored-by: onebox-li <lyh-36@163.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
  • Loading branch information
onebox-li authored and waitinfuture committed Sep 11, 2023
1 parent d7e900f commit 0e53a3d
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ public void updateRegisterWorkerMeta(
synchronized (workers) {
if (!workers.contains(workerInfo)) {
workers.add(workerInfo);
shutdownWorkers.remove(workerInfo);
lostWorkers.remove(workerInfo);
}
shutdownWorkers.remove(workerInfo);
lostWorkers.remove(workerInfo);
excludedWorkers.remove(workerInfo);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,9 @@ private[celeborn] class Master(
if (workersSnapShot.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker" +
s" ${workerToRegister.toString()} already exists, re-register.")
// TODO: remove `WorkerRemove` because we have improve register logic to cover `WorkerRemove`
statusSystem.handleWorkerRemove(host, rpcPort, pushPort, fetchPort, replicatePort, requestId)
val newRequestId = MasterClient.genRequestId()
statusSystem.handleRegisterWorker(
host,
rpcPort,
Expand All @@ -532,7 +534,7 @@ private[celeborn] class Master(
replicatePort,
disks,
userResourceConsumption,
requestId)
newRequestId)
context.reply(RegisterWorkerResponse(true, "Worker in snapshot, re-register."))
} else if (statusSystem.workerLostEvents.contains(workerToRegister)) {
logWarning(s"Receive RegisterWorker while worker $workerToRegister " +
Expand Down

0 comments on commit 0e53a3d

Please sign in to comment.