From f996ea46234c14efcf220622fd89c5c7cdc8a24a Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Thu, 27 Jun 2024 11:10:29 -0500 Subject: [PATCH] process_registry_updates without a full copy of the validator set (#14130) * Avoid copying validator set in ProcessRegistryUpdates * Fix bug with sortIndices. Thanks to @terencechain for the expert debugging --- beacon-chain/core/epoch/BUILD.bazel | 7 +- beacon-chain/core/epoch/epoch_processing.go | 112 +++++++++--------- .../core/epoch/epoch_processing_test.go | 12 +- beacon-chain/core/epoch/sortable_indices.go | 35 ++++++ .../core/epoch/sortable_indices_test.go | 53 +++++++++ beacon-chain/core/helpers/validators.go | 6 +- beacon-chain/core/helpers/validators_test.go | 4 +- 7 files changed, 163 insertions(+), 66 deletions(-) create mode 100644 beacon-chain/core/epoch/sortable_indices.go create mode 100644 beacon-chain/core/epoch/sortable_indices_test.go diff --git a/beacon-chain/core/epoch/BUILD.bazel b/beacon-chain/core/epoch/BUILD.bazel index 72e233695905..e6f559e55916 100644 --- a/beacon-chain/core/epoch/BUILD.bazel +++ b/beacon-chain/core/epoch/BUILD.bazel @@ -2,7 +2,10 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["epoch_processing.go"], + srcs = [ + "epoch_processing.go", + "sortable_indices.go", + ], importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/epoch", visibility = [ "//beacon-chain:__subpackages__", @@ -31,6 +34,7 @@ go_test( srcs = [ "epoch_processing_fuzz_test.go", "epoch_processing_test.go", + "sortable_indices_test.go", ], embed = [":go_default_library"], deps = [ @@ -47,6 +51,7 @@ go_test( "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", + "@com_github_google_go_cmp//cmp:go_default_library", "@com_github_google_gofuzz//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@org_golang_google_protobuf//proto:go_default_library", diff --git a/beacon-chain/core/epoch/epoch_processing.go b/beacon-chain/core/epoch/epoch_processing.go index e941997179e4..a1ea5a289759 100644 --- a/beacon-chain/core/epoch/epoch_processing.go +++ b/beacon-chain/core/epoch/epoch_processing.go @@ -24,27 +24,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/version" ) -// sortableIndices implements the Sort interface to sort newly activated validator indices -// by activation epoch and by index number. -type sortableIndices struct { - indices []primitives.ValidatorIndex - validators []*ethpb.Validator -} - -// Len is the number of elements in the collection. -func (s sortableIndices) Len() int { return len(s.indices) } - -// Swap swaps the elements with indexes i and j. -func (s sortableIndices) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] } - -// Less reports whether the element with index i must sort before the element with index j. -func (s sortableIndices) Less(i, j int) bool { - if s.validators[s.indices[i]].ActivationEligibilityEpoch == s.validators[s.indices[j]].ActivationEligibilityEpoch { - return s.indices[i] < s.indices[j] - } - return s.validators[s.indices[i]].ActivationEligibilityEpoch < s.validators[s.indices[j]].ActivationEligibilityEpoch -} - // AttestingBalance returns the total balance from all the attesting indices. // // WARNING: This method allocates a new copy of the attesting validator indices set and is @@ -91,55 +70,78 @@ func AttestingBalance(ctx context.Context, state state.ReadOnlyBeaconState, atts // for index in activation_queue[:get_validator_churn_limit(state)]: // validator = state.validators[index] // validator.activation_epoch = compute_activation_exit_epoch(get_current_epoch(state)) -func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state.BeaconState, error) { - currentEpoch := time.CurrentEpoch(state) - vals := state.Validators() +func ProcessRegistryUpdates(ctx context.Context, st state.BeaconState) (state.BeaconState, error) { + currentEpoch := time.CurrentEpoch(st) var err error ejectionBal := params.BeaconConfig().EjectionBalance - activationEligibilityEpoch := time.CurrentEpoch(state) + 1 - for idx, validator := range vals { - // Process the validators for activation eligibility. - if helpers.IsEligibleForActivationQueue(validator, currentEpoch) { - validator.ActivationEligibilityEpoch = activationEligibilityEpoch - if err := state.UpdateValidatorAtIndex(primitives.ValidatorIndex(idx), validator); err != nil { - return nil, err - } + + // To avoid copying the state validator set via st.Validators(), we will perform a read only pass + // over the validator set while collecting validator indices where the validator copy is actually + // necessary, then we will process these operations. + eligibleForActivationQ := make([]primitives.ValidatorIndex, 0) + eligibleForActivation := make([]primitives.ValidatorIndex, 0) + eligibleForEjection := make([]primitives.ValidatorIndex, 0) + + if err := st.ReadFromEveryValidator(func(idx int, val state.ReadOnlyValidator) error { + // Collect validators eligible to enter the activation queue. + if helpers.IsEligibleForActivationQueue(val, currentEpoch) { + eligibleForActivationQ = append(eligibleForActivationQ, primitives.ValidatorIndex(idx)) } - // Process the validators for ejection. - isActive := helpers.IsActiveValidator(validator, currentEpoch) - belowEjectionBalance := validator.EffectiveBalance <= ejectionBal + // Collect validators to eject. + isActive := helpers.IsActiveValidatorUsingTrie(val, currentEpoch) + belowEjectionBalance := val.EffectiveBalance() <= ejectionBal if isActive && belowEjectionBalance { - // Here is fine to do a quadratic loop since this should - // barely happen - maxExitEpoch, churn := validators.MaxExitEpochAndChurn(state) - state, _, err = validators.InitiateValidatorExit(ctx, state, primitives.ValidatorIndex(idx), maxExitEpoch, churn) - if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) { - return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx) - } + eligibleForEjection = append(eligibleForEjection, primitives.ValidatorIndex(idx)) } + + // Collect validators eligible for activation and not yet dequeued for activation. + if helpers.IsEligibleForActivationUsingTrie(st, val) { + eligibleForActivation = append(eligibleForActivation, primitives.ValidatorIndex(idx)) + } + + return nil + }); err != nil { + return st, fmt.Errorf("failed to read validators: %w", err) } - // Queue validators eligible for activation and not yet dequeued for activation. - var activationQ []primitives.ValidatorIndex - for idx, validator := range vals { - if helpers.IsEligibleForActivation(state, validator) { - activationQ = append(activationQ, primitives.ValidatorIndex(idx)) + // Process validators for activation eligibility. + activationEligibilityEpoch := time.CurrentEpoch(st) + 1 + for _, idx := range eligibleForActivationQ { + v, err := st.ValidatorAtIndex(idx) + if err != nil { + return nil, err + } + v.ActivationEligibilityEpoch = activationEligibilityEpoch + if err := st.UpdateValidatorAtIndex(idx, v); err != nil { + return nil, err + } + } + + // Process validators eligible for ejection. + for _, idx := range eligibleForEjection { + // Here is fine to do a quadratic loop since this should + // barely happen + maxExitEpoch, churn := validators.MaxExitEpochAndChurn(st) + st, _, err = validators.InitiateValidatorExit(ctx, st, idx, maxExitEpoch, churn) + if err != nil && !errors.Is(err, validators.ErrValidatorAlreadyExited) { + return nil, errors.Wrapf(err, "could not initiate exit for validator %d", idx) } } - sort.Sort(sortableIndices{indices: activationQ, validators: vals}) + // Queue validators eligible for activation and not yet dequeued for activation. + sort.Sort(sortableIndices{indices: eligibleForActivation, state: st}) // Only activate just enough validators according to the activation churn limit. - limit := uint64(len(activationQ)) - activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, state, currentEpoch) + limit := uint64(len(eligibleForActivation)) + activeValidatorCount, err := helpers.ActiveValidatorCount(ctx, st, currentEpoch) if err != nil { return nil, errors.Wrap(err, "could not get active validator count") } churnLimit := helpers.ValidatorActivationChurnLimit(activeValidatorCount) - if state.Version() >= version.Deneb { + if st.Version() >= version.Deneb { churnLimit = helpers.ValidatorActivationChurnLimitDeneb(activeValidatorCount) } @@ -149,17 +151,17 @@ func ProcessRegistryUpdates(ctx context.Context, state state.BeaconState) (state } activationExitEpoch := helpers.ActivationExitEpoch(currentEpoch) - for _, index := range activationQ[:limit] { - validator, err := state.ValidatorAtIndex(index) + for _, index := range eligibleForActivation[:limit] { + validator, err := st.ValidatorAtIndex(index) if err != nil { return nil, err } validator.ActivationEpoch = activationExitEpoch - if err := state.UpdateValidatorAtIndex(index, validator); err != nil { + if err := st.UpdateValidatorAtIndex(index, validator); err != nil { return nil, err } } - return state, nil + return st, nil } // ProcessSlashings processes the slashed validators during epoch processing, diff --git a/beacon-chain/core/epoch/epoch_processing_test.go b/beacon-chain/core/epoch/epoch_processing_test.go index 71ebe63cd1ad..dd78dd5ca626 100644 --- a/beacon-chain/core/epoch/epoch_processing_test.go +++ b/beacon-chain/core/epoch/epoch_processing_test.go @@ -307,14 +307,15 @@ func TestProcessRegistryUpdates_NoRotation(t *testing.T) { } func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) { + finalizedEpoch := primitives.Epoch(4) base := ðpb.BeaconState{ Slot: 5 * params.BeaconConfig().SlotsPerEpoch, - FinalizedCheckpoint: ðpb.Checkpoint{Epoch: 6, Root: make([]byte, fieldparams.RootLength)}, + FinalizedCheckpoint: ðpb.Checkpoint{Epoch: finalizedEpoch, Root: make([]byte, fieldparams.RootLength)}, } limit := helpers.ValidatorActivationChurnLimit(0) for i := uint64(0); i < limit+10; i++ { base.Validators = append(base.Validators, ðpb.Validator{ - ActivationEligibilityEpoch: params.BeaconConfig().FarFutureEpoch, + ActivationEligibilityEpoch: finalizedEpoch, EffectiveBalance: params.BeaconConfig().MaxEffectiveBalance, ActivationEpoch: params.BeaconConfig().FarFutureEpoch, }) @@ -325,7 +326,6 @@ func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) { newState, err := epoch.ProcessRegistryUpdates(context.Background(), beaconState) require.NoError(t, err) for i, validator := range newState.Validators() { - assert.Equal(t, currentEpoch+1, validator.ActivationEligibilityEpoch, "Could not update registry %d, unexpected activation eligibility epoch", i) if uint64(i) < limit && validator.ActivationEpoch != helpers.ActivationExitEpoch(currentEpoch) { t.Errorf("Could not update registry %d, validators failed to activate: wanted activation epoch %d, got %d", i, helpers.ActivationExitEpoch(currentEpoch), validator.ActivationEpoch) @@ -338,9 +338,10 @@ func TestProcessRegistryUpdates_EligibleToActivate(t *testing.T) { } func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) { + finalizedEpoch := primitives.Epoch(4) base := ðpb.BeaconStateDeneb{ Slot: 5 * params.BeaconConfig().SlotsPerEpoch, - FinalizedCheckpoint: ðpb.Checkpoint{Epoch: 6, Root: make([]byte, fieldparams.RootLength)}, + FinalizedCheckpoint: ðpb.Checkpoint{Epoch: finalizedEpoch, Root: make([]byte, fieldparams.RootLength)}, } cfg := params.BeaconConfig() cfg.MinPerEpochChurnLimit = 10 @@ -349,7 +350,7 @@ func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) { for i := uint64(0); i < 10; i++ { base.Validators = append(base.Validators, ðpb.Validator{ - ActivationEligibilityEpoch: params.BeaconConfig().FarFutureEpoch, + ActivationEligibilityEpoch: finalizedEpoch, EffectiveBalance: params.BeaconConfig().MaxEffectiveBalance, ActivationEpoch: params.BeaconConfig().FarFutureEpoch, }) @@ -360,7 +361,6 @@ func TestProcessRegistryUpdates_EligibleToActivate_Cancun(t *testing.T) { newState, err := epoch.ProcessRegistryUpdates(context.Background(), beaconState) require.NoError(t, err) for i, validator := range newState.Validators() { - assert.Equal(t, currentEpoch+1, validator.ActivationEligibilityEpoch, "Could not update registry %d, unexpected activation eligibility epoch", i) // Note: In Deneb, only validators indices before `MaxPerEpochActivationChurnLimit` should be activated. if uint64(i) < params.BeaconConfig().MaxPerEpochActivationChurnLimit && validator.ActivationEpoch != helpers.ActivationExitEpoch(currentEpoch) { t.Errorf("Could not update registry %d, validators failed to activate: wanted activation epoch %d, got %d", diff --git a/beacon-chain/core/epoch/sortable_indices.go b/beacon-chain/core/epoch/sortable_indices.go new file mode 100644 index 000000000000..9c268bd24fa6 --- /dev/null +++ b/beacon-chain/core/epoch/sortable_indices.go @@ -0,0 +1,35 @@ +package epoch + +import ( + "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" +) + +// sortableIndices implements the Sort interface to sort newly activated validator indices +// by activation epoch and by index number. +type sortableIndices struct { + indices []primitives.ValidatorIndex + state state.ReadOnlyValidators +} + +// Len is the number of elements in the collection. +func (s sortableIndices) Len() int { return len(s.indices) } + +// Swap swaps the elements with indexes i and j. +func (s sortableIndices) Swap(i, j int) { s.indices[i], s.indices[j] = s.indices[j], s.indices[i] } + +// Less reports whether the element with index i must sort before the element with index j. +func (s sortableIndices) Less(i, j int) bool { + vi, erri := s.state.ValidatorAtIndexReadOnly(s.indices[i]) + vj, errj := s.state.ValidatorAtIndexReadOnly(s.indices[j]) + + if erri != nil || errj != nil { + return false + } + + a, b := vi.ActivationEligibilityEpoch(), vj.ActivationEligibilityEpoch() + if a == b { + return s.indices[i] < s.indices[j] + } + return a < b +} diff --git a/beacon-chain/core/epoch/sortable_indices_test.go b/beacon-chain/core/epoch/sortable_indices_test.go new file mode 100644 index 000000000000..45c29f0c784b --- /dev/null +++ b/beacon-chain/core/epoch/sortable_indices_test.go @@ -0,0 +1,53 @@ +package epoch + +import ( + "sort" + "testing" + + "github.com/google/go-cmp/cmp" + state_native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func TestSortableIndices(t *testing.T) { + st, err := state_native.InitializeFromProtoPhase0(ð.BeaconState{ + Validators: []*eth.Validator{ + {ActivationEligibilityEpoch: 0}, + {ActivationEligibilityEpoch: 5}, + {ActivationEligibilityEpoch: 4}, + {ActivationEligibilityEpoch: 4}, + {ActivationEligibilityEpoch: 2}, + {ActivationEligibilityEpoch: 1}, + }, + }) + require.NoError(t, err) + + s := sortableIndices{ + indices: []primitives.ValidatorIndex{ + 4, + 2, + 5, + 3, + 1, + 0, + }, + state: st, + } + + sort.Sort(s) + + want := []primitives.ValidatorIndex{ + 0, + 5, + 4, + 2, // Validators with the same ActivationEligibilityEpoch are sorted by index, ascending. + 3, + 1, + } + + if !cmp.Equal(s.indices, want) { + t.Errorf("Failed to sort indices correctly, wanted %v, got %v", want, s.indices) + } +} diff --git a/beacon-chain/core/helpers/validators.go b/beacon-chain/core/helpers/validators.go index a45c69562fe6..ccee87add7c5 100644 --- a/beacon-chain/core/helpers/validators.go +++ b/beacon-chain/core/helpers/validators.go @@ -404,11 +404,11 @@ func ComputeProposerIndex(bState state.ReadOnlyValidators, activeIndices []primi // validator.activation_eligibility_epoch == FAR_FUTURE_EPOCH // and validator.effective_balance >= MIN_ACTIVATION_BALANCE # [Modified in Electra:EIP7251] // ) -func IsEligibleForActivationQueue(validator *ethpb.Validator, currentEpoch primitives.Epoch) bool { +func IsEligibleForActivationQueue(validator state.ReadOnlyValidator, currentEpoch primitives.Epoch) bool { if currentEpoch >= params.BeaconConfig().ElectraForkEpoch { - return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch, validator.EffectiveBalance) + return isEligibleForActivationQueueElectra(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance()) } - return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch, validator.EffectiveBalance) + return isEligibleForActivationQueue(validator.ActivationEligibilityEpoch(), validator.EffectiveBalance()) } // isEligibleForActivationQueue carries out the logic for IsEligibleForActivationQueue diff --git a/beacon-chain/core/helpers/validators_test.go b/beacon-chain/core/helpers/validators_test.go index efa21c75cb63..a5df3c6bf83d 100644 --- a/beacon-chain/core/helpers/validators_test.go +++ b/beacon-chain/core/helpers/validators_test.go @@ -743,7 +743,9 @@ func TestIsEligibleForActivationQueue(t *testing.T) { t.Run(tt.name, func(t *testing.T) { helpers.ClearCache() - assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(tt.validator, tt.currentEpoch), "IsEligibleForActivationQueue()") + v, err := state_native.NewValidator(tt.validator) + assert.NoError(t, err) + assert.Equal(t, tt.want, helpers.IsEligibleForActivationQueue(v, tt.currentEpoch), "IsEligibleForActivationQueue()") }) } }