Skip to content

Commit

Permalink
Merge pull request #5660 from oasisprotocol/peternose/bugfix/retry-sc…
Browse files Browse the repository at this point in the history
…heduling

go/worker/compute/executor/committee: Retry scheduling on failure
  • Loading branch information
peternose authored Apr 24, 2024
2 parents 0bb395b + 7e631d5 commit 76f849c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 2 deletions.
1 change: 1 addition & 0 deletions .changelog/5660.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor/committee: Retry scheduling on failure
15 changes: 13 additions & 2 deletions go/worker/compute/executor/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,8 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue
n.logger.Error("runtime batch execution failed",
"err", err,
)
+// Notify the round worker that the execution failed.
+n.processedBatchCh <- nil
return
}

Expand All @@ -503,7 +505,7 @@ func (n *Node) startSchedulingBatch(ctx context.Context, batch []*txpool.TxQueue
Batch: rsp.TxHashes,
}

-// Submit response to the executor worker.
+// Submit response to the round worker.
n.processedBatchCh <- &processedBatch{
proposal: &proposal,
rank: n.rank,
Expand Down Expand Up @@ -639,6 +641,8 @@ func (n *Node) startProcessingBatch(ctx context.Context, proposal *commitment.Pr
n.logger.Error("runtime batch execution failed",
"err", err,
)
+// Notify the round worker that the execution failed.
+n.processedBatchCh <- nil
return
}

Expand Down Expand Up @@ -1042,6 +1046,13 @@ func (n *Node) handleProcessedBatch(ctx context.Context, batch *processedBatch)
}
lastHeader := n.blockInfo.RuntimeBlock.Header

+// A nil batch indicates that scheduling or processing has failed.
+// Return to the initial state and retry.
+if batch == nil {
+n.transitionState(StateWaitingForBatch{})
+return
+}
+
// Check if there was an issue during batch processing.
if batch.computed == nil {
n.logger.Warn("worker has aborted batch processing")
Expand Down Expand Up @@ -1509,7 +1520,7 @@ func (n *Node) roundWorker(ctx context.Context) {
// Process observed executor commitments.
n.handleObservedExecutorCommitment(ctx, ec)
case batch := <-n.processedBatchCh:
-// Batch processing has finished.
+// Batch processing has either finished or failed.
n.handleProcessedBatch(ctx, batch)
case <-schedulerRankTicker.C:
// Change scheduler rank and try again.
Expand Down

0 comments on commit 76f849c

Please sign in to comment.