Skip to content

Commit

Permalink
Merge branch 'develop' into return-early-blob-constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
terencechain authored Nov 6, 2024
2 parents 935fef0 + b87d02e commit 282a5f8
Show file tree
Hide file tree
Showing 27 changed files with 265 additions and 78 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Updated the `beacon-chain/monitor` package to Electra. [PR](https://github.com/prysmaticlabs/prysm/pull/14562)
- Added ListAttestationsV2 endpoint.
- Add ability to rollback node's internal state during processing.
- Change how unsafe protobuf state is created to prevent unnecessary copies.
- Added benchmarks for process slots for Capella, Deneb, Electra

### Changed

Expand All @@ -32,12 +34,19 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Use read only validator for core processing to avoid unnecessary copying.
- Use ROBlock across block processing pipeline.
- Added missing Eth-Consensus-Version headers to GetBlockAttestationsV2 and GetAttesterSlashingsV2 endpoints.
- When instantiating new validators, explicit set `Slashed` to false and move `EffectiveBalance` to match struct definition.
- Updated pgo profile for beacon chain with holesky data. This improves the profile guided
optimizations in the go compiler.
- Use read only state when computing the active validator list.
- Simplified `ExitedValidatorIndices`.
- Simplified `EjectedValidatorIndices`.
- `engine_newPayloadV4`,`engine_getPayloadV4` are changes due to new execution request serialization decisions, [PR](https://github.com/prysmaticlabs/prysm/pull/14580)
- Fixed various small things in state-native code.
- Use ROBlock earlier in block syncing pipeline.
- Changed the signature of `ProcessPayload`.
- Only Build the Protobuf state once during serialization.
- Capella blocks are execution.
- Fixed panic when http request to subscribe to event stream fails.
- Return early for blob reconstructor during capella fork

### Deprecated
Expand Down Expand Up @@ -176,6 +185,7 @@ Updating to this release is recommended at your convenience.
- Light client support: fix light client attested header execution fields' wrong version bug.
- Testing: added custom matcher for better push settings testing.
- Registered `GetDepositSnapshot` Beacon API endpoint.
- Fix rolling back of a block due to a context deadline.

### Security

Expand Down
1 change: 1 addition & 0 deletions api/client/event/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
}
return
}

defer func() {
Expand Down
22 changes: 21 additions & 1 deletion api/client/event/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestNewEventStream(t *testing.T) {

func TestEventStream(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, _ *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
for i := 1; i <= 3; i++ {
Expand Down Expand Up @@ -79,3 +79,23 @@ func TestEventStream(t *testing.T) {
}
}
}

func TestEventStreamRequestError(t *testing.T) {
topics := []string{"head"}
eventsChannel := make(chan *Event, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// use valid url that will result in failed request with nil body
stream, err := NewEventStream(ctx, http.DefaultClient, "http://badhost:1234", topics)
require.NoError(t, err)

// error will happen when request is made, should be received over events channel
go stream.Subscribe(eventsChannel)

event := <-eventsChannel
if event.EventType != EventConnectionError {
t.Errorf("Expected event type %q, got %q", EventConnectionError, event.EventType)
}

}
2 changes: 2 additions & 0 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {

err := s.cfg.ForkChoiceStore.InsertNode(ctx, cfg.postState, cfg.roblock)
if err != nil {
// Do not use parent context in the event it deadlined
ctx = trace.NewContext(context.Background(), span)
s.rollbackBlock(ctx, cfg.roblock.Root())
return errors.Wrapf(err, "could not insert block %d to fork choice store", cfg.roblock.Block().Slot())
}
Expand Down
91 changes: 88 additions & 3 deletions beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)
// Check that forkchoice's head and store's headroot are the previous head (since the invalid block did
// not finish importing and it was never imported to forkchoice). Check
Expand Down Expand Up @@ -1714,7 +1716,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)

// Check that forkchoice's head and store's headroot are the previous head (since the invalid block did
Expand Down Expand Up @@ -1964,7 +1968,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)

// Check that the headroot/state are not in DB and restart the node
Expand Down Expand Up @@ -2346,6 +2352,85 @@ func TestRollbackBlock(t *testing.T) {
require.Equal(t, false, hasState)
}

func TestRollbackBlock_ContextDeadline(t *testing.T) {
service, tr := minimalTestService(t)
ctx := tr.ctx

st, keys := util.DeterministicGenesisState(t, 64)
stateRoot, err := st.HashTreeRoot(ctx)
require.NoError(t, err, "Could not hash genesis state")

require.NoError(t, service.saveGenesisData(ctx, st))

genesis := blocks.NewGenesisBlock(stateRoot[:])
wsb, err := consensusblocks.NewSignedBeaconBlock(genesis)
require.NoError(t, err)
require.NoError(t, service.cfg.BeaconDB.SaveBlock(ctx, wsb), "Could not save genesis block")
parentRoot, err := genesis.Block.HashTreeRoot()
require.NoError(t, err, "Could not get signing root")
require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, st, parentRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveHeadBlockRoot(ctx, parentRoot), "Could not save genesis state")
require.NoError(t, service.cfg.BeaconDB.SaveJustifiedCheckpoint(ctx, &ethpb.Checkpoint{Root: parentRoot[:]}))
require.NoError(t, service.cfg.BeaconDB.SaveFinalizedCheckpoint(ctx, &ethpb.Checkpoint{Root: parentRoot[:]}))

st, err = service.HeadState(ctx)
require.NoError(t, err)
b, err := util.GenerateFullBlock(st, keys, util.DefaultBlockGenConfig(), 33)
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err := service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err := service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))
roblock, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
require.NoError(t, service.postBlockProcess(&postBlockProcessConfig{ctx, roblock, [32]byte{}, postState, false}))

