Skip to content

Commit

Permalink
Process blocks after ePBS
Browse files Browse the repository at this point in the history
    These are some of the things that are left to be done

    - Process the payload
    - Change stategen to get the poststate of the block and the payload
      separately
    - Change the next slot cache to be safe for full/empty
  • Loading branch information
potuz committed Nov 3, 2024
1 parent ffc5e5c commit 2ee5923
Show file tree
Hide file tree
Showing 22 changed files with 435 additions and 129 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Simplified `ExitedValidatorIndices`.
- Simplified `EjectedValidatorIndices`.
- `engine_newPayloadV4`,`engine_getPayloadV4` are changes due to new execution request serialization decisions, [PR](https://github.com/prysmaticlabs/prysm/pull/14580)
- Use ROBlock earlier in block syncing pipeline.
- Changed the signature of `ProcessPayload`

### Deprecated

Expand Down
53 changes: 36 additions & 17 deletions beacon-chain/blockchain/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,44 @@ func logStateTransitionData(b interfaces.ReadOnlyBeaconBlock) error {
}
log = log.WithField("syncBitsCount", agg.SyncCommitteeBits.Count())
}
if b.Version() >= version.Bellatrix {
p, err := b.Body().Execution()
if b.Version() >= version.EPBS {
sh, err := b.Body().SignedExecutionPayloadHeader()
if err != nil {
return err
}
log = log.WithField("payloadHash", fmt.Sprintf("%#x", bytesutil.Trunc(p.BlockHash())))
txs, err := p.Transactions()
switch {
case errors.Is(err, consensus_types.ErrUnsupportedField):
case err != nil:
header, err := sh.Header()
if err != nil {
return err
default:
log = log.WithField("txCount", len(txs))
txsPerSlotCount.Set(float64(len(txs)))
}
}
if b.Version() >= version.Deneb {
kzgs, err := b.Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
} else if len(kzgs) > 0 {
log = log.WithField("kzgCommitmentCount", len(kzgs))
log = log.WithFields(logrus.Fields{"payloadHash": header.BlockHash(),
"builderIndex": header.BuilderIndex(),
"value": header.Value(),
"blobKzgCommitmentsRoot": header.BlobKzgCommitmentsRoot(),
})
} else {
if b.Version() >= version.Bellatrix {
p, err := b.Body().Execution()
if err != nil {
return err
}
log = log.WithField("payloadHash", fmt.Sprintf("%#x", bytesutil.Trunc(p.BlockHash())))
txs, err := p.Transactions()
switch {
case errors.Is(err, consensus_types.ErrUnsupportedField):
case err != nil:
return err
default:
log = log.WithField("txCount", len(txs))
txsPerSlotCount.Set(float64(len(txs)))
}
}
if b.Version() >= version.Deneb {
kzgs, err := b.Body().BlobKzgCommitments()
if err != nil {
log.WithError(err).Error("Failed to get blob KZG commitments")
} else if len(kzgs) > 0 {
log = log.WithField("kzgCommitmentCount", len(kzgs))
}
}
}
log.Info("Finished applying state transition")
Expand Down Expand Up @@ -112,6 +128,9 @@ func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte

// logs payload related data every slot.
func logPayload(block interfaces.ReadOnlyBeaconBlock) error {
if block.Version() >= version.EPBS {
return nil
}
isExecutionBlk, err := blocks.IsExecutionBlock(block.Body())
if err != nil {
return errors.Wrap(err, "could not determine if block is execution block")
Expand Down
7 changes: 6 additions & 1 deletion beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
fcuArgs := &fcuConfig{}

if s.inRegularSync() {
defer s.handleSecondFCUCall(cfg, fcuArgs)
if cfg.roblock.Version() < version.EPBS {
defer s.handleSecondFCUCall(cfg, fcuArgs)
}
}
defer s.sendLightClientFeeds(cfg)
defer s.sendStateFeedOnBlock(cfg)
Expand Down Expand Up @@ -99,6 +101,9 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error {
s.logNonCanonicalBlockReceived(cfg.roblock.Root(), cfg.headRoot)
return nil
}
if cfg.roblock.Version() >= version.EPBS {
return nil
}
if err := s.getFCUArgs(cfg, fcuArgs); err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
Expand Down
12 changes: 9 additions & 3 deletions beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,9 @@ func TestStore_NoViableHead_NewPayload(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)
// Check that forkchoice's head and store's headroot are the previous head (since the invalid block did
// not finish importing and it was never imported to forkchoice). Check
Expand Down Expand Up @@ -1714,7 +1716,9 @@ func TestStore_NoViableHead_Liveness(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)

// Check that forkchoice's head and store's headroot are the previous head (since the invalid block did
Expand Down Expand Up @@ -1964,7 +1968,9 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, err)
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, wsb, root)
rowsb, err := consensusblocks.NewROBlockWithRoot(wsb, root)
require.NoError(t, err)
_, err = service.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, rowsb)
require.ErrorContains(t, "received an INVALID payload from execution engine", err)

// Check that the headroot/state are not in DB and restart the node
Expand Down
49 changes: 33 additions & 16 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
consensus_blocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
consensusblocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
Expand Down Expand Up @@ -96,13 +97,34 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
}

currentCheckpoints := s.saveCurrentCheckpoints(preState)
postState, isValidPayload, err := s.validateExecutionAndConsensus(ctx, preState, blockCopy, blockRoot)
roblock, err := consensus_blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
}
daWaitedTime, err := s.handleDA(ctx, blockCopy, blockRoot, avs)
if err != nil {
return err
var postState state.BeaconState
var isValidPayload bool
var daWaitedTime time.Duration
if blockCopy.Version() >= version.EPBS {
postState, err = s.validateStateTransition(ctx, preState, roblock)
if err != nil {
return errors.Wrap(err, "could not validate state transition")
}
optimistic, err := s.IsOptimisticForRoot(ctx, roblock.Block().ParentRoot())
if err != nil {
return errors.Wrap(err, "could not check if parent is optimistic")
}
// if the parent is not optimistic then we can set the block as
// not optimistic.
isValidPayload = !optimistic
} else {
postState, isValidPayload, err = s.validateExecutionAndConsensus(ctx, preState, roblock)
if err != nil {
return err
}
daWaitedTime, err = s.handleDA(ctx, blockCopy, blockRoot, avs)
if err != nil {
return err
}
}
// Defragment the state before continuing block processing.
s.defragmentState(postState)
Expand All @@ -113,10 +135,6 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
if err := s.savePostStateInfo(ctx, blockRoot, blockCopy, postState); err != nil {
return errors.Wrap(err, "could not save post state info")
}
roblock, err := consensus_blocks.NewROBlockWithRoot(blockCopy, blockRoot)
if err != nil {
return err
}
args := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
Expand Down Expand Up @@ -200,8 +218,7 @@ func (s *Service) updateCheckpoints(
func (s *Service) validateExecutionAndConsensus(
ctx context.Context,
preState state.BeaconState,
block interfaces.SignedBeaconBlock,
blockRoot [32]byte,
block consensusblocks.ROBlock,
) (state.BeaconState, bool, error) {
preStateVersion, preStateHeader, err := getStateVersionAndPayload(preState)
if err != nil {
Expand All @@ -220,7 +237,7 @@ func (s *Service) validateExecutionAndConsensus(
var isValidPayload bool
eg.Go(func() error {
var err error
isValidPayload, err = s.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, block, blockRoot)
isValidPayload, err = s.validateExecutionOnBlock(ctx, preStateVersion, preStateHeader, block)
if err != nil {
return errors.Wrap(err, "could not notify the engine of the new payload")
}
Expand Down Expand Up @@ -571,16 +588,16 @@ func (s *Service) sendBlockAttestationsToSlasher(signed interfaces.ReadOnlySigne
}

// validateExecutionOnBlock notifies the engine of the incoming block execution payload and returns true if the payload is valid
func (s *Service) validateExecutionOnBlock(ctx context.Context, ver int, header interfaces.ExecutionData, signed interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) (bool, error) {
isValidPayload, err := s.notifyNewPayload(ctx, ver, header, signed)
func (s *Service) validateExecutionOnBlock(ctx context.Context, ver int, header interfaces.ExecutionData, block consensusblocks.ROBlock) (bool, error) {
isValidPayload, err := s.notifyNewPayload(ctx, ver, header, block)
if err != nil {
s.cfg.ForkChoiceStore.Lock()
err = s.handleInvalidExecutionError(ctx, err, blockRoot, signed.Block().ParentRoot())
err = s.handleInvalidExecutionError(ctx, err, block.Root(), block.Block().ParentRoot())
s.cfg.ForkChoiceStore.Unlock()
return false, err
}
if signed.Version() < version.Capella && isValidPayload {
if err := s.validateMergeTransitionBlock(ctx, ver, header, signed); err != nil {
if block.Block().Version() < version.Capella && isValidPayload {
if err := s.validateMergeTransitionBlock(ctx, ver, header, block); err != nil {
return isValidPayload, err
}
}
Expand Down
14 changes: 7 additions & 7 deletions beacon-chain/core/blocks/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,24 +202,24 @@ func ValidatePayload(st state.BeaconState, payload interfaces.ExecutionData) err
// block_hash=payload.block_hash,
// transactions_root=hash_tree_root(payload.transactions),
// )
func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) (state.BeaconState, error) {
func ProcessPayload(st state.BeaconState, body interfaces.ReadOnlyBeaconBlockBody) error {
payload, err := body.Execution()
if err != nil {
return nil, err
return err
}
if err := verifyBlobCommitmentCount(body); err != nil {
return nil, err
return err
}
if err := ValidatePayloadWhenMergeCompletes(st, payload); err != nil {
return nil, err
return err
}
if err := ValidatePayload(st, payload); err != nil {
return nil, err
return err
}
if err := st.SetLatestExecutionPayloadHeader(payload); err != nil {
return nil, err
return err
}
return st, nil
return nil
}

func verifyBlobCommitmentCount(body interfaces.ReadOnlyBeaconBlockBody) error {
Expand Down
9 changes: 3 additions & 6 deletions beacon-chain/core/blocks/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,7 @@ func Test_ProcessPayload(t *testing.T) {
ExecutionPayload: tt.payload,
})
require.NoError(t, err)
st, err := blocks.ProcessPayload(st, body)
if err != nil {
if err := blocks.ProcessPayload(st, body); err != nil {
require.Equal(t, tt.err.Error(), err.Error())
} else {
require.Equal(t, tt.err, err)
Expand Down Expand Up @@ -619,8 +618,7 @@ func Test_ProcessPayloadCapella(t *testing.T) {
ExecutionPayload: payload,
})
require.NoError(t, err)
_, err = blocks.ProcessPayload(st, body)
require.NoError(t, err)
require.NoError(t, blocks.ProcessPayload(st, body))
}

func Test_ProcessPayload_Blinded(t *testing.T) {
Expand Down Expand Up @@ -677,8 +675,7 @@ func Test_ProcessPayload_Blinded(t *testing.T) {
ExecutionPayloadHeader: p,
})
require.NoError(t, err)
st, err := blocks.ProcessPayload(st, body)
if err != nil {
if err := blocks.ProcessPayload(st, body); err != nil {
require.Equal(t, tt.err.Error(), err.Error())
} else {
require.Equal(t, tt.err, err)
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/core/electra/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ go_library(
"effective_balance_updates.go",
"registry_updates.go",
"transition.go",
"transition_no_verify_sig.go",
"upgrade.go",
"validator.go",
"withdrawals.go",
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/core/epbs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,28 @@ go_library(
srcs = [
"attestation.go",
"execution_payload_envelope.go",
"execution_payload_header.go",
"operations.go",
"payload_attestation.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/epbs",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/electra:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//network/forks:go_default_library",
"//proto/engine/v1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
Expand Down
51 changes: 51 additions & 0 deletions beacon-chain/core/epbs/execution_payload_header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package epbs

import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/crypto/bls"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

// ValidatePayloadHeaderSignature validates the signature of the execution payload header.
func ValidatePayloadHeaderSignature(st state.ReadOnlyBeaconState, sh interfaces.ROSignedExecutionPayloadHeader) error {
h, err := sh.Header()
if err != nil {
return err
}

pubkey := st.PubkeyAtIndex(h.BuilderIndex())
pub, err := bls.PublicKeyFromBytes(pubkey[:])
if err != nil {
return err
}

s := sh.Signature()
sig, err := bls.SignatureFromBytes(s[:])
if err != nil {
return err
}

currentEpoch := slots.ToEpoch(h.Slot())
f, err := forks.Fork(currentEpoch)
if err != nil {
return err
}

domain, err := signing.Domain(f, currentEpoch, params.BeaconConfig().DomainBeaconBuilder, st.GenesisValidatorsRoot())
if err != nil {
return err
}
root, err := sh.SigningRoot(domain)
if err != nil {
return err
}
if !sig.Verify(pub, root[:]) {
return signing.ErrSigFailedToVerify
}

return nil
}
Loading

0 comments on commit 2ee5923

Please sign in to comment.