Skip to content

Commit

Permalink
core/validatorapi: return requested validators (#3123)
Browse files Browse the repository at this point in the history
Instead of always returning the full cache on Validators() requests, only return whatever was requested originally.

Still check cache first, then upstream BN, then if anything comes up return to the caller.

Harden tests to check for this behavior.

Log a debug message when refreshing the validator cache.

Make sure to always refresh the validator cache as soon as the first observed epoch begins, then fall back to the standard refresh pattern (once every epoch).

category: bug
ticket: none
  • Loading branch information
gsora authored Jun 6, 2024
1 parent 43979af commit ed1d55a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 15 deletions.
33 changes: 31 additions & 2 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,14 +422,43 @@ func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
// Setup validator cache, refreshing it every epoch.
valCache := eth2wrap.NewValidatorCache(eth2Cl, eth2Pubkeys)
eth2Cl.SetValidatorCache(valCache.Get)

firstValCacheRefresh := true
var fvcrLock sync.RWMutex

shouldUpdateCache := func(slot core.Slot, lock *sync.RWMutex) bool {
lock.RLock()
defer lock.RUnlock()

if !slot.FirstInEpoch() && !firstValCacheRefresh {
return false
}

return true
}

sched.SubscribeSlots(func(ctx context.Context, slot core.Slot) error {
if !slot.FirstInEpoch() {
if !shouldUpdateCache(slot, &fvcrLock) {
return nil
}

fvcrLock.Lock()
defer fvcrLock.Unlock()

ctx = log.WithCtx(ctx, z.Bool("first_refresh", firstValCacheRefresh))

log.Debug(ctx, "Refreshing validator cache")

valCache.Trim()
_, _, err := valCache.Get(ctx)
if err != nil {
log.Error(ctx, "Cannot refresh validator cache", err)
return err
}

return err
firstValCacheRefresh = false

return nil
})

gaterFunc, err := core.NewDutyGater(ctx, eth2Cl)
Expand Down
25 changes: 16 additions & 9 deletions core/validatorapi/validatorapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,23 +991,30 @@ func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts)
pubkeys = append(pubkeys, pubkey)
}

var nonCachedPubkeys []eth2p0.BLSPubKey
var (
nonCachedPubkeys []eth2p0.BLSPubKey
ret = make(map[eth2p0.ValidatorIndex]*eth2v1.Validator)
)

// Index cached validators by their pubkey for quicker lookup
cvMap := make(map[eth2p0.BLSPubKey]struct{})
for _, cpubkey := range cachedValidators {
cvMap[cpubkey.Validator.PublicKey] = struct{}{}
cvMap := make(map[eth2p0.BLSPubKey]eth2p0.ValidatorIndex)
for vIdx, cpubkey := range cachedValidators {
cvMap[cpubkey.Validator.PublicKey] = vIdx
}

// Check if any of the pubkeys passed as argument are already cached
for _, ncVal := range pubkeys {
if _, ok := cvMap[ncVal]; !ok {
vIdx, ok := cvMap[ncVal]
if !ok {
nonCachedPubkeys = append(nonCachedPubkeys, ncVal)
continue
}

ret[vIdx] = cachedValidators[vIdx]
}

if len(nonCachedPubkeys) != 0 {
log.Debug(ctx, "Validators HTTP request for non-cached validators", z.Int("pubkeys_amount", len(nonCachedPubkeys)))
if len(nonCachedPubkeys) != 0 || len(opts.Indices) > 0 {
log.Debug(ctx, "Requesting validators to upstream beacon node", z.Int("non_cached_pubkeys_amount", len(nonCachedPubkeys)), z.Int("indices", len(opts.Indices)))

opts.PubKeys = nonCachedPubkeys

Expand All @@ -1016,13 +1023,13 @@ func (c Component) Validators(ctx context.Context, opts *eth2api.ValidatorsOpts)
return nil, errors.Wrap(err, "fetching non-cached validators from BN")
}
for idx, val := range eth2Resp.Data {
cachedValidators[idx] = val
ret[idx] = val
}
} else {
log.Debug(ctx, "All validators requested were cached", z.Int("amount_requested", len(opts.PubKeys)))
}

convertedVals, err := c.convertValidators(cachedValidators, len(opts.Indices) == 0)
convertedVals, err := c.convertValidators(ret, len(opts.Indices) == 0)
if err != nil {
return nil, err
}
Expand Down
22 changes: 18 additions & 4 deletions core/validatorapi/validatorapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,7 @@ func TestComponent_ValidatorCache(t *testing.T) {
var (
allPubSharesByKey = make(map[core.PubKey]map[int]tbls.PublicKey)
keyByPubshare = make(map[tbls.PublicKey]core.PubKey)
valByPubkey = make(map[eth2p0.BLSPubKey]*eth2v1.Validator)

complete = make(eth2wrap.CompleteValidators)
pubshares []eth2p0.BLSPubKey
Expand All @@ -1591,6 +1592,7 @@ func TestComponent_ValidatorCache(t *testing.T) {

for idx, val := range baseValSet {
complete[idx] = val
valByPubkey[val.Validator.PublicKey] = val
}

bmock, err := beaconmock.New(beaconmock.WithValidatorSet(baseValSet))
Expand All @@ -1606,7 +1608,16 @@ func TestComponent_ValidatorCache(t *testing.T) {
var valEndpointInvocations int
bmock.ValidatorsFunc = func(ctx context.Context, opts *eth2api.ValidatorsOpts) (map[eth2p0.ValidatorIndex]*eth2v1.Validator, error) {
valEndpointInvocations += len(opts.PubKeys) + len(opts.Indices)
return baseValSet, nil

ret := make(map[eth2p0.ValidatorIndex]*eth2v1.Validator)

for _, pk := range opts.PubKeys {
if val, ok := valByPubkey[pk]; ok {
ret[val.Index] = val
}
}

return ret, nil
}

i := 4
Expand All @@ -1633,32 +1644,35 @@ func TestComponent_ValidatorCache(t *testing.T) {
require.NoError(t, err)

// request validators that are completely cached
_, err = vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
ret, err := vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
State: "head",
PubKeys: pubshares,
})
require.NoError(t, err)
require.Equal(t, 0, valEndpointInvocations)
require.Len(t, ret.Data, len(pubshares))

// request validators that are not cached at all by removing singleVal from the cache
delete(complete, singleVal.Index)

share := allPubSharesByKey[core.PubKeyFrom48Bytes(singleVal.Validator.PublicKey)][1]

_, err = vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
ret, err = vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
State: "head",
PubKeys: []eth2p0.BLSPubKey{eth2p0.BLSPubKey(share)},
})
require.NoError(t, err)
require.Equal(t, 1, valEndpointInvocations)
require.Len(t, ret.Data, 1)

// request half-half validators
_, err = vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
ret, err = vapi.Validators(context.Background(), &eth2api.ValidatorsOpts{
State: "head",
PubKeys: pubshares,
})
require.NoError(t, err)
require.Equal(t, 2, valEndpointInvocations)
require.Len(t, ret.Data, len(pubshares))
}

func TestComponent_GetAllValidators(t *testing.T) {
Expand Down

0 comments on commit ed1d55a

Please sign in to comment.