b, err = util.GenerateFullBlock(postState, keys, util.DefaultBlockGenConfig(), 34)
require.NoError(t, err)
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
root, err = b.Block.HashTreeRoot()
require.NoError(t, err)
preState, err = service.getBlockPreState(ctx, wsb.Block())
require.NoError(t, err)
postState, err = service.validateStateTransition(ctx, preState, wsb)
require.NoError(t, err)
require.NoError(t, service.savePostStateInfo(ctx, root, wsb, postState))

require.Equal(t, true, service.cfg.BeaconDB.HasBlock(ctx, root))
hasState, err := service.cfg.StateGen.HasState(ctx, root)
require.NoError(t, err)
require.Equal(t, true, hasState)

// Set deadlined context when processing the block
cancCtx, canc := context.WithCancel(context.Background())
canc()
roblock, err = consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)

parentRoot = roblock.Block().ParentRoot()

cj := &ethpb.Checkpoint{}
cj.Epoch = 1
cj.Root = parentRoot[:]
require.NoError(t, postState.SetCurrentJustifiedCheckpoint(cj))
require.NoError(t, postState.SetFinalizedCheckpoint(cj))

// Rollback block insertion into db and caches.
require.ErrorContains(t, "context canceled", service.postBlockProcess(&postBlockProcessConfig{cancCtx, roblock, [32]byte{}, postState, false}))

// The block should no longer exist.
require.Equal(t, false, service.cfg.BeaconDB.HasBlock(ctx, root))
hasState, err = service.cfg.StateGen.HasState(ctx, root)
require.NoError(t, err)
require.Equal(t, false, hasState)
}

func fakeCommitments(n int) [][]byte {
f := make([][]byte, n)
for i := range f {
Expand Down
27 changes: 14 additions & 13 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
consensus_blocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
Expand Down Expand Up @@ -84,7 +85,12 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
}

currentCheckpoints := s.saveCurrentCheckpoints(preState)
postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, blockCopy, blockRoot)
roblock, err := consensus_blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
}

postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, roblock)
if err != nil {
return err
}
Expand All @@ -101,10 +107,6 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err := s.savePostStateInfo(ctx, blockRoot, blockCopy, postState); err != nil {
return errors.Wrap(err, "could not save post state info")
}
roblock, err := consensus_blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
}
args := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
Expand Down Expand Up @@ -188,8 +190,7 @@ func (s *Service) updateCheckpoints(
func (s *Service) validateExecutionAndConsensus(
ctx context.Context,
preState state.BeaconState,
block interfaces.SignedBeaconBlock,
blockRoot [32]byte,
block consensusblocks.ROBlock,
) (state.BeaconState, bool, error) {
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
if err != nil {
Expand All @@ -208,7 +209,7 @@ func (s *Service) validateExecutionAndConsensus(
var isValidPayload bool
eg.Go(func() error {
var err error
isValidPayload, err = s.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, block, blockRoot)
isValidPayload, err = s.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, block)
if err != nil {
return errors.Wrap(err, "could not notify the engine of the new payload")
}
Expand Down Expand Up @@ -559,16 +560,16 @@ func (s *Service) sendBlockAttestationsToSlasher(signed interfaces.ReadOnlySigne
}

// validateExecutionOnBlock notifies the engine of the incoming block execution payload and returns true if the payload is valid
func (s *Service) validateExecutionOnBlock(ctx context.Context, ver int, header interfaces.ExecutionData, signed interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) (bool, error) {
isValidPayload, err := s.notifyNewPayload(ctx, ver, header, signed)
func (s *Service) validateExecutionOnBlock(ctx context.Context, ver int, header interfaces.ExecutionData, block consensusblocks.ROBlock) (bool, error) {
isValidPayload, err := s.notifyNewPayload(ctx, ver, header, block)
if err != nil {
s.cfg.ForkChoiceStore.Lock()
err = s.handleInvalidExecutionError(ctx, err, blockRoot, signed.Block().ParentRoot())
err = s.handleInvalidExecutionError(ctx, err, block.Root(), block.Block().ParentRoot())
s.cfg.ForkChoiceStore.Unlock()
return false, err
}
if signed.Version() < version.Capella && isValidPayload {
if err := s.validateMergeTransitionBlock(ctx, ver, header, signed); err != nil {
if block.Block().Version() < version.Capella && isValidPayload {
if err := s.validateMergeTransitionBlock(ctx, ver, header, block); err != nil {
return isValidPayload, err
}
}
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/core/altair/deposit.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,12 @@ func AddValidatorToRegistry(beaconState state.BeaconState, pubKey []byte, withdr
// return Validator(
// pubkey=pubkey,
// withdrawal_credentials=withdrawal_credentials,
// effective_balance=effective_balance,
// slashed=False,
// activation_eligibility_epoch=FAR_FUTURE_EPOCH,
// activation_epoch=FAR_FUTURE_EPOCH,
// exit_epoch=FAR_FUTURE_EPOCH,
// withdrawable_epoch=FAR_FUTURE_EPOCH,
// effective_balance=effective_balance,
// )
func GetValidatorFromDeposit(pubKey []byte, withdrawalCredentials []byte, amount uint64) *ethpb.Validator {
effectiveBalance := amount - (amount % params.BeaconConfig().EffectiveBalanceIncrement)
Expand All @@ -202,10 +203,11 @@ func GetValidatorFromDeposit(pubKey []byte, withdrawalCredentials []byte, amount
return &ethpb.Validator{
PublicKey: pubKey,
WithdrawalCredentials: withdrawalCredentials,
EffectiveBalance: effectiveBalance,
Slashed: false,
ActivationEligibilityEpoch: params.BeaconConfig().FarFutureEpoch,
ActivationEpoch: params.BeaconConfig().FarFutureEpoch,
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
WithdrawableEpoch: params.BeaconConfig().FarFutureEpoch,
EffectiveBalance: effectiveBalance,
}
}
17 changes: 10 additions & 7 deletions beacon-chain/core/blocks/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ func IsExecutionBlock(body interfaces.ReadOnlyBeaconBlockBody) (bool, error) {
if body == nil {
return false, errors.New("nil block body")
}
if body.Version() >= version.Capella {
return true, nil
}
payload, err := body.Execution()
switch {
case errors.Is(err, consensus_types.ErrUnsupportedField):
Expand Down Expand Up @@ -202,24 +205,24 @@ func ValidatePayload(st state.BeaconState, payload interfaces.ExecutionData) err
// block_hash=payload.block_hash,
// transactions_root=hash_tree_root(payload.transactions),
// )
func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) (state.BeaconState, error) {
func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) error {
payload, err := body.Execution()
if err != nil {
return nil, err
return err
}
if err := verifyBlobCommitmentCount(body); err != nil {
return nil, err
return err
}
if err := ValidatePayloadWhenMergeCompletes(st, payload); err != nil {
return nil, err
return err
}
if err := ValidatePayload(st, payload); err != nil {
return nil, err
return err
}
if err := st.SetLatestExecutionPayloadHeader(payload); err != nil {
return nil, err
return err
}
return st, nil
return nil
}

func verifyBlobCommitmentCount(body interfaces.ReadOnlyBeaconBlockBody) error {
Expand Down
12 changes: 5 additions & 7 deletions beacon-chain/core/blocks/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func Test_IsExecutionBlockCapella(t *testing.T) {
require.NoError(t, err)
got, err := blocks.IsExecutionBlock(wrappedBlock.Body())
require.NoError(t, err)
require.Equal(t, false, got)
// #14614
require.Equal(t, true, got)
}

func Test_IsExecutionEnabled(t *testing.T) {
Expand Down Expand Up @@ -587,8 +588,7 @@ func Test_ProcessPayload(t *testing.T) {
ExecutionPayload: tt.payload,
})
require.NoError(t, err)
st, err := blocks.ProcessPayload(st, body)
if err != nil {
if err := blocks.ProcessPayload(st, body); err != nil {
require.Equal(t, tt.err.Error(), err.Error())
} else {
require.Equal(t, tt.err, err)
Expand Down Expand Up @@ -619,8 +619,7 @@ func Test_ProcessPayloadCapella(t *testing.T) {
ExecutionPayload: payload,
})
require.NoError(t, err)
_, err = blocks.ProcessPayload(st, body)
require.NoError(t, err)
require.NoError(t, blocks.ProcessPayload(st, body))
}

func Test_ProcessPayload_Blinded(t *testing.T) {
Expand Down Expand Up @@ -677,8 +676,7 @@ func Test_ProcessPayload_Blinded(t *testing.T) {
ExecutionPayloadHeader: p,
})
require.NoError(t, err)
st, err := blocks.ProcessPayload(st, body)
if err != nil {
if err := blocks.ProcessPayload(st, body); err != nil {
require.Equal(t, tt.err.Error(), err.Error())
} else {
require.Equal(t, tt.err, err)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/core/blocks/withdrawals.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func ProcessWithdrawals(st state.BeaconState, executionData interfaces.Execution
}

if st.Version() >= version.Electra {
if err := st.DequeuePartialWithdrawals(processedPartialWithdrawalsCount); err != nil {
if err := st.DequeuePendingPartialWithdrawals(processedPartialWithdrawalsCount); err != nil {
return nil, fmt.Errorf("unable to dequeue partial withdrawals from state: %w", err)
}
}
Expand Down
Loading

0 comments on commit 282a5f8

Please sign in to comment.