diff --git a/datastore/redis.go b/datastore/redis.go index 36a0ebc2..ab54b3b6 100644 --- a/datastore/redis.go +++ b/datastore/redis.go @@ -147,6 +147,7 @@ func NewRedisCache(prefix, redisURI, readonlyURI string) (*RedisCache, error) { keyBlockBuilderStatus: fmt.Sprintf("%s/%s:block-builder-status", redisPrefix, prefix), keyLastSlotDelivered: fmt.Sprintf("%s/%s:last-slot-delivered", redisPrefix, prefix), keyLastHashDelivered: fmt.Sprintf("%s/%s:last-hash-delivered", redisPrefix, prefix), + currentSlot: 0, }, nil } @@ -749,7 +750,7 @@ func (r *RedisCache) SetFloorBidValue(slot uint64, parentHash, proposerPubkey, v func (r *RedisCache) BeginProcessingSlot(ctx context.Context, slot uint64) (err error) { // Should never process more than one slot at a time if r.currentSlot != 0 { - return fmt.Errorf("already processing slot %d", r.currentSlot) + return fmt.Errorf("already processing slot %d", r.currentSlot) //nolint:goerr113 } keyProcessingSlot := r.keyProcessingSlot(slot) diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go index b0f6e491..b5e51b2a 100644 --- a/services/api/optimistic_test.go +++ b/services/api/optimistic_test.go @@ -295,7 +295,7 @@ func TestPrepareBuildersForSlot(t *testing.T) { pkStr := pubkey.String() // Clear cache. backend.relay.blockBuildersCache = map[string]*blockBuilderCacheEntry{} - backend.relay.prepareBuildersForSlot(slot + 1) + backend.relay.prepareBuildersForSlot(slot+1, slot) entry, ok := backend.relay.blockBuildersCache[pkStr] require.True(t, ok) require.Equal(t, true, entry.status.IsHighPrio) diff --git a/services/api/service.go b/services/api/service.go index 4d7fef74..25893b4f 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -507,7 +507,7 @@ func (api *RelayAPI) IsReady() bool { // - Stop returning bids // - Set ready /readyz to negative status // - Wait a bit to allow removal of service from load balancer and draining of requests -// - If in the middle of proccessing optimistic blocks, wait for those to finish and release redis lock +// - If in the middle of processing optimistic blocks, wait for those to finish and release redis lock func (api *RelayAPI) StopServer() (err error) { // avoid running this twice. setting srvShutdown to true makes /readyz switch to negative status if wasStopping := api.srvShutdown.Swap(true); wasStopping { @@ -532,7 +532,10 @@ func (api *RelayAPI) StopServer() (err error) { // wait for optimistic blocks api.optimisticBlocksWG.Wait() - api.redis.EndProcessingSlot(context.Background()) + err = api.redis.EndProcessingSlot(context.Background()) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } // shutdown return api.srv.Shutdown(context.Background()) @@ -791,13 +794,19 @@ func (api *RelayAPI) updateProposerDuties(headSlot uint64) { api.log.Infof("proposer duties updated: %s", strings.Join(_duties, ", ")) } -func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64) { +func (api *RelayAPI) prepareBuildersForSlot(headSlot, prevHeadSlot uint64) { // First wait for this process to finish processing optimistic blocks api.optimisticBlocksWG.Wait() // Now we release our lock and wait for all other builder processes to wrap up - api.redis.EndProcessingSlot(context.Background()) - api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot + 1) + err := api.redis.EndProcessingSlot(context.Background()) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } + err = api.redis.WaitForSlotComplete(context.Background(), prevHeadSlot+1) + if err != nil { + api.log.WithError(err).Error("failed to get redis optimistic processing slot") + } // Prevent race with StopServer, make sure we don't lock up redis if the server is shutting down if api.srvShutdown.Load() { @@ -806,7 +815,10 @@ func (api *RelayAPI) prepareBuildersForSlot(headSlot uint64, prevHeadSlot uint64 // Update the optimistic slot and signal processing of the next slot api.optimisticSlot.Store(headSlot + 1) - api.redis.BeginProcessingSlot(context.Background(), headSlot + 1) + err = api.redis.BeginProcessingSlot(context.Background(), headSlot+1) + if err != nil { + api.log.WithError(err).Error("failed to update redis optimistic processing slot") + } builders, err := api.db.GetBlockBuilders() if err != nil { @@ -1321,7 +1333,10 @@ func (api *RelayAPI) handleGetPayload(w http.ResponseWriter, req *http.Request) } // Wait until optimistic blocks are complete using the redis waitgroup - api.redis.WaitForSlotComplete(context.Background(), payload.Slot()) + err = api.redis.WaitForSlotComplete(context.Background(), payload.Slot()) + if err != nil { + api.log.WithError(err).Error("failed to get redis optimistic processing slot") + } // Check if there is a demotion for the winning block. _, err = api.db.GetBuilderDemotion(bidTrace)