Skip to content

Commit

Permalink
fix block lagging
Browse files Browse the repository at this point in the history
  • Loading branch information
TymKh committed Mar 8, 2024
1 parent daf636b commit bdfca5b
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
8 changes: 6 additions & 2 deletions mevshare/sim_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simque
if bundle.Metadata != nil {
hash = bundle.Metadata.BundleHash
}
logger := w.log.With(zap.String("bundle", hash.Hex()))
logger := w.log.With(zap.String("bundle", hash.Hex()), zap.Uint64("target_block", info.TargetBlock))

// Check if bundle was cancelled
cancelled, err := w.isBundleCancelled(ctx, &bundle)
Expand Down Expand Up @@ -179,7 +179,11 @@ func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simque
return simqueue.ErrProcessScheduleNextBlock
}
}

// mev-share-node knows that new block already arrived, but simcluster node lagging behind so we should retry
if uint64(result.StateBlock) < info.TargetBlock-1 {
logger.Warn("Bundle simulated on outdated block, retrying")
return simqueue.ErrProcessWorkerError
}
w.backgroundWg.Add(1)
go func() {
defer w.backgroundWg.Done()
Expand Down
6 changes: 4 additions & 2 deletions simqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ var (

type QueueItemInfo struct {
// Number of times this item was retried before the success.
Retries int
Retries int
TargetBlock uint64
}

type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error
Expand Down Expand Up @@ -164,6 +165,7 @@ func (s *RedisQueue) UpdateBlock(block uint64) error {
if current > block {
return ErrBlockNumberIncorrect
}
s.log.Debug("updating block, sbundles for the next block will be processed", zap.Uint64("current", current), zap.Uint64("new", block), zap.Time("time", time.Now()))
atomic.StoreUint64(s.currentBlock, block)
return nil
}
Expand Down Expand Up @@ -338,7 +340,7 @@ func (s *RedisQueue) processNextItem(ctx context.Context, process ProcessFunc) e
// process item
workerCtx, workerCancel := context.WithTimeout(ctx, s.Config.WorkerTimeout)
defer workerCancel()
info := QueueItemInfo{Retries: int(args.iteration)}
info := QueueItemInfo{Retries: int(args.iteration), TargetBlock: args.minTargetBlock}
err = process(workerCtx, args.data, info)

switch {
Expand Down

0 comments on commit bdfca5b

Please sign in to comment.