Skip to content

Commit

Permalink
process_registry_updates without a full copy of the validator set (#1…
Browse files Browse the repository at this point in the history
…4130)

* Avoid copying validator set in ProcessRegistryUpdates

* Fix bug with sortIndices. Thanks to @terencechain for the expert debugging
  • Loading branch information
prestonvanloon authored Jun 27, 2024
1 parent e5dd73f commit 787e4a3
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 66 deletions.
7 changes: 6 additions & 1 deletion beacon-chain/core/epoch/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["epoch_processing.go"],
srcs = [
"epoch_processing.go",
"sortable_indices.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/epoch",
visibility = [
"//beacon-chain:__subpackages__",
Expand Down Expand Up @@ -31,6 +34,7 @@ go_test(
srcs = [
"epoch_processing_fuzz_test.go",
"epoch_processing_test.go",
"sortable_indices_test.go",
],
embed = [":go_default_library"],
deps = [
Expand All @@ -47,6 +51,7 @@ go_test(
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@com_github_google_go_cmp//cmp:go_default_library",
"@com_github_google_gofuzz//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
Expand Down
112 changes: 57 additions & 55 deletions beacon-chain/core/epoch/epoch_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/runtime/version"
)

// sortableIndices implements the Sort interface to sort newly activated validator indices
// by activation epoch and by index number.
type sortableIndices struct {
indices []primitives.ValidatorIndex
validators []*ethpb.Validator
}

// Len is the number of elements in the collection.
func (s sortableIndices) Len() int { return len(s.indices) }

// Swap swaps the elements with indexes i and j.
func (s sortableIndices) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] }

// Less reports whether the element with index i must sort before the element with index j.
func (s sortableIndices) Less(i, j int) bool {
if s.validators[s.indices[i]].ActivationEligibilityEpoch == s.validators[s.indices[j]].ActivationEligibilityEpoch {
return s.indices[i] < s.indices[j]
}
return s.validators[s.indices[i]].ActivationEligibilityEpoch < s.validators[s.indices[j]].ActivationEligibilityEpoch
}

// AttestingBalance returns the total balance from all the attesting indices.
//
// WARNING: This method allocates a new copy of the attesting validator indices set and is
Expand Down Expand Up @@ -91,55 +70,78 @@ func AttestingBalance(ctx context.Context, state state.ReadOnlyBeaconState, atts
// for index in activation_queue[:get_validator_churn_limit(state)]:
// validator = state.validators[index]
// validator.activation_epoch = compute_activation_exit_epoch(get_current_epoch(state))
func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state.BeaconState, error) {
currentEpoch := time.CurrentEpoch(state)
vals := state.Validators()
func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) (state.BeaconState, error) {
currentEpoch := time.CurrentEpoch(st)
var err error
ejectionBal := params.BeaconConfig().EjectionBalance
activationEligibilityEpoch := time.CurrentEpoch(state) + 1
for idx, validator := range vals {
// Process the validators for activation eligibility.
if helpers.IsEligibleForActivationQueue(validator, currentEpoch) {
validator.ActivationEligibilityEpoch = activationEligibilityEpoch
if err := state.UpdateValidatorAtIndex(primitives.ValidatorIndex(idx), validator); err != nil {
return nil, err
}

// To avoid copying the state validator set via st.Validators(), we will perform a read only pass
// over the validator set while collecting validator indices where the validator copy is actually
// necessary, then we will process these operations.
eligibleForActivationQ := make([]primitives.ValidatorIndex, 0)
eligibleForActivation := make([]primitives.ValidatorIndex, 0)
eligibleForEjection := make([]primitives.ValidatorIndex, 0)

if err := st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error {
// Collect validators eligible to enter the activation queue.
if helpers.IsEligibleForActivationQueue(val, currentEpoch) {
eligibleForActivationQ = append(eligibleForActivationQ, primitives.ValidatorIndex(idx))
}

// Process the validators for ejection.
isActive := helpers.IsActiveValidator(validator, currentEpoch)
belowEjectionBalance := validator.EffectiveBalance <= ejectionBal
// Collect validators to eject.
isActive := helpers.IsActiveValidatorUsingTrie(val, currentEpoch)
belowEjectionBalance := val.EffectiveBalance() <= ejectionBal
if isActive && belowEjectionBalance {
// Here is fine to do a quadratic loop since this should
// barely happen
maxExitEpoch, churn := validators.MaxExitEpochAndChurn(state)
state, _, err = validators.InitiateValidatorExit(ctx, state, primitives.ValidatorIndex(idx), maxExitEpoch, churn)
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
}
eligibleForEjection = append(eligibleForEjection, primitives.ValidatorIndex(idx))
}

// Collect validators eligible for activation and not yet dequeued for activation.
if helpers.IsEligibleForActivationUsingTrie(st, val) {
eligibleForActivation = append(eligibleForActivation, primitives.ValidatorIndex(idx))
}

return nil
}); err != nil {
return st, fmt.Errorf("failed to read validators: %w", err)
}

// Queue validators eligible for activation and not yet dequeued for activation.
var activationQ []primitives.ValidatorIndex
for idx, validator := range vals {
if helpers.IsEligibleForActivation(state, validator) {
activationQ = append(activationQ, primitives.ValidatorIndex(idx))
// Process validators for activation eligibility.
activationEligibilityEpoch := time.CurrentEpoch(st) + 1
for _, idx := range eligibleForActivationQ {
v, err := st.ValidatorAtIndex(idx)
if err != nil {
return nil, err
}
v.ActivationEligibilityEpoch = activationEligibilityEpoch
if err := st.UpdateValidatorAtIndex(idx, v); err != nil {
return nil, err
}
}

// Process validators eligible for ejection.
for _, idx := range eligibleForEjection {
// Here is fine to do a quadratic loop since this should
// barely happen
maxExitEpoch, churn := validators.MaxExitEpochAndChurn(st)
st, _, err = validators.InitiateValidatorExit(ctx, st, idx, maxExitEpoch, churn)
if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) {
return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx)
}
}

sort.Sort(sortableIndices{indices: activationQ, validators: vals})
// Queue validators eligible for activation and not yet dequeued for activation.
sort.Sort(sortableIndices{indices: eligibleForActivation, state: st})

// Only activate just enough validators according to the activation churn limit.
limit := uint64(len(activationQ))
activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, state, currentEpoch)
limit := uint64(len(eligibleForActivation))
activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, st, currentEpoch)
if err != nil {
return nil, errors.Wrap(err, "could not get active validator count")
}

churnLimit := helpers.ValidatorActivationChurnLimit(activeValidatorCount)

if state.Version() >= version.Deneb {
if st.Version() >= version.Deneb {
churnLimit = helpers.ValidatorActivationChurnLimitDeneb(activeValidatorCount)
}

Expand All @@ -149,17 +151,17 @@ func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state
}

activationExitEpoch := helpers.ActivationExitEpoch(currentEpoch)
for _, index := range activationQ[:limit] {
validator, err := state.ValidatorAtIndex(index)
for _, index := range eligibleForActivation[:limit] {
validator, err := st.ValidatorAtIndex(index)
if err != nil {
return nil, err
}
validator.ActivationEpoch = activationExitEpoch
if err := state.UpdateValidatorAtIndex(index, validator); err != nil {
if err := st.UpdateValidatorAtIndex(index, validator); err != nil {
return nil, err
}
}
return state, nil
return st, nil
}

// ProcessSlashings processes the slashed validators during epoch processing,
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/core/epoch/epoch_processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,15 @@ func TestProcessRegistryUpdates_NoRotation(t *testing.T) {
}

func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) {
finalizedEpoch := primitives.Epoch(4)
base := &ethpb.BeaconState{
Slot: 5 * params.BeaconConfig().SlotsPerEpoch,
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: 6, Root: make([]byte, fieldparams.RootLength)},
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: finalizedEpoch, Root: make([]byte, fieldparams.RootLength)},
}
limit := helpers.ValidatorActivationChurnLimit(0)
for i := uint64(0); i < limit+10; i++ {
base.Validators = append(base.Validators, &ethpb.Validator{
ActivationEligibilityEpoch: params.BeaconConfig().FarFutureEpoch,
ActivationEligibilityEpoch: finalizedEpoch,
EffectiveBalance: params.BeaconConfig().MaxEffectiveBalance,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
})
Expand All @@ -325,7 +326,6 @@ func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) {
newState, err := epoch.ProcessRegistryUpdates(context.Background(), beaconState)
require.NoError(t, err)
for i, validator := range newState.Validators() {
assert.Equal(t, currentEpoch+1, validator.ActivationEligibilityEpoch, "Could not update registry %d, unexpected activation eligibility epoch", i)
if uint64(i) < limit && validator.ActivationEpoch != helpers.ActivationExitEpoch(currentEpoch) {
t.Errorf("Could not update registry %d, validators failed to activate: wanted activation epoch %d, got %d",
i, helpers.ActivationExitEpoch(currentEpoch), validator.ActivationEpoch)
Expand All @@ -338,9 +338,10 @@ func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) {
}

func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) {
finalizedEpoch := primitives.Epoch(4)
base := &ethpb.BeaconStateDeneb{
Slot: 5 * params.BeaconConfig().SlotsPerEpoch,
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: 6, Root: make([]byte, fieldparams.RootLength)},
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: finalizedEpoch, Root: make([]byte, fieldparams.RootLength)},
}
cfg := params.BeaconConfig()
cfg.MinPerEpochChurnLimit = 10
Expand All @@ -349,7 +350,7 @@ func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) {

for i := uint64(0); i < 10; i++ {
base.Validators = append(base.Validators, &ethpb.Validator{
ActivationEligibilityEpoch: params.BeaconConfig().FarFutureEpoch,
ActivationEligibilityEpoch: finalizedEpoch,
EffectiveBalance: params.BeaconConfig().MaxEffectiveBalance,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
})
Expand All @@ -360,7 +361,6 @@ func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) {
newState, err := epoch.ProcessRegistryUpdates(context.Background(), beaconState)
require.NoError(t, err)
for i, validator := range newState.Validators() {
assert.Equal(t, currentEpoch+1, validator.ActivationEligibilityEpoch, "Could not update registry %d, unexpected activation eligibility epoch", i)
// Note: In Deneb, only validators indices before `MaxPerEpochActivationChurnLimit` should be activated.
if uint64(i) < params.BeaconConfig().MaxPerEpochActivationChurnLimit && validator.ActivationEpoch != helpers.ActivationExitEpoch(currentEpoch) {
t.Errorf("Could not update registry %d, validators failed to activate: wanted activation epoch %d, got %d",
Expand Down
35 changes: 35 additions & 0 deletions beacon-chain/core/epoch/sortable_indices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package epoch

import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)

// sortableIndices implements the Sort interface to sort newly activated validator indices
// by activation epoch and by index number.
type sortableIndices struct {
indices []primitives.ValidatorIndex
state state.ReadOnlyValidators
}

// Len is the number of elements in the collection.
func (s sortableIndices) Len() int { return len(s.indices) }

// Swap swaps the elements with indexes i and j.
func (s sortableIndices) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] }

// Less reports whether the element with index i must sort before the element with index j.
func (s sortableIndices) Less(i, j int) bool {
vi, erri := s.state.ValidatorAtIndexReadOnly(s.indices[i])
vj, errj := s.state.ValidatorAtIndexReadOnly(s.indices[j])

if erri != nil || errj != nil {
return false
}

a, b := vi.ActivationEligibilityEpoch(), vj.ActivationEligibilityEpoch()
if a == b {
return s.indices[i] < s.indices[j]
}
return a < b
}
53 changes: 53 additions & 0 deletions beacon-chain/core/epoch/sortable_indices_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package epoch

import (
"sort"
"testing"

"github.com/google/go-cmp/cmp"
state_native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)

func TestSortableIndices(t *testing.T) {
st, err := state_native.InitializeFromProtoPhase0(&eth.BeaconState{
Validators: []*eth.Validator{
{ActivationEligibilityEpoch: 0},
{ActivationEligibilityEpoch: 5},
{ActivationEligibilityEpoch: 4},
{ActivationEligibilityEpoch: 4},
{ActivationEligibilityEpoch: 2},
{ActivationEligibilityEpoch: 1},
},
})
require.NoError(t, err)

s := sortableIndices{
indices: []primitives.ValidatorIndex{
4,
2,
5,
3,
1,
0,
},
state: st,
}

sort.Sort(s)

want := []primitives.ValidatorIndex{
0,
5,
4,
2, // Validators with the same ActivationEligibilityEpoch are sorted by index, ascending.
3,
1,
}

if !cmp.Equal(s.indices, want) {
t.Errorf("Failed to sort indices correctly, wanted %v, got %v", want, s.indices)
}
}
6 changes: 3 additions & 3 deletions beacon-chain/core/helpers/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,11 @@ func ComputeProposerIndex(bState state.ReadOnlyValidators, activeIndices []primi
// validator.activation_eligibility_epoch == FAR_FUTURE_EPOCH
// and validator.effective_balance >= MIN_ACTIVATION_BALANCE # [Modified in Electra:EIP7251]
// )
func IsEligibleForActivationQueue(validator *ethpb.Validator, currentEpoch primitives.Epoch) bool {
func IsEligibleForActivationQueue(validator state.ReadOnlyValidator, currentEpoch primitives.Epoch) bool {
if currentEpoch >= params.BeaconConfig().ElectraForkEpoch {
return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch, validator.EffectiveBalance)
return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance())
}
return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch, validator.EffectiveBalance)
return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance())
}

// isEligibleForActivationQueue carries out the logic for IsEligibleForActivationQueue
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/core/helpers/validators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,9 @@ func TestIsEligibleForActivationQueue(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
helpers.ClearCache()

assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(tt.validator, tt.currentEpoch), "IsEligibleForActivationQueue()")
v, err := state_native.NewValidator(tt.validator)
assert.NoError(t, err)
assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(v, tt.currentEpoch), "IsEligibleForActivationQueue()")
})
}
}
Expand Down

0 comments on commit 787e4a3

Please sign in to comment.