Skip to content

Commit

Permalink
optimise a.blsToExecutionChangeService.ProcessMessage with async sign… (
Browse files Browse the repository at this point in the history
#12441)

…ature verification

Co-authored-by: shota.silagadze <shota.silagadze@taal.com>
  • Loading branch information
shotasilagadze and shotasilagadzetaal authored Oct 24, 2024
1 parent da79034 commit 3d5d022
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 88 deletions.
27 changes: 12 additions & 15 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,24 +269,21 @@ func (a *ApiHandler) PostEthV1BeaconPoolBlsToExecutionChanges(w http.ResponseWri
}
failures := []poolingFailure{}
for _, v := range req {
if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, v); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Broadcast to gossip
if a.sentinel != nil {
encodedSSZ, err := v.EncodeSSZ(nil)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{

if err := a.blsToExecutionChangeService.ProcessMessage(r.Context(), nil, &cltypes.SignedBLSToExecutionChangeWithGossipData{
SignedBLSToExecutionChange: v,
GossipData: &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNameBlsToExecutionChange,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
},
}); err != nil && !errors.Is(err, services.ErrIgnore) {
failures = append(failures, poolingFailure{Index: len(failures), Message: err.Error()})
continue
}
}

Expand Down
4 changes: 2 additions & 2 deletions cl/beacon/handler/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ func setupTestingHandler(t *testing.T, v clparams.StateVersion, logger log.Logge
opPool.VoluntaryExitsPool.Insert(msg.VoluntaryExit.ValidatorIndex, msg)
return nil
}).AnyTimes()
blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedBLSToExecutionChange) error {
opPool.BLSToExecutionChangesPool.Insert(msg.Signature, msg)
blsToExecutionChangeService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error {
opPool.BLSToExecutionChangesPool.Insert(msg.SignedBLSToExecutionChange.Signature, msg.SignedBLSToExecutionChange)
return nil
}).AnyTimes()
proposerSlashingService.EXPECT().ProcessMessage(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, subnetID *uint64, msg *cltypes.ProposerSlashing) error {
Expand Down
7 changes: 7 additions & 0 deletions cl/cltypes/bls_to_execution_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

libcommon "github.com/erigontech/erigon-lib/common"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/types/ssz"
"github.com/erigontech/erigon/cl/merkle_tree"
ssz2 "github.com/erigontech/erigon/cl/ssz"
Expand Down Expand Up @@ -58,6 +59,12 @@ func (*BLSToExecutionChange) Static() bool {
return true
}

// SignedBLSToExecutionChangeWithGossipData type represents SignedBLSToExecutionChange with the gossip data where it's coming from.
type SignedBLSToExecutionChangeWithGossipData struct {
SignedBLSToExecutionChange *SignedBLSToExecutionChange
GossipData *sentinel.GossipData
}

type SignedBLSToExecutionChange struct {
Message *BLSToExecutionChange `json:"message"`
Signature libcommon.Bytes96 `json:"signature"`
Expand Down
7 changes: 5 additions & 2 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,11 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
case gossip.TopicNameAttesterSlashing:
return operationsContract[*cltypes.AttesterSlashing](ctx, g, data, int(version), "attester slashing", g.forkChoice.OnAttesterSlashing)
case gossip.TopicNameBlsToExecutionChange:
obj := &cltypes.SignedBLSToExecutionChange{}
if err := obj.DecodeSSZ(data.Data, int(version)); err != nil {
obj := &cltypes.SignedBLSToExecutionChangeWithGossipData{
GossipData: copyOfSentinelData(data),
SignedBLSToExecutionChange: &cltypes.SignedBLSToExecutionChange{},
}
if err := obj.SignedBLSToExecutionChange.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
return g.blsToExecutionChangeService.ProcessMessage(ctx, data.SubnetId, obj)
Expand Down
23 changes: 15 additions & 8 deletions cl/phase1/network/services/batch_signature_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ var (
)

type BatchSignatureVerifier struct {
sentinel sentinel.SentinelClient
attVerifyAndExecute chan *AggregateVerificationData
aggregateProofVerify chan *AggregateVerificationData
ctx context.Context
sentinel sentinel.SentinelClient
attVerifyAndExecute chan *AggregateVerificationData
aggregateProofVerify chan *AggregateVerificationData
blsToExecutionChangeVerify chan *AggregateVerificationData
ctx context.Context
}

// each AggregateVerification request has sentinel.SentinelClient and *sentinel.GossipData
Expand All @@ -41,10 +42,11 @@ type AggregateVerificationData struct {

func NewBatchSignatureVerifier(ctx context.Context, sentinel sentinel.SentinelClient) *BatchSignatureVerifier {
return &BatchSignatureVerifier{
ctx: ctx,
sentinel: sentinel,
attVerifyAndExecute: make(chan *AggregateVerificationData, 1024),
aggregateProofVerify: make(chan *AggregateVerificationData, 1024),
ctx: ctx,
sentinel: sentinel,
attVerifyAndExecute: make(chan *AggregateVerificationData, 1024),
aggregateProofVerify: make(chan *AggregateVerificationData, 1024),
blsToExecutionChangeVerify: make(chan *AggregateVerificationData, 1024),
}
}

Expand All @@ -57,6 +59,10 @@ func (b *BatchSignatureVerifier) AsyncVerifyAggregateProof(data *AggregateVerifi
b.aggregateProofVerify <- data
}

func (b *BatchSignatureVerifier) AsyncVerifyBlsToExecutionChange(data *AggregateVerificationData) {
b.blsToExecutionChangeVerify <- data
}

func (b *BatchSignatureVerifier) ImmediateVerification(data *AggregateVerificationData) error {
return b.processSignatureVerification([]*AggregateVerificationData{data})
}
Expand All @@ -65,6 +71,7 @@ func (b *BatchSignatureVerifier) Start() {
// separate goroutines for each type of verification
go b.start(b.attVerifyAndExecute)
go b.start(b.aggregateProofVerify)
go b.start(b.blsToExecutionChangeVerify)
}

// When receiving AggregateVerificationData, we simply collect all the signature verification data
Expand Down
72 changes: 42 additions & 30 deletions cl/phase1/network/services/bls_to_execution_change_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,37 @@ var (
)

type blsToExecutionChangeService struct {
operationsPool pool.OperationsPool
emitters *beaconevents.EventEmitter
syncedDataManager synced_data.SyncedData
beaconCfg *clparams.BeaconChainConfig
operationsPool pool.OperationsPool
emitters *beaconevents.EventEmitter
syncedDataManager synced_data.SyncedData
beaconCfg *clparams.BeaconChainConfig
batchSignatureVerifier *BatchSignatureVerifier
}

func NewBLSToExecutionChangeService(
operationsPool pool.OperationsPool,
emitters *beaconevents.EventEmitter,
syncedDataManager synced_data.SyncedData,
beaconCfg *clparams.BeaconChainConfig,
batchSignatureVerifier *BatchSignatureVerifier,
) BLSToExecutionChangeService {
return &blsToExecutionChangeService{
operationsPool: operationsPool,
emitters: emitters,
syncedDataManager: syncedDataManager,
beaconCfg: beaconCfg,
operationsPool: operationsPool,
emitters: emitters,
syncedDataManager: syncedDataManager,
beaconCfg: beaconCfg,
batchSignatureVerifier: batchSignatureVerifier,
}
}

func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedBLSToExecutionChange) error {
func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet *uint64, msg *cltypes.SignedBLSToExecutionChangeWithGossipData) error {
// https://github.com/ethereum/consensus-specs/blob/dev/specs/capella/p2p-interface.md#bls_to_execution_change
// [IGNORE] The signed_bls_to_execution_change is the first valid signed bls to execution change received
// for the validator with index signed_bls_to_execution_change.message.validator_index.
if s.operationsPool.BLSToExecutionChangesPool.Has(msg.Signature) {
if s.operationsPool.BLSToExecutionChangesPool.Has(msg.SignedBLSToExecutionChange.Signature) {
return ErrIgnore
}
change := msg.Message
change := msg.SignedBLSToExecutionChange.Message
stateReader := s.syncedDataManager.HeadStateReader()
if stateReader == nil {
return ErrIgnore
Expand Down Expand Up @@ -110,26 +113,35 @@ func (s *blsToExecutionChangeService) ProcessMessage(ctx context.Context, subnet
if err != nil {
return err
}
valid, err := blsVerify(msg.Signature[:], signedRoot[:], change.From[:])
if err != nil {
return err
}
if !valid {
return errors.New("invalid signature")

aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{msg.SignedBLSToExecutionChange.Signature[:]},
SignRoots: [][]byte{signedRoot[:]},
Pks: [][]byte{change.From[:]},
GossipData: msg.GossipData,
F: func() {
// validator.withdrawal_credentials = (
// ETH1_ADDRESS_WITHDRAWAL_PREFIX
// + b'\x00' * 11
// + address_change.to_execution_address
// )
newWc := libcommon.Hash{}
newWc[0] = byte(s.beaconCfg.ETH1AddressWithdrawalPrefixByte)
copy(newWc[1:], make([]byte, 11))
copy(newWc[12:], change.To[:])
stateMutator.SetWithdrawalCredentialForValidatorAtIndex(int(change.ValidatorIndex), newWc)

s.emitters.Operation().SendBlsToExecution(msg.SignedBLSToExecutionChange)
s.operationsPool.BLSToExecutionChangesPool.Insert(msg.SignedBLSToExecutionChange.Signature, msg.SignedBLSToExecutionChange)
},
}

// validator.withdrawal_credentials = (
// ETH1_ADDRESS_WITHDRAWAL_PREFIX
// + b'\x00' * 11
// + address_change.to_execution_address
// )
newWc := libcommon.Hash{}
newWc[0] = byte(s.beaconCfg.ETH1AddressWithdrawalPrefixByte)
copy(newWc[1:], make([]byte, 11))
copy(newWc[12:], change.To[:])
stateMutator.SetWithdrawalCredentialForValidatorAtIndex(int(change.ValidatorIndex), newWc)
// push the signatures to verify asynchronously and run final functions after that.
s.batchSignatureVerifier.AsyncVerifyBlsToExecutionChange(aggregateVerificationData)

s.emitters.Operation().SendBlsToExecution(msg)
s.operationsPool.BLSToExecutionChangesPool.Insert(msg.Signature, msg)
return nil
// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
// in BatchSignatureVerifier service. After validating signatures, if they are valid we will publish the
// gossip ourselves or ban the peer which sent that particular invalid signature.
return ErrIgnore
}
Loading

0 comments on commit 3d5d022

Please sign in to comment.