diff --git a/mevshare/sim_queue.go b/mevshare/sim_queue.go index e1f940e..76985e9 100644 --- a/mevshare/sim_queue.go +++ b/mevshare/sim_queue.go @@ -140,7 +140,7 @@ func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simque if bundle.Metadata != nil { hash = bundle.Metadata.BundleHash } - logger := w.log.With(zap.String("bundle", hash.Hex())) + logger := w.log.With(zap.String("bundle", hash.Hex()), zap.Uint64("target_block", info.TargetBlock)) // Check if bundle was cancelled cancelled, err := w.isBundleCancelled(ctx, &bundle) @@ -179,7 +179,11 @@ func (w *SimulationWorker) Process(ctx context.Context, data []byte, info simque return simqueue.ErrProcessScheduleNextBlock } } - + // mev-share-node knows that new block already arrived, but simcluster node lagging behind so we should retry + if uint64(result.StateBlock) < info.TargetBlock-1 { + logger.Warn("Bundle simulated on outdated block, retrying") + return simqueue.ErrProcessWorkerError + } w.backgroundWg.Add(1) go func() { defer w.backgroundWg.Done() diff --git a/simqueue/queue.go b/simqueue/queue.go index 797bcbe..85488eb 100644 --- a/simqueue/queue.go +++ b/simqueue/queue.go @@ -106,7 +106,8 @@ var ( type QueueItemInfo struct { // Number of times this item was retried before the success. - Retries int + Retries int + TargetBlock uint64 } type ProcessFunc func(ctx context.Context, data []byte, info QueueItemInfo) error @@ -164,6 +165,7 @@ func (s *RedisQueue) UpdateBlock(block uint64) error { if current > block { return ErrBlockNumberIncorrect } + s.log.Debug("updating block, sbundles for the next block will be processed", zap.Uint64("current", current), zap.Uint64("new", block), zap.Time("time", time.Now())) atomic.StoreUint64(s.currentBlock, block) return nil } @@ -338,7 +340,7 @@ func (s *RedisQueue) processNextItem(ctx context.Context, process ProcessFunc) e // process item workerCtx, workerCancel := context.WithTimeout(ctx, s.Config.WorkerTimeout) defer workerCancel() - info := QueueItemInfo{Retries: int(args.iteration)} + info := QueueItemInfo{Retries: int(args.iteration), TargetBlock: args.minTargetBlock} err = process(workerCtx, args.data, info) switch {