diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 2dc6f11a87f..bb7d4c728d6 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/doubly-linked-tree:go_default_library", "//beacon-chain/forkchoice/types:go_default_library", + "//beacon-chain/light-client:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/blstoexec:go_default_library", "//beacon-chain/operations/slashings:go_default_library", diff --git a/beacon-chain/blockchain/options.go b/beacon-chain/blockchain/options.go index 38492502a1f..62c6115562b 100644 --- a/beacon-chain/blockchain/options.go +++ b/beacon-chain/blockchain/options.go @@ -8,6 +8,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice" + light_client "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings" @@ -205,3 +206,10 @@ func WithSyncChecker(checker Checker) Option { return nil } } + +func WithLightClientStore(lcs *light_client.Store) Option { + return func(s *Service) error { + s.lcStore = lcs + return nil + } +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 7a55c3f83fb..896472e75bd 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -69,7 +69,9 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error { if s.inRegularSync() { defer s.handleSecondFCUCall(cfg, fcuArgs) } - defer s.sendLightClientFeeds(cfg) + if features.Get().EnableLightClient && slots.ToEpoch(s.CurrentSlot()) >= params.BeaconConfig().AltairForkEpoch { + defer s.processLightClientUpdates(cfg) + } defer s.sendStateFeedOnBlock(cfg) defer reportProcessingTime(startTime) defer reportAttestationInclusion(cfg.roblock.Block()) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index 04289145e07..7b9a104b222 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -15,7 +15,6 @@ import ( doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree" forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" - "github.com/prysmaticlabs/prysm/v5/config/features" field_params "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" consensus_blocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -114,69 +113,30 @@ func (s *Service) sendStateFeedOnBlock(cfg *postBlockProcessConfig) { }) } -// sendLightClientFeeds sends the light client feeds when feature flag is enabled. -func (s *Service) sendLightClientFeeds(cfg *postBlockProcessConfig) { - if features.Get().EnableLightClient { - if _, err := s.sendLightClientOptimisticUpdate(cfg.ctx, cfg.roblock, cfg.postState); err != nil { - log.WithError(err).Error("Failed to send light client optimistic update") - } - - // Get the finalized checkpoint - finalized := s.ForkChoicer().FinalizedCheckpoint() - - // LightClientFinalityUpdate needs super majority - s.tryPublishLightClientFinalityUpdate(cfg.ctx, cfg.roblock, finalized, cfg.postState) +func (s *Service) processLightClientUpdates(cfg *postBlockProcessConfig) { + if err := s.processLightClientOptimisticUpdate(cfg.ctx, cfg.roblock, cfg.postState); err != nil { + log.WithError(err).Error("Failed to process light client optimistic update") + } + if err := s.processLightClientFinalityUpdate(cfg.ctx, cfg.roblock, cfg.postState); err != nil { + log.WithError(err).Error("Failed to process light client finality update") } } -func (s *Service) tryPublishLightClientFinalityUpdate( +func (s *Service) processLightClientFinalityUpdate( ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, - finalized *forkchoicetypes.Checkpoint, postState state.BeaconState, -) { - if finalized.Epoch <= s.lastPublishedLightClientEpoch { - return - } - - config := params.BeaconConfig() - if finalized.Epoch < config.AltairForkEpoch { - return - } - - syncAggregate, err := signed.Block().Body().SyncAggregate() - if err != nil || syncAggregate == nil { - return - } - - // LightClientFinalityUpdate needs super majority - if syncAggregate.SyncCommitteeBits.Count()*3 < config.SyncCommitteeSize*2 { - return - } - - _, err = s.sendLightClientFinalityUpdate(ctx, signed, postState) - if err != nil { - log.WithError(err).Error("Failed to send light client finality update") - } else { - s.lastPublishedLightClientEpoch = finalized.Epoch - } -} - -// sendLightClientFinalityUpdate sends a light client finality update notification to the state feed. -func (s *Service) sendLightClientFinalityUpdate(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, - postState state.BeaconState) (int, error) { - // Get attested state +) error { attestedRoot := signed.Block().ParentRoot() attestedBlock, err := s.cfg.BeaconDB.Block(ctx, attestedRoot) if err != nil { - return 0, errors.Wrap(err, "could not get attested block") + return errors.Wrap(err, "could not get attested block") } attestedState, err := s.cfg.StateGen.StateByRoot(ctx, attestedRoot) if err != nil { - return 0, errors.Wrap(err, "could not get attested state") + return errors.Wrap(err, "could not get attested state") } - // Get finalized block var finalizedBlock interfaces.ReadOnlySignedBeaconBlock finalizedCheckPoint := attestedState.FinalizedCheckpoint() if finalizedCheckPoint != nil { @@ -197,28 +157,51 @@ func (s *Service) sendLightClientFinalityUpdate(ctx context.Context, signed inte finalizedBlock, ) if err != nil { - return 0, errors.Wrap(err, "could not create light client update") + return errors.Wrap(err, "could not create light client update") } - // Send event - return s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ + maxActiveParticipants := update.SyncAggregate().SyncCommitteeBits.Len() + numActiveParticipants := update.SyncAggregate().SyncCommitteeBits.Count() + hasSupermajority := numActiveParticipants*3 >= maxActiveParticipants*2 + + last := s.lcStore.LastLCFinalityUpdate + if last != nil { + // The finalized_header.beacon.slot is greater than that of all previously forwarded finality_updates, + // or it matches the highest previously forwarded slot and also has a sync_aggregate indicating supermajority (> 2/3) + // sync committee participation while the previously forwarded finality_update for that slot did not indicate supermajority + slot := last.FinalizedHeader().Beacon().Slot + lastMaxActiveParticipants := last.SyncAggregate().SyncCommitteeBits.Len() + lastNumActiveParticipants := last.SyncAggregate().SyncCommitteeBits.Count() + lastHasSupermajority := lastNumActiveParticipants*3 >= lastMaxActiveParticipants*2 + + if update.FinalizedHeader().Beacon().Slot < slot { + return nil + } + if update.FinalizedHeader().Beacon().Slot == slot && (lastHasSupermajority || !hasSupermajority) { + return nil + } + } + + s.lcStore.LastLCFinalityUpdate = update + + s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.LightClientFinalityUpdate, Data: update, - }), nil + }) + + return nil } -// sendLightClientOptimisticUpdate sends a light client optimistic update notification to the state feed. -func (s *Service) sendLightClientOptimisticUpdate(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, - postState state.BeaconState) (int, error) { - // Get attested state +func (s *Service) processLightClientOptimisticUpdate(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock, + postState state.BeaconState) error { attestedRoot := signed.Block().ParentRoot() attestedBlock, err := s.cfg.BeaconDB.Block(ctx, attestedRoot) if err != nil { - return 0, errors.Wrap(err, "could not get attested block") + return errors.Wrap(err, "could not get attested block") } attestedState, err := s.cfg.StateGen.StateByRoot(ctx, attestedRoot) if err != nil { - return 0, errors.Wrap(err, "could not get attested state") + return errors.Wrap(err, "could not get attested state") } update, err := lightclient.NewLightClientOptimisticUpdateFromBeaconState( @@ -230,13 +213,25 @@ func (s *Service) sendLightClientOptimisticUpdate(ctx context.Context, signed in attestedBlock, ) if err != nil { - return 0, errors.Wrap(err, "could not create light client update") + return errors.Wrap(err, "could not create light client update") } - return s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ + last := s.lcStore.LastLCOptimisticUpdate + if last != nil { + // The attested_header.beacon.slot is greater than that of all previously forwarded optimistic_updates + if update.AttestedHeader().Beacon().Slot <= last.AttestedHeader().Beacon().Slot { + return nil + } + } + + s.lcStore.LastLCOptimisticUpdate = update + + s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.LightClientOptimisticUpdate, Data: update, - }), nil + }) + + return nil } // updateCachesPostBlockProcessing updates the next slot cache and handles the epoch diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index c984a2f7975..6494b80a746 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -24,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" f "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice" forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types" + light_client "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings" @@ -38,7 +39,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" consensus_blocks "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" - "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" @@ -49,25 +49,25 @@ import ( // Service represents a service that handles the internal // logic of managing the full PoS beacon chain. type Service struct { - cfg *config - ctx context.Context - cancel context.CancelFunc - genesisTime time.Time - head *head - headLock sync.RWMutex - originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized - boundaryRoots [][32]byte - checkpointStateCache *cache.CheckpointStateCache - initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock - initSyncBlocksLock sync.RWMutex - wsVerifier *WeakSubjectivityVerifier - clockSetter startup.ClockSetter - clockWaiter startup.ClockWaiter - syncComplete chan struct{} - blobNotifiers *blobNotifierMap - blockBeingSynced *currentlySyncingBlock - blobStorage *filesystem.BlobStorage - lastPublishedLightClientEpoch primitives.Epoch + cfg *config + ctx context.Context + cancel context.CancelFunc + genesisTime time.Time + head *head + headLock sync.RWMutex + originBlockRoot [32]byte // genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized + boundaryRoots [][32]byte + checkpointStateCache *cache.CheckpointStateCache + initSyncBlocks map[[32]byte]interfaces.ReadOnlySignedBeaconBlock + initSyncBlocksLock sync.RWMutex + wsVerifier *WeakSubjectivityVerifier + clockSetter startup.ClockSetter + clockWaiter startup.ClockWaiter + syncComplete chan struct{} + blobNotifiers *blobNotifierMap + blockBeingSynced *currentlySyncingBlock + blobStorage *filesystem.BlobStorage + lcStore *light_client.Store } // config options for the service. diff --git a/beacon-chain/light-client/BUILD.bazel b/beacon-chain/light-client/BUILD.bazel new file mode 100644 index 00000000000..4c36f9363a8 --- /dev/null +++ b/beacon-chain/light-client/BUILD.bazel @@ -0,0 +1,9 @@ +load("@prysm//tools/go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["store.go"], + importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client", + visibility = ["//visibility:public"], + deps = ["//consensus-types/interfaces:go_default_library"], +) diff --git a/beacon-chain/light-client/store.go b/beacon-chain/light-client/store.go new file mode 100644 index 00000000000..203758d3ebc --- /dev/null +++ b/beacon-chain/light-client/store.go @@ -0,0 +1,12 @@ +package light_client + +import ( + "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" +) + +type Store struct { + // lcFinalityUpdateLock sync.Mutex + LastLCFinalityUpdate interfaces.LightClientFinalityUpdate + // lcOptimisticUpdateLock sync.Mutex + LastLCOptimisticUpdate interfaces.LightClientOptimisticUpdate +} diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index a310daf40b2..e21526b857b 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -30,6 +30,7 @@ go_library( "//beacon-chain/execution:go_default_library", "//beacon-chain/forkchoice:go_default_library", "//beacon-chain/forkchoice/doubly-linked-tree:go_default_library", + "//beacon-chain/light-client:go_default_library", "//beacon-chain/monitor:go_default_library", "//beacon-chain/node/registration:go_default_library", "//beacon-chain/operations/attestations:go_default_library", diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index fce4a4e56af..3b08cc3105b 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -34,6 +34,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice" doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree" + light_client "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/monitor" "github.com/prysmaticlabs/prysm/v5/beacon-chain/node/registration" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" @@ -121,6 +122,7 @@ type BeaconNode struct { BlobStorageOptions []filesystem.BlobStorageOption verifyInitWaiter *verification.InitializerWaiter syncChecker *initialsync.SyncChecker + lcStore *light_client.Store } // New creates a new node instance, sets up configuration options, and registers @@ -157,6 +159,7 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco serviceFlagOpts: &serviceFlagOpts{}, initialSyncComplete: make(chan struct{}), syncChecker: &initialsync.SyncChecker{}, + lcStore: &light_client.Store{}, } for _, opt := range opts { @@ -763,6 +766,7 @@ func (b *BeaconNode) registerBlockchainService(fc forkchoice.ForkChoicer, gs *st blockchain.WithTrackedValidatorsCache(b.trackedValidatorsCache), blockchain.WithPayloadIDCache(b.payloadIDCache), blockchain.WithSyncChecker(b.syncChecker), + blockchain.WithLightClientStore(b.lcStore), ) blockchainService, err := blockchain.NewService(b.ctx, opts...) @@ -846,6 +850,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil regularsync.WithBlobStorage(b.BlobStorage), regularsync.WithVerifierWaiter(b.verifyInitWaiter), regularsync.WithAvailableBlocker(bFillStore), + regularsync.WithLightClientStore(b.lcStore), ) return b.services.RegisterService(rs) } diff --git a/beacon-chain/p2p/types/types.go b/beacon-chain/p2p/types/types.go index 95dd762be7d..4bf10ad5c81 100644 --- a/beacon-chain/p2p/types/types.go +++ b/beacon-chain/p2p/types/types.go @@ -250,8 +250,8 @@ func (r LightClientBootstrapReq) UnmarshalSSZ(buf []byte) error { // LightClientUpdatesByRangeReq specifies the block by roots request type. type LightClientUpdatesByRangeReq struct { - startPeriod uint64 - count uint64 + StartPeriod uint64 + Count uint64 } // MarshalSSZTo marshals the light client updates by range request with the provided byte slice. @@ -266,8 +266,8 @@ func (r *LightClientUpdatesByRangeReq) MarshalSSZTo(dst []byte) ([]byte, error) // MarshalSSZ Marshals the light client updates by range request type into the serialized object. func (r *LightClientUpdatesByRangeReq) MarshalSSZ() ([]byte, error) { buf := make([]byte, 0, r.SizeSSZ()) - binary.LittleEndian.AppendUint64(buf, r.startPeriod) - binary.LittleEndian.AppendUint64(buf, r.count) + binary.LittleEndian.AppendUint64(buf, r.StartPeriod) + binary.LittleEndian.AppendUint64(buf, r.Count) return buf, nil } @@ -283,7 +283,7 @@ func (r *LightClientUpdatesByRangeReq) UnmarshalSSZ(buf []byte) error { if bufLen != lightClientUpdatesByRangeReqLength { return errors.Errorf("expected buffer with length of %d but received length %d", lightClientUpdatesByRangeReqLength, bufLen) } - r.startPeriod = binary.LittleEndian.Uint64(buf[0:8]) - r.count = binary.LittleEndian.Uint64(buf[8:16]) + r.StartPeriod = binary.LittleEndian.Uint64(buf[0:8]) + r.Count = binary.LittleEndian.Uint64(buf[8:16]) return nil } diff --git a/beacon-chain/rpc/eth/light-client/helpers.go b/beacon-chain/rpc/eth/light-client/helpers.go index 2c43366d901..4a811248526 100644 --- a/beacon-chain/rpc/eth/light-client/helpers.go +++ b/beacon-chain/rpc/eth/light-client/helpers.go @@ -4,18 +4,12 @@ import ( "context" "reflect" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/pkg/errors" - fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" - "github.com/prysmaticlabs/prysm/v5/config/params" - "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" - - lightclient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client" - "github.com/prysmaticlabs/prysm/v5/runtime/version" - "github.com/prysmaticlabs/prysm/v5/api/server/structs" + lightclient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/state" "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index ddcd95634cd..e7299b13ab1 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -82,6 +82,7 @@ go_library( "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/db/filters:go_default_library", "//beacon-chain/execution:go_default_library", + "//beacon-chain/light-client:go_default_library", "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/operations/blstoexec:go_default_library", "//beacon-chain/operations/slashings:go_default_library", @@ -126,6 +127,7 @@ go_library( "//time:go_default_library", "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", + "@com_github_ethereum_go_ethereum//common/math:go_default_library", "@com_github_hashicorp_golang_lru//:go_default_library", "@com_github_libp2p_go_libp2p//core:go_default_library", "@com_github_libp2p_go_libp2p//core/host:go_default_library", diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index ff20b8b8121..2eaa5325a9a 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -8,6 +8,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" + light_client "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings" @@ -180,3 +181,11 @@ func WithAvailableBlocker(avb coverage.AvailableBlocker) Option { return nil } } + +// WithLightClientStore allows the sync package to access light client data. +func WithLightClientStore(lcs *light_client.Store) Option { + return func(s *Service) error { + s.lcStore = lcs + return nil + } +} diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 6c8de561b5d..bc452a7f62d 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -203,3 +203,146 @@ func WriteLightClientBootstrapChunk( _, err := encoding.EncodeWithMaxLength(stream, bootstrap) return err } + +func WriteLightClientUpdateChunk( + stream libp2pcore.Stream, + tor blockchain.TemporalOracle, + encoding encoder.NetworkEncoding, + update interfaces.LightClientUpdate, +) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + var obtainedCtx []byte + valRoot := tor.GenesisValidatorsRoot() + switch v := update.Version(); v { + case version.Altair: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Capella: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Deneb: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Electra: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + default: + return errors.Wrapf(ErrUnrecognizedVersion, "light client boostrap version %s is not recognized", version.String(v)) + } + + if err := writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err := encoding.EncodeWithMaxLength(stream, update) + return err +} + +func WriteLightClientFinalityUpdateChunk( + stream libp2pcore.Stream, + tor blockchain.TemporalOracle, + encoding encoder.NetworkEncoding, + update interfaces.LightClientFinalityUpdate, +) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + var obtainedCtx []byte + valRoot := tor.GenesisValidatorsRoot() + switch v := update.Version(); v { + case version.Altair: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Capella: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Deneb: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Electra: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + default: + return errors.Wrapf(ErrUnrecognizedVersion, "light client boostrap version %s is not recognized", version.String(v)) + } + + if err := writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err := encoding.EncodeWithMaxLength(stream, update) + return err +} +func WriteLightClientOptimisticUpdateChunk( + stream libp2pcore.Stream, + tor blockchain.TemporalOracle, + encoding encoder.NetworkEncoding, + update interfaces.LightClientOptimisticUpdate, +) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + var obtainedCtx []byte + valRoot := tor.GenesisValidatorsRoot() + switch v := update.Version(); v { + case version.Altair: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Capella: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Deneb: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + case version.Electra: + digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, valRoot[:]) + if err != nil { + return err + } + obtainedCtx = digest[:] + default: + return errors.Wrapf(ErrUnrecognizedVersion, "light client boostrap version %s is not recognized", version.String(v)) + } + + if err := writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err := encoding.EncodeWithMaxLength(stream, update) + return err +} diff --git a/beacon-chain/sync/rpc_light_client.go b/beacon-chain/sync/rpc_light_client.go index 48478176c7f..cbeee59be2e 100644 --- a/beacon-chain/sync/rpc_light_client.go +++ b/beacon-chain/sync/rpc_light_client.go @@ -4,14 +4,17 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/common/math" libp2pcore "github.com/libp2p/go-libp2p/core" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/monitoring/tracing" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" ) -// blobSidecarByRootRPCHandler handles the /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPC request. -// spec: https://github.com/ethereum/consensus-specs/blob/a7e45db9ac2b60a33e144444969ad3ac0aae3d4c/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 +// lightClientBootstrapRPCHandler handles the /eth2/beacon_chain/req/light_client_bootstrap/1/ RPC request. func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.lightClientBootstrapRPCHandler") defer span.End() @@ -36,9 +39,14 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf bootstrap, err := s.cfg.beaconDB.LightClientBootstrap(ctx, blkRoot) if err != nil { s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) return err } + + SetStreamWriteDeadline(stream, defaultWriteDuration) if err = WriteLightClientBootstrapChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), bootstrap); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) return err } @@ -46,14 +54,120 @@ func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interf return nil } +// lightClientBootstrapRPCHandler handles the /eth2/beacon_chain/req/light_client_updates_by_range/1/ RPC request. func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientUpdatesByRangeRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + // TODO: What should we log? + log := log.WithField("handler", p2p.LightClientUpdatesByRangeName[1:]) // slice the leading slash off the name var + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + return err + } + s.rateLimiter.add(stream, 1) + + rawMsg, ok := msg.(*types.LightClientUpdatesByRangeReq) + if !ok { + return fmt.Errorf("message is not type %T", &types.LightClientUpdatesByRangeReq{}) + } + r := *rawMsg + + if r.Count > params.BeaconConfig().MaxRequestLightClientUpdates { + r.Count = params.BeaconConfig().MaxRequestLightClientUpdates + } + endPeriod, ok := math.SafeAdd(r.StartPeriod, r.Count-1) + if !ok { + err := errors.Wrap(types.ErrInvalidRequest, "end period overflows") + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + tracing.AnnotateError(span, err) + return err + } + + updates, err := s.cfg.beaconDB.LightClientUpdates(ctx, r.StartPeriod, endPeriod) + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + for _, u := range updates { + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err = WriteLightClientUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), u); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + s.rateLimiter.add(stream, 1) + } + + closeStream(stream, log) return nil } -func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +// lightClientBootstrapRPCHandler handles the /eth2/beacon_chain/req/light_client_finality_update/1/ RPC request. +func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, _ interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientFinalityUpdateRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + // TODO: What should we log? + log := log.WithField("handler", p2p.LightClientFinalityUpdateName[1:]) // slice the leading slash off the name var + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + return err + } + s.rateLimiter.add(stream, 1) + + if s.lcStore.LastLCFinalityUpdate == nil { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + return nil + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err := WriteLightClientFinalityUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), s.lcStore.LastLCFinalityUpdate); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + + closeStream(stream, log) return nil } -func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { +// lightClientOptimisticUpdateRPCHandler handles the /eth2/beacon_chain/req/light_client_optimistic_update/1/ RPC request. +func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, _ interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientOptimisticUpdateRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + // TODO: What should we log? + log := log.WithField("handler", p2p.LightClientOptimisticUpdateName[1:]) // slice the leading slash off the name var + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + return err + } + s.rateLimiter.add(stream, 1) + + if s.lcStore.LastLCOptimisticUpdate == nil { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + return nil + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err := WriteLightClientOptimisticUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), s.lcStore.LastLCOptimisticUpdate); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + + closeStream(stream, log) return nil } diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index 30edbb09cf5..8572d94a02a 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -25,6 +25,8 @@ import ( "github.com/sirupsen/logrus" ) +// TODO + var errBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs") var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob") diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 26d610dcf54..a1d295ffaff 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -25,6 +25,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution" + light_client "github.com/prysmaticlabs/prysm/v5/beacon-chain/light-client" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/blstoexec" "github.com/prysmaticlabs/prysm/v5/beacon-chain/operations/slashings" @@ -168,10 +169,7 @@ type Service struct { newBlobVerifier verification.NewBlobVerifier availableBlocker coverage.AvailableBlocker ctxMap ContextByteVersions - lcFinalityUpdateLock sync.Mutex - lastLCFinalityUpdate *lcFinalityUpdateInfo - lcOptimisticUpdateLock sync.Mutex - lastLCOptimisticUpdate *lcOptimisticUpdateInfo + lcStore *light_client.Store } // NewService initializes new regular sync service. diff --git a/beacon-chain/sync/subscriber_light_client.go b/beacon-chain/sync/subscriber_light_client.go index 288d941d6c8..7ea65284e89 100644 --- a/beacon-chain/sync/subscriber_light_client.go +++ b/beacon-chain/sync/subscriber_light_client.go @@ -2,16 +2,44 @@ package sync import ( "context" + "fmt" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" + statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" + "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces" "google.golang.org/protobuf/proto" ) // TODO: event feed func (s *Service) lightClientFinalityUpdateSubscriber(_ context.Context, msg proto.Message) error { + update, ok := msg.(interfaces.LightClientFinalityUpdate) + if !ok { + return fmt.Errorf("message type %T is not a light client finality update", msg) + } + + s.lcStore.LastLCFinalityUpdate = update + + s.cfg.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.LightClientFinalityUpdate, + Data: update, + }) + return nil } func (s *Service) lightClientOptimisticUpdateSubscriber(_ context.Context, msg proto.Message) error { + update, ok := msg.(interfaces.LightClientOptimisticUpdate) + if !ok { + return fmt.Errorf("message type %T is not a light client optimistic update", msg) + } + + s.lcStore.LastLCOptimisticUpdate = update + + s.cfg.stateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.LightClientOptimisticUpdate, + Data: update, + }) + return nil } diff --git a/beacon-chain/sync/validate_light_client.go b/beacon-chain/sync/validate_light_client.go index b68513fb6a3..889ba28823c 100644 --- a/beacon-chain/sync/validate_light_client.go +++ b/beacon-chain/sync/validate_light_client.go @@ -40,22 +40,24 @@ func (s *Service) validateLightClientFinalityUpdate(ctx context.Context, pid pee return pubsub.ValidationReject, errWrongMessage } - s.lcFinalityUpdateLock.Lock() - defer s.lcFinalityUpdateLock.Unlock() - maxActiveParticipants := update.SyncAggregate().SyncCommitteeBits.Len() numActiveParticipants := update.SyncAggregate().SyncCommitteeBits.Count() hasSupermajority := numActiveParticipants*3 >= maxActiveParticipants*2 - last := s.lastLCFinalityUpdate + last := s.lcStore.LastLCFinalityUpdate if last != nil { // [IGNORE] The finalized_header.beacon.slot is greater than that of all previously forwarded finality_updates, // or it matches the highest previously forwarded slot and also has a sync_aggregate indicating supermajority (> 2/3) // sync committee participation while the previously forwarded finality_update for that slot did not indicate supermajority - if update.FinalizedHeader().Beacon().Slot < last.slot { + slot := last.FinalizedHeader().Beacon().Slot + lastMaxActiveParticipants := last.SyncAggregate().SyncCommitteeBits.Len() + lastNumActiveParticipants := last.SyncAggregate().SyncCommitteeBits.Count() + lastHasSupermajority := lastNumActiveParticipants*3 >= lastMaxActiveParticipants*2 + + if update.FinalizedHeader().Beacon().Slot < slot { return pubsub.ValidationIgnore, nil } - if update.FinalizedHeader().Beacon().Slot == last.slot && (last.hasSupermajority || !hasSupermajority) { + if update.FinalizedHeader().Beacon().Slot == slot && (lastHasSupermajority || !hasSupermajority) { return pubsub.ValidationIgnore, nil } } @@ -70,11 +72,6 @@ func (s *Service) validateLightClientFinalityUpdate(ctx context.Context, pid pee return pubsub.ValidationIgnore, nil } - s.lastLCFinalityUpdate = &lcFinalityUpdateInfo{ - slot: update.FinalizedHeader().Beacon().Slot, - hasSupermajority: hasSupermajority, - } - return pubsub.ValidationAccept, nil } @@ -105,13 +102,10 @@ func (s *Service) validateLightClientOptimisticUpdate(ctx context.Context, pid p return pubsub.ValidationReject, errWrongMessage } - s.lcOptimisticUpdateLock.Lock() - defer s.lcOptimisticUpdateLock.Unlock() - - last := s.lastLCOptimisticUpdate + last := s.lcStore.LastLCOptimisticUpdate if last != nil { // [IGNORE] The attested_header.beacon.slot is greater than that of all previously forwarded optimistic_updates - if update.AttestedHeader().Beacon().Slot <= last.slot { + if update.AttestedHeader().Beacon().Slot <= last.AttestedHeader().Beacon().Slot { return pubsub.ValidationIgnore, nil } } @@ -126,9 +120,5 @@ func (s *Service) validateLightClientOptimisticUpdate(ctx context.Context, pid p return pubsub.ValidationIgnore, nil } - s.lastLCOptimisticUpdate = &lcOptimisticUpdateInfo{ - slot: update.AttestedHeader().Beacon().Slot, - } - return pubsub.ValidationAccept, nil }