diff --git a/network/discovery/dv5_routing.go b/network/discovery/dv5_routing.go index a27b875078..2ffa08c0f1 100644 --- a/network/discovery/dv5_routing.go +++ b/network/discovery/dv5_routing.go @@ -32,9 +32,13 @@ func (dvs *DiscV5Service) Advertise(ctx context.Context, ns string, opt ...disco return opts.Ttl, nil } - if err := dvs.RegisterSubnets(logger, subnet); err != nil { + updated, err := dvs.RegisterSubnets(logger, subnet) + if err != nil { return 0, err } + if updated { + go dvs.PublishENR(logger) + } return opts.Ttl, nil } diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go index b2d6f181d9..9e7645605e 100644 --- a/network/discovery/dv5_service.go +++ b/network/discovery/dv5_service.go @@ -23,7 +23,7 @@ import ( ) var ( - defaultDiscoveryInterval = time.Second + defaultDiscoveryInterval = time.Millisecond * 100 publishENRTimeout = time.Minute publishStateReady = int32(0) @@ -152,9 +152,14 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error { if err != nil { return errors.Wrap(err, "could not read domain type") } - if nodeDomainType != dvs.domainType.DomainType() && - nodeDomainType != dvs.domainType.NextDomainType() { - return fmt.Errorf("mismatched domain type: %x", nodeDomainType) + nodeNextDomainType, err := records.GetDomainTypeEntry(e.Node.Record(), records.KeyNextDomainType) + if err != nil && !errors.Is(err, records.ErrEntryNotFound) { + return errors.Wrap(err, "could not read domain type") + } + if dvs.domainType.DomainType() != nodeDomainType && + dvs.domainType.DomainType() != nodeNextDomainType { + return fmt.Errorf("mismatched domain type: neither %x nor %x match %x", + nodeDomainType, nodeNextDomainType, dvs.domainType.DomainType()) } // Get the peer's subnets, skipping if it has none. @@ -258,43 +263,43 @@ func (dvs *DiscV5Service) discover(ctx context.Context, handler HandleNewPeer, i } // RegisterSubnets adds the given subnets and publish the updated node record -func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...int) error { +func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) { if len(subnets) == 0 { - return nil + return false, nil } - updated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil) + updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil) if err != nil { - return errors.Wrap(err, "could not update ENR") + return false, errors.Wrap(err, "could not update ENR") } - if updated != nil { - dvs.subnets = updated + if updatedSubnets != nil { + dvs.subnets = updatedSubnets logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) - go dvs.publishENR(logger) + return true, nil } - return nil + return false, nil } // DeregisterSubnets removes the given subnets and publish the updated node record -func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...int) error { +func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) { logger = logger.Named(logging.NameDiscoveryService) if len(subnets) == 0 { - return nil + return false, nil } - updated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets) + updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets) if err != nil { - return errors.Wrap(err, "could not update ENR") + return false, errors.Wrap(err, "could not update ENR") } - if updated != nil { - dvs.subnets = updated + if updatedSubnets != nil { + dvs.subnets = updatedSubnets logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode())) - go dvs.publishENR(logger) + return true, nil } - return nil + return false, nil } -// publishENR publishes the new ENR across the network -func (dvs *DiscV5Service) publishENR(logger *zap.Logger) { +// PublishENR publishes the ENR with the current domain type across the network +func (dvs *DiscV5Service) PublishENR(logger *zap.Logger) { ctx, done := context.WithTimeout(dvs.ctx, publishENRTimeout) defer done() if !atomic.CompareAndSwapInt32(&dvs.publishState, publishStateReady, publishStatePending) { @@ -302,6 +307,18 @@ func (dvs *DiscV5Service) publishENR(logger *zap.Logger) { logger.Debug("pending publish ENR") return } + + err := records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyDomainType, dvs.domainType.DomainType()) + if err != nil { + logger.Error("could not set domain type", zap.Error(err)) + return + } + err = records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyNextDomainType, dvs.domainType.NextDomainType()) + if err != nil { + logger.Error("could not set next domain type", zap.Error(err)) + return + } + defer atomic.StoreInt32(&dvs.publishState, publishStateReady) dvs.discover(ctx, func(e PeerEvent) { metricPublishEnrPings.Inc() diff --git a/network/discovery/dv5_service_test.go b/network/discovery/dv5_service_test.go index d806fda8c8..1677991111 100644 --- a/network/discovery/dv5_service_test.go +++ b/network/discovery/dv5_service_test.go @@ -28,7 +28,7 @@ func (td *TestDomainTypeProvider) DomainType() spectypes.DomainType { } func (td *TestDomainTypeProvider) NextDomainType() spectypes.DomainType { - return spectypes.DomainType{0x1, 0x2, 0x3, 0x4} + return spectypes.DomainType{0x1, 0x2, 0x3, 0x5} } func (td *TestDomainTypeProvider) DomainTypeAtEpoch(epoch phase0.Epoch) spectypes.DomainType { @@ -55,10 +55,38 @@ func TestCheckPeer(t *testing.T) { expectedError: errors.New("could not read domain type: not found"), }, { - name: "domain type mismatch", + name: "missing domain type but has next domain type", + domainType: nil, + nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5}, + subnets: mySubnets, + expectedError: errors.New("could not read domain type: not found"), + }, + { + name: "domain type mismatch", + domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5}, + nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x6}, + subnets: mySubnets, + expectedError: errors.New("mismatched domain type: neither 01020305 nor 01020306 match 01020304"), + }, + { + name: "domain type mismatch (missing next domain type)", domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5}, subnets: mySubnets, - expectedError: errors.New("mismatched domain type: 01020305"), + expectedError: errors.New("mismatched domain type: neither 01020305 nor 00000000 match 01020304"), + }, + { + name: "only next domain type matches", + domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x3}, + nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4}, + subnets: mySubnets, + expectedError: nil, + }, + { + name: "both domain types match", + domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4}, + nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4}, + subnets: mySubnets, + expectedError: nil, }, { name: "missing subnets", @@ -112,6 +140,10 @@ func TestCheckPeer(t *testing.T) { err := records.SetDomainTypeEntry(localNode, records.KeyDomainType, *test.domainType) require.NoError(t, err) } + if test.nextDomainType != nil { + err := records.SetDomainTypeEntry(localNode, records.KeyNextDomainType, *test.nextDomainType) + require.NoError(t, err) + } if test.subnets != nil { err := records.SetSubnetsEntry(localNode, test.subnets) require.NoError(t, err) @@ -147,11 +179,12 @@ func TestCheckPeer(t *testing.T) { } type checkPeerTest struct { - name string - domainType *spectypes.DomainType - subnets []byte - localNode *enode.LocalNode - expectedError error + name string + domainType *spectypes.DomainType + nextDomainType *spectypes.DomainType + subnets []byte + localNode *enode.LocalNode + expectedError error } func mockSubnets(active ...int) []byte { diff --git a/network/discovery/local_service.go b/network/discovery/local_service.go index eb53f3571d..5ad0a94196 100644 --- a/network/discovery/local_service.go +++ b/network/discovery/local_service.go @@ -94,15 +94,19 @@ func (md *localDiscovery) FindPeers(ctx context.Context, ns string, opt ...disco } // RegisterSubnets implements Service -func (md *localDiscovery) RegisterSubnets(logger *zap.Logger, subnets ...int) error { +func (md *localDiscovery) RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) { // TODO - return nil + return false, nil } // DeregisterSubnets implements Service -func (md *localDiscovery) DeregisterSubnets(logger *zap.Logger, subnets ...int) error { +func (md *localDiscovery) DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) { + // TODO + return false, nil +} + +func (md *localDiscovery) PublishENR(logger *zap.Logger) { // TODO - return nil } // discoveryNotifee gets notified when we find a new peer via mDNS discovery diff --git a/network/discovery/service.go b/network/discovery/service.go index 2f3640b9d0..51d131f329 100644 --- a/network/discovery/service.go +++ b/network/discovery/service.go @@ -49,9 +49,10 @@ type Options struct { type Service interface { discovery.Discovery io.Closer - RegisterSubnets(logger *zap.Logger, subnets ...int) error - DeregisterSubnets(logger *zap.Logger, subnets ...int) error + RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) + DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) Bootstrap(logger *zap.Logger, handler HandleNewPeer) error + PublishENR(logger *zap.Logger) } // NewService creates new discovery.Service diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 923dbe2f0e..5408a2ec24 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -4,11 +4,12 @@ import ( "context" "errors" "fmt" - ma "github.com/multiformats/go-multiaddr" "strings" "sync/atomic" "time" + ma "github.com/multiformats/go-multiaddr" + "github.com/cornelk/hashmap" "github.com/libp2p/go-libp2p/core/connmgr" connmgrcore "github.com/libp2p/go-libp2p/core/connmgr" @@ -380,16 +381,20 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { return self }) + // Register/unregister subnets for discovery. var errs error + var hasAdded, hasRemoved bool if len(addedSubnets) > 0 { - err := n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...) + var err error + hasAdded, err = n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...) if err != nil { logger.Debug("could not register subnets", zap.Error(err)) errs = errors.Join(errs, err) } } if len(removedSubnets) > 0 { - err := n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...) + var err error + hasRemoved, err = n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...) if err != nil { logger.Debug("could not unregister subnets", zap.Error(err)) errs = errors.Join(errs, err) @@ -405,6 +410,9 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) { } } } + if hasAdded || hasRemoved { + go n.disc.PublishENR(logger.Named(logging.NameDiscoveryService)) + } allSubs, _ := records.Subnets{}.FromString(records.AllSubnets) subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0) diff --git a/network/p2p/p2p_setup.go b/network/p2p/p2p_setup.go index 586865a183..24ae84b996 100644 --- a/network/p2p/p2p_setup.go +++ b/network/p2p/p2p_setup.go @@ -248,7 +248,9 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error { discV5Opts.Subnets = n.fixedSubnets logger = logger.With(zap.String("subnets", records.Subnets(n.fixedSubnets).String())) } - logger.Info("discovery: using discv5", zap.Strings("bootnodes", discV5Opts.Bootnodes)) + logger.Info("discovery: using discv5", + zap.Strings("bootnodes", discV5Opts.Bootnodes), + zap.String("ip", discV5Opts.IP)) } else { logger.Info("discovery: using mdns (local)") } diff --git a/networkconfig/holesky-stage.go b/networkconfig/holesky-stage.go index 11784553b3..7c994e784b 100644 --- a/networkconfig/holesky-stage.go +++ b/networkconfig/holesky-stage.go @@ -18,6 +18,10 @@ var HoleskyStage = NetworkConfig{ RegistryContractAddr: "0x0d33801785340072C452b994496B19f196b7eE15", AlanForkEpoch: 99999999, Bootnodes: []string{ - "enr:-Ja4QDYHVgUs9NvlMqq93ot6VNqbmrIlMrwKnq4X3DPRgyUNB4ospDp8ubMvsf-KsgqY8rzpZKy4GbE1DLphabpRBc-GAY_diLjngmlkgnY0gmlwhDQrLYqJc2VjcDI1NmsxoQKnAiuSlgSR8asjCH0aYoVKM8uPbi4noFuFHZHaAHqknYNzc3YBg3RjcIITiYN1ZHCCD6E", + // Public bootnode: + // "enr:-Ja4QDYHVgUs9NvlMqq93ot6VNqbmrIlMrwKnq4X3DPRgyUNB4ospDp8ubMvsf-KsgqY8rzpZKy4GbE1DLphabpRBc-GAY_diLjngmlkgnY0gmlwhDQrLYqJc2VjcDI1NmsxoQKnAiuSlgSR8asjCH0aYoVKM8uPbi4noFuFHZHaAHqknYNzc3YBg3RjcIITiYN1ZHCCD6E", + + // Private bootnode: + "enr:-Ja4QEF4O52Pl9pxfF1y_gBzmtp9s2-ncWoN2-VPebEvjibaGJH7VpYGZKdCVws_gIAkjjatn67rXhGuItCfcp2kv4SGAZGOmChHgmlkgnY0gmlwhAoqckWJc2VjcDI1NmsxoQP_bBE-ZYvaXKBR3dRYMN5K_lZP-q-YsBzDZEtxH_4T_YNzc3YBg3RjcIITioN1ZHCCD6I", }, } diff --git a/operator/duties/attester.go b/operator/duties/attester.go index 8daa8f09ec..cc3971f9ea 100644 --- a/operator/duties/attester.go +++ b/operator/duties/attester.go @@ -90,7 +90,7 @@ func (h *AttesterHandler) HandleDuties(ctx context.Context) { // If we have reached the mid-point of the epoch, fetch the duties for the next epoch in the next slot. // This allows us to set them up at a time when the beacon node should be less busy. - if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-2 { + if uint64(slot)%slotsPerEpoch == slotsPerEpoch/2-1 { h.fetchNextEpoch = true } diff --git a/operator/duties/attester_genesis_test.go b/operator/duties/attester_genesis_test.go index 5ec5af2913..317784e724 100644 --- a/operator/duties/attester_genesis_test.go +++ b/operator/duties/attester_genesis_test.go @@ -645,7 +645,7 @@ func TestScheduler_Attester_Genesis_Reorg_Current(t *testing.T) { waitForDuties = &SafeValue[bool]{} forkEpoch = goclient.FarFutureEpoch ) - currentSlot.Set(phase0.Slot(47)) + currentSlot.Set(phase0.Slot(48)) scheduler, logger, mockTicker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, forkEpoch) fetchDutiesCall, executeDutiesCall := setupAttesterGenesisDutiesMock(scheduler, dutiesMap, waitForDuties) startFn() @@ -674,7 +674,7 @@ func TestScheduler_Attester_Genesis_Reorg_Current(t *testing.T) { waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action - currentSlot.Set(phase0.Slot(48)) + currentSlot.Set(phase0.Slot(49)) mockTicker.Send(currentSlot.Get()) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -696,12 +696,12 @@ func TestScheduler_Attester_Genesis_Reorg_Current(t *testing.T) { waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for attester duties to be fetched again for the current epoch - currentSlot.Set(phase0.Slot(49)) + currentSlot.Set(phase0.Slot(50)) mockTicker.Send(currentSlot.Get()) waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 6: skip to the next epoch - currentSlot.Set(phase0.Slot(50)) + currentSlot.Set(phase0.Slot(51)) for slot := currentSlot.Get(); slot < 64; slot++ { mockTicker.Send(slot) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -736,7 +736,7 @@ func TestScheduler_Attester_Genesis_Reorg_Current_Indices_Changed(t *testing.T) waitForDuties = &SafeValue[bool]{} forkEpoch = goclient.FarFutureEpoch ) - currentSlot.Set(phase0.Slot(47)) + currentSlot.Set(phase0.Slot(48)) scheduler, logger, mockTicker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, forkEpoch) fetchDutiesCall, executeDutiesCall := setupAttesterGenesisDutiesMock(scheduler, dutiesMap, waitForDuties) startFn() @@ -765,7 +765,7 @@ func TestScheduler_Attester_Genesis_Reorg_Current_Indices_Changed(t *testing.T) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action - currentSlot.Set(phase0.Slot(48)) + currentSlot.Set(phase0.Slot(49)) mockTicker.Send(currentSlot.Get()) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -797,12 +797,12 @@ func TestScheduler_Attester_Genesis_Reorg_Current_Indices_Changed(t *testing.T) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 6: wait for attester duties to be fetched again for the next epoch due to indices change - currentSlot.Set(phase0.Slot(49)) + currentSlot.Set(phase0.Slot(50)) mockTicker.Send(currentSlot.Get()) waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 7: skip to the next epoch - currentSlot.Set(phase0.Slot(50)) + currentSlot.Set(phase0.Slot(51)) for slot := currentSlot.Get(); slot < 64; slot++ { mockTicker.Send(slot) waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) diff --git a/operator/duties/attester_test.go b/operator/duties/attester_test.go index acd6f26eae..54d4e2632d 100644 --- a/operator/duties/attester_test.go +++ b/operator/duties/attester_test.go @@ -639,7 +639,7 @@ func TestScheduler_Attester_Reorg_Current(t *testing.T) { waitForDuties = &SafeValue[bool]{} forkEpoch = phase0.Epoch(0) ) - currentSlot.Set(phase0.Slot(47)) + currentSlot.Set(phase0.Slot(48)) scheduler, logger, mockTicker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, forkEpoch) fetchDutiesCall, executeDutiesCall := setupAttesterDutiesMock(scheduler, dutiesMap, waitForDuties) startFn() @@ -668,7 +668,7 @@ func TestScheduler_Attester_Reorg_Current(t *testing.T) { waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action - currentSlot.Set(phase0.Slot(48)) + currentSlot.Set(phase0.Slot(49)) mockTicker.Send(currentSlot.Get()) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -690,12 +690,12 @@ func TestScheduler_Attester_Reorg_Current(t *testing.T) { waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 5: wait for attester duties to be fetched again for the current epoch - currentSlot.Set(phase0.Slot(49)) + currentSlot.Set(phase0.Slot(50)) mockTicker.Send(currentSlot.Get()) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 6: skip to the next epoch - currentSlot.Set(phase0.Slot(50)) + currentSlot.Set(phase0.Slot(51)) for slot := currentSlot.Get(); slot < 64; slot++ { mockTicker.Send(slot) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -730,7 +730,7 @@ func TestScheduler_Attester_Reorg_Current_Indices_Changed(t *testing.T) { waitForDuties = &SafeValue[bool]{} forkEpoch = phase0.Epoch(0) ) - currentSlot.Set(phase0.Slot(47)) + currentSlot.Set(phase0.Slot(48)) scheduler, logger, mockTicker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, forkEpoch) fetchDutiesCall, executeDutiesCall := setupAttesterDutiesMock(scheduler, dutiesMap, waitForDuties) startFn() @@ -759,7 +759,7 @@ func TestScheduler_Attester_Reorg_Current_Indices_Changed(t *testing.T) { waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 3: Ticker with no action - currentSlot.Set(phase0.Slot(48)) + currentSlot.Set(phase0.Slot(49)) mockTicker.Send(currentSlot.Get()) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) @@ -791,12 +791,12 @@ func TestScheduler_Attester_Reorg_Current_Indices_Changed(t *testing.T) { waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 6: wait for attester duties to be fetched again for the next epoch due to indices change - currentSlot.Set(phase0.Slot(49)) + currentSlot.Set(phase0.Slot(50)) mockTicker.Send(currentSlot.Get()) waitForDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // STEP 7: skip to the next epoch - currentSlot.Set(phase0.Slot(50)) + currentSlot.Set(phase0.Slot(51)) for slot := currentSlot.Get(); slot < 64; slot++ { mockTicker.Send(slot) waitForNoAction(t, logger, fetchDutiesCall, executeDutiesCall, timeout) diff --git a/operator/duties/committee_test.go b/operator/duties/committee_test.go index 23bc80586e..2b97992f6a 100644 --- a/operator/duties/committee_test.go +++ b/operator/duties/committee_test.go @@ -900,7 +900,7 @@ func TestScheduler_Committee_Early_Block(t *testing.T) { t.Skip("TODO") } -func TestScheduler_Committee_Fork_Attester_only(t *testing.T) { +func TestScheduler_Committee_On_Fork_Attester_only(t *testing.T) { var ( dutyStore = dutystore.New() attHandler = NewAttesterHandler(dutyStore.Attester) @@ -949,20 +949,22 @@ func TestScheduler_Committee_Fork_Attester_only(t *testing.T) { startTime := time.Now() ticker.Send(currentSlot.Get()) waitForGenesisDutiesExecution(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout, aExpected) + waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) // validate the 1/3 of the slot waiting time require.Less(t, scheduler.network.Beacon.SlotDurationSec()/3, time.Since(startTime)) // skip to the next epoch currentSlot.Set(phase0.Slot(2)) - for slot := currentSlot.Get(); slot < 47; slot++ { + for slot := currentSlot.Get(); slot < 48; slot++ { ticker.Send(slot) waitForNoActionGenesis(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) + waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) currentSlot.Set(slot + 1) } // wait for duties to be fetched for the next fork epoch - currentSlot.Set(phase0.Slot(47)) + currentSlot.Set(phase0.Slot(48)) waitForDuties.Set(true) ticker.Send(currentSlot.Get()) waitForGenesisDutiesFetch(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) @@ -975,6 +977,7 @@ func TestScheduler_Committee_Fork_Attester_only(t *testing.T) { startTime = time.Now() ticker.Send(currentSlot.Get()) waitForDutiesExecutionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout, committeeMap) + waitForNoActionGenesis(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) // validate the 1/3 of the slot waiting time require.Less(t, scheduler.network.Beacon.SlotDurationSec()/3, time.Since(startTime)) @@ -983,3 +986,89 @@ func TestScheduler_Committee_Fork_Attester_only(t *testing.T) { cancel() require.NoError(t, schedulerPool.Wait()) } + +func TestScheduler_Committee_On_Fork(t *testing.T) { + var ( + dutyStore = dutystore.New() + attHandler = NewAttesterHandler(dutyStore.Attester) + syncHandler = NewSyncCommitteeHandler(dutyStore.SyncCommittee) + commHandler = NewCommitteeHandler(dutyStore.Attester, dutyStore.SyncCommittee) + alanForkEpoch = phase0.Epoch(256) + currentSlot = &SafeValue[phase0.Slot]{} + waitForDuties = &SafeValue[bool]{} + attDuties = hashmap.New[phase0.Epoch, []*eth2apiv1.AttesterDuty]() + syncDuties = hashmap.New[uint64, []*eth2apiv1.SyncCommitteeDuty]() + activeShares = []*ssvtypes.SSVShare{{ + Share: spectypes.Share{ + Committee: []*spectypes.ShareMember{ + {Signer: 1}, {Signer: 2}, {Signer: 3}, {Signer: 4}, + }, + ValidatorIndex: 1, + }, + }} + ) + + lastPeriodEpoch := phase0.Epoch(256 - 1) + attDuties.Set(lastPeriodEpoch, []*eth2apiv1.AttesterDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(lastPeriodEpoch*32 + 1), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + attDuties.Set(lastPeriodEpoch+1, []*eth2apiv1.AttesterDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot((lastPeriodEpoch + 1) * 32), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + syncDuties.Set(1, []*eth2apiv1.SyncCommitteeDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + currentSlot.Set(phase0.Slot(lastPeriodEpoch * 32)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{attHandler, syncHandler, commHandler}, currentSlot, alanForkEpoch) + fetchAttesterDutiesCall, executeAttesterDutiesCall := setupAttesterGenesisDutiesMock(scheduler, attDuties, waitForDuties) + _, _ = setupSyncCommitteeDutiesMock(scheduler, activeShares, syncDuties, waitForDuties) + fetchDutiesCall, executeDutiesCall := setupCommitteeDutiesMock(scheduler, activeShares, attDuties, syncDuties, waitForDuties) + startFn() + + aDuties, _ := attDuties.Get(lastPeriodEpoch) + aExpected := expectedExecutedGenesisAttesterDuties(attHandler, aDuties) + setExecuteGenesisDutyFunc(scheduler, executeAttesterDutiesCall, len(aExpected)) + + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) + waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + currentSlot.Set(phase0.Slot(lastPeriodEpoch*32 + 1)) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout, aExpected) + waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + currentSlot.Set(phase0.Slot(lastPeriodEpoch*32 + 2)) + for slot := currentSlot.Get(); slot < 256*32; slot++ { + ticker.Send(slot) + waitForNoActionGenesis(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) + waitForNoActionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + currentSlot.Set(slot + 1) + } + + currentSlot.Set(phase0.Slot(256 * 32)) + aDuties, _ = attDuties.Get(lastPeriodEpoch + 1) + sDuties, _ := syncDuties.Get(1) + committeeMap := commHandler.buildCommitteeDuties(aDuties, sDuties, 256, currentSlot.Get()) + setExecuteDutyFuncs(scheduler, executeDutiesCall, len(committeeMap)) + + ticker.Send(currentSlot.Get()) + waitForDutiesExecutionCommittee(t, logger, fetchDutiesCall, executeDutiesCall, timeout, committeeMap) + waitForNoActionGenesis(t, logger, fetchAttesterDutiesCall, executeAttesterDutiesCall, timeout) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} diff --git a/operator/duties/proposer.go b/operator/duties/proposer.go index 549f3820c8..8056737a3c 100644 --- a/operator/duties/proposer.go +++ b/operator/duties/proposer.go @@ -3,9 +3,10 @@ package duties import ( "context" "fmt" - genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" "time" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" + eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -135,7 +136,7 @@ func (h *ProposerHandler) processExecution(epoch phase0.Epoch, slot phase0.Slot) return } - if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) { + if !h.network.PastAlanForkAtEpoch(epoch) { toExecute := make([]*genesisspectypes.Duty, 0, len(duties)) for _, d := range duties { if h.shouldExecute(d) { diff --git a/operator/duties/proposer_genesis_test.go b/operator/duties/proposer_genesis_test.go new file mode 100644 index 0000000000..60e8d9ba42 --- /dev/null +++ b/operator/duties/proposer_genesis_test.go @@ -0,0 +1,455 @@ +package duties + +import ( + "context" + "testing" + + eth2apiv1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/cornelk/hashmap" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + + spectypes "github.com/ssvlabs/ssv-spec/types" + + "github.com/ssvlabs/ssv/beacon/goclient" + "github.com/ssvlabs/ssv/operator/duties/dutystore" + "github.com/ssvlabs/ssv/protocol/v2/types" +) + +func setupProposerGenesisDutiesMock(s *Scheduler, dutiesMap *hashmap.Map[phase0.Epoch, []*eth2apiv1.ProposerDuty]) (chan struct{}, chan []*genesisspectypes.Duty) { + fetchDutiesCall := make(chan struct{}) + executeDutiesCall := make(chan []*genesisspectypes.Duty) + + s.beaconNode.(*MockBeaconNode).EXPECT().ProposerDuties(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, epoch phase0.Epoch, indices []phase0.ValidatorIndex) ([]*eth2apiv1.ProposerDuty, error) { + fetchDutiesCall <- struct{}{} + duties, _ := dutiesMap.Get(epoch) + return duties, nil + }).AnyTimes() + + getShares := func(epoch phase0.Epoch) []*types.SSVShare { + uniqueIndices := make(map[phase0.ValidatorIndex]bool) + + duties, _ := dutiesMap.Get(epoch) + for _, d := range duties { + uniqueIndices[d.ValidatorIndex] = true + } + + shares := make([]*types.SSVShare, 0, len(uniqueIndices)) + for index := range uniqueIndices { + share := &types.SSVShare{ + Share: spectypes.Share{ + ValidatorIndex: index, + }, + } + shares = append(shares, share) + } + + return shares + } + + s.validatorProvider.(*MockValidatorProvider).EXPECT().SelfParticipatingValidators(gomock.Any()).DoAndReturn(getShares).AnyTimes() + s.validatorProvider.(*MockValidatorProvider).EXPECT().ParticipatingValidators(gomock.Any()).DoAndReturn(getShares).AnyTimes() + + return fetchDutiesCall, executeDutiesCall +} + +func expectedExecutedGenesisProposerDuties(handler *ProposerHandler, duties []*eth2apiv1.ProposerDuty) []*genesisspectypes.Duty { + expectedDuties := make([]*genesisspectypes.Duty, 0) + for _, d := range duties { + expectedDuties = append(expectedDuties, handler.toGenesisSpecDuty(d, genesisspectypes.BNRoleProposer)) + } + return expectedDuties +} + +func TestScheduler_Proposer_Genesis_Same_Slot(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(0)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(0), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(0), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer duties to be fetched and executed at the same slot + duties, _ := dutiesMap.Get(phase0.Epoch(0)) + expected := expectedExecutedGenesisProposerDuties(handler, duties) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} + +func TestScheduler_Proposer_Genesis_Diff_Slots(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(0)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(0), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(2), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer duties to be fetched + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 2: wait for no action to be taken + currentSlot.Set(phase0.Slot(1)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 3: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(2)) + duties, _ := dutiesMap.Get(phase0.Epoch(0)) + expected := expectedExecutedGenesisProposerDuties(handler, duties) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} + +// execute duty after two slots after the indices changed +func TestScheduler_Proposer_Genesis_Indices_Changed(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(0)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + // STEP 1: wait for no action to be taken + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 2: wait for no action to be taken + currentSlot.Set(phase0.Slot(1)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 3: trigger a change in active indices + scheduler.indicesChg <- struct{}{} + dutiesMap.Set(phase0.Epoch(0), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(1), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + { + PubKey: phase0.BLSPubKey{1, 2, 4}, + Slot: phase0.Slot(2), + ValidatorIndex: phase0.ValidatorIndex(2), + }, + { + PubKey: phase0.BLSPubKey{1, 2, 5}, + Slot: phase0.Slot(3), + ValidatorIndex: phase0.ValidatorIndex(3), + }, + }) + // no execution should happen in slot 1 + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 4: wait for proposer duties to be fetched again + currentSlot.Set(phase0.Slot(2)) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + // no execution should happen in slot 2 + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 4: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(3)) + duties, _ := dutiesMap.Get(phase0.Epoch(0)) + expected := expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[2]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} + +func TestScheduler_Proposer_Genesis_Multiple_Indices_Changed_Same_Slot(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(0)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(0), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(2), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer duties to be fetched + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 2: trigger a change in active indices + scheduler.indicesChg <- struct{}{} + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + duties, _ := dutiesMap.Get(phase0.Epoch(0)) + dutiesMap.Set(phase0.Epoch(0), append(duties, ð2apiv1.ProposerDuty{ + PubKey: phase0.BLSPubKey{1, 2, 4}, + Slot: phase0.Slot(3), + ValidatorIndex: phase0.ValidatorIndex(2), + })) + + // STEP 3: trigger a change in active indices in the same slot + scheduler.indicesChg <- struct{}{} + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + duties, _ = dutiesMap.Get(phase0.Epoch(0)) + dutiesMap.Set(phase0.Epoch(0), append(duties, ð2apiv1.ProposerDuty{ + PubKey: phase0.BLSPubKey{1, 2, 5}, + Slot: phase0.Slot(4), + ValidatorIndex: phase0.ValidatorIndex(3), + })) + + // STEP 4: wait for proposer duties to be fetched again + currentSlot.Set(phase0.Slot(1)) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 5: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(2)) + duties, _ = dutiesMap.Get(phase0.Epoch(0)) + expected := expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[0]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // STEP 6: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(3)) + duties, _ = dutiesMap.Get(phase0.Epoch(0)) + expected = expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[1]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // STEP 7: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(4)) + duties, _ = dutiesMap.Get(phase0.Epoch(0)) + expected = expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[2]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} + +// reorg current dependent root changed +func TestScheduler_Proposer_Genesis_Reorg_Current(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(34)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(1), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(36), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer duties to be fetched + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 2: trigger head event + e := ð2apiv1.Event{ + Data: ð2apiv1.HeadEvent{ + Slot: currentSlot.Get(), + CurrentDutyDependentRoot: phase0.Root{0x01}, + }, + } + scheduler.HandleHeadEvent(logger)(e) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 3: Ticker with no action + currentSlot.Set(phase0.Slot(35)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 4: trigger reorg + e = ð2apiv1.Event{ + Data: ð2apiv1.HeadEvent{ + Slot: currentSlot.Get(), + CurrentDutyDependentRoot: phase0.Root{0x02}, + }, + } + dutiesMap.Set(phase0.Epoch(1), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(37), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + scheduler.HandleHeadEvent(logger)(e) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 5: wait for proposer duties to be fetched again for the current epoch. + // The first assigned duty should not be executed + currentSlot.Set(phase0.Slot(36)) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 7: The second assigned duty should be executed + currentSlot.Set(phase0.Slot(37)) + duties, _ := dutiesMap.Get(phase0.Epoch(1)) + expected := expectedExecutedGenesisProposerDuties(handler, duties) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} + +// reorg current dependent root changed +func TestScheduler_Proposer_Genesis_Reorg_Current_Indices_Changed(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + ) + currentSlot.Set(phase0.Slot(34)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + fetchDutiesCall, executeDutiesCall := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(1), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(36), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer duties to be fetched + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 2: trigger head event + e := ð2apiv1.Event{ + Data: ð2apiv1.HeadEvent{ + Slot: currentSlot.Get(), + CurrentDutyDependentRoot: phase0.Root{0x01}, + }, + } + scheduler.HandleHeadEvent(logger)(e) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 3: Ticker with no action + currentSlot.Set(phase0.Slot(35)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 4: trigger reorg + e = ð2apiv1.Event{ + Data: ð2apiv1.HeadEvent{ + Slot: currentSlot.Get(), + CurrentDutyDependentRoot: phase0.Root{0x02}, + }, + } + dutiesMap.Set(phase0.Epoch(1), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(37), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + scheduler.HandleHeadEvent(logger)(e) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 5: trigger a change in active indices in the same slot + scheduler.indicesChg <- struct{}{} + duties, _ := dutiesMap.Get(phase0.Epoch(1)) + dutiesMap.Set(phase0.Epoch(1), append(duties, ð2apiv1.ProposerDuty{ + PubKey: phase0.BLSPubKey{1, 2, 4}, + Slot: phase0.Slot(38), + ValidatorIndex: phase0.ValidatorIndex(2), + })) + waitForNoActionGenesis(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 6: wait for proposer duties to be fetched again for the current epoch. + // The first assigned duty should not be executed + currentSlot.Set(phase0.Slot(36)) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCall, executeDutiesCall, timeout) + + // STEP 7: The second assigned duty should be executed + currentSlot.Set(phase0.Slot(37)) + duties, _ = dutiesMap.Get(phase0.Epoch(1)) + expected := expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[0]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // STEP 8: The second assigned duty should be executed + currentSlot.Set(phase0.Slot(38)) + duties, _ = dutiesMap.Get(phase0.Epoch(1)) + expected = expectedExecutedGenesisProposerDuties(handler, []*eth2apiv1.ProposerDuty{duties[1]}) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} diff --git a/operator/duties/proposer_test.go b/operator/duties/proposer_test.go index 7b38ef38e3..2959872b64 100644 --- a/operator/duties/proposer_test.go +++ b/operator/duties/proposer_test.go @@ -451,3 +451,74 @@ func TestScheduler_Proposer_Reorg_Current_Indices_Changed(t *testing.T) { cancel() require.NoError(t, schedulerPool.Wait()) } + +func TestScheduler_Proposer_On_Fork(t *testing.T) { + var ( + handler = NewProposerHandler(dutystore.NewDuties[eth2apiv1.ProposerDuty]()) + currentSlot = &SafeValue[phase0.Slot]{} + dutiesMap = hashmap.New[phase0.Epoch, []*eth2apiv1.ProposerDuty]() + alanForkEpoch = phase0.Epoch(1) + ) + currentSlot.Set(phase0.Slot(0)) + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, alanForkEpoch) + fetchDutiesCallGenesis, executeDutiesCallGenesis := setupProposerGenesisDutiesMock(scheduler, dutiesMap) + _, executeDutiesCall := setupProposerDutiesMock(scheduler, dutiesMap) + startFn() + + dutiesMap.Set(phase0.Epoch(0), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(2), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + dutiesMap.Set(phase0.Epoch(1), []*eth2apiv1.ProposerDuty{ + { + PubKey: phase0.BLSPubKey{1, 2, 3}, + Slot: phase0.Slot(32), + ValidatorIndex: phase0.ValidatorIndex(1), + }, + }) + + // STEP 1: wait for proposer genesis duties to be fetched + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesFetch(t, logger, fetchDutiesCallGenesis, executeDutiesCallGenesis, timeout) + + // STEP 2: wait for no action to be taken + currentSlot.Set(phase0.Slot(1)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, fetchDutiesCallGenesis, executeDutiesCallGenesis, timeout) + + // STEP 3: wait for proposer duties to be executed + currentSlot.Set(phase0.Slot(2)) + duties, _ := dutiesMap.Get(phase0.Epoch(0)) + expectedGenesis := expectedExecutedGenesisProposerDuties(handler, duties) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCallGenesis, len(expectedGenesis)) + + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, fetchDutiesCallGenesis, executeDutiesCallGenesis, timeout, expectedGenesis) + waitForNoAction(t, logger, fetchDutiesCallGenesis, executeDutiesCall, timeout) + + // skip to the next epoch + currentSlot.Set(phase0.Slot(3)) + for slot := currentSlot.Get(); slot < 32; slot++ { + ticker.Send(slot) + waitForNoActionGenesis(t, logger, fetchDutiesCallGenesis, executeDutiesCallGenesis, timeout) + currentSlot.Set(slot + 1) + } + + // fork epoch + currentSlot.Set(phase0.Slot(32)) + duties, _ = dutiesMap.Get(phase0.Epoch(1)) + expected := expectedExecutedProposerDuties(handler, duties) + setExecuteDutyFunc(scheduler, executeDutiesCall, len(expected)) + + ticker.Send(currentSlot.Get()) + waitForDutiesFetch(t, logger, fetchDutiesCallGenesis, executeDutiesCall, timeout) + waitForDutiesExecution(t, logger, fetchDutiesCallGenesis, executeDutiesCall, timeout, expected) + waitForNoActionGenesis(t, logger, fetchDutiesCallGenesis, executeDutiesCallGenesis, timeout) + + // Stop scheduler & wait for graceful exit. + cancel() + require.NoError(t, schedulerPool.Wait()) +} diff --git a/operator/duties/sync_committee.go b/operator/duties/sync_committee.go index 4c55c5498b..a596106fb6 100644 --- a/operator/duties/sync_committee.go +++ b/operator/duties/sync_committee.go @@ -16,16 +16,16 @@ import ( "github.com/ssvlabs/ssv/operator/duties/dutystore" ) -// syncCommitteePreparationEpochs is the number of epochs ahead of the sync committee -// period change at which to prepare the relevant duties. -var syncCommitteePreparationEpochs = uint64(2) - type SyncCommitteeHandler struct { baseHandler duties *dutystore.SyncCommitteeDuties fetchCurrentPeriod bool fetchNextPeriod bool + + // preparationSlots is the number of slots ahead of the sync committee + // period change at which to prepare the relevant duties. + preparationSlots uint64 } func NewSyncCommitteeHandler(duties *dutystore.SyncCommitteeDuties) *SyncCommitteeHandler { @@ -64,6 +64,10 @@ func (h *SyncCommitteeHandler) HandleDuties(ctx context.Context) { h.logger.Info("starting duty handler") defer h.logger.Info("duty handler exited") + // Prepare relevant duties 1.5 epochs (48 slots) ahead of the sync committee period change. + // The 1.5 epochs timing helps ensure setup occurs when the beacon node is likely less busy. + h.preparationSlots = h.network.Beacon.SlotsPerEpoch() * 3 / 2 + if h.shouldFetchNextPeriod(h.network.Beacon.EstimatedCurrentSlot()) { h.fetchNextPeriod = true } @@ -87,12 +91,9 @@ func (h *SyncCommitteeHandler) HandleDuties(ctx context.Context) { h.processFetching(ctx, period, true) cancel() - // If we have reached the mid-point of the epoch, fetch the duties for the next period in the next slot. - // This allows us to set them up at a time when the beacon node should be less busy. - epochsPerPeriod := h.network.Beacon.EpochsPerSyncCommitteePeriod() - if uint64(slot)%h.network.Beacon.SlotsPerEpoch() == h.network.Beacon.SlotsPerEpoch()/2-2 && - // Update the next period if we close to an EPOCHS_PER_SYNC_COMMITTEE_PERIOD boundary. - uint64(epoch)%epochsPerPeriod == epochsPerPeriod-syncCommitteePreparationEpochs { + // if we have reached the preparation slots -1, prepare the next period duties in the next slot. + periodSlots := h.slotsPerPeriod() + if uint64(slot)%periodSlots == periodSlots-h.preparationSlots-1 { h.fetchNextPeriod = true } @@ -317,8 +318,10 @@ func calculateSubscriptions(endEpoch phase0.Epoch, duties []*eth2apiv1.SyncCommi } func (h *SyncCommitteeHandler) shouldFetchNextPeriod(slot phase0.Slot) bool { - epochsPerPeriod := h.network.Beacon.EpochsPerSyncCommitteePeriod() - epoch := h.network.Beacon.EstimatedEpochAtSlot(slot) - return uint64(slot)%h.network.SlotsPerEpoch() >= h.network.SlotsPerEpoch()/2-1 && - uint64(epoch)%epochsPerPeriod >= epochsPerPeriod-syncCommitteePreparationEpochs + periodSlots := h.slotsPerPeriod() + return uint64(slot)%periodSlots > periodSlots-h.preparationSlots-2 +} + +func (h *SyncCommitteeHandler) slotsPerPeriod() uint64 { + return h.network.Beacon.EpochsPerSyncCommitteePeriod() * h.network.Beacon.SlotsPerEpoch() } diff --git a/operator/duties/validatorregistration.go b/operator/duties/validatorregistration.go index 7c1c286533..67974ca3e5 100644 --- a/operator/duties/validatorregistration.go +++ b/operator/duties/validatorregistration.go @@ -2,6 +2,7 @@ package duties import ( "context" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -50,7 +51,7 @@ func (h *ValidatorRegistrationHandler) HandleDuties(ctx context.Context) { pk := phase0.BLSPubKey{} copy(pk[:], share.ValidatorPubKey[:]) - if !h.network.PastAlanForkAtEpoch(h.network.Beacon.EstimatedEpochAtSlot(slot)) { + if !h.network.PastAlanForkAtEpoch(epoch) { h.dutiesExecutor.ExecuteGenesisDuties(h.logger, []*genesisspectypes.Duty{{ Type: genesisspectypes.BNRoleValidatorRegistration, ValidatorIndex: share.ValidatorIndex, diff --git a/operator/duties/voluntary_exit_genesis_test.go b/operator/duties/voluntary_exit_genesis_test.go new file mode 100644 index 0000000000..e8719d8922 --- /dev/null +++ b/operator/duties/voluntary_exit_genesis_test.go @@ -0,0 +1,148 @@ +package duties + +import ( + "testing" + + "github.com/attestantio/go-eth2-client/spec/phase0" + genesisspectypes "github.com/ssvlabs/ssv-spec-pre-cc/types" + "github.com/stretchr/testify/require" + + "github.com/ssvlabs/ssv/beacon/goclient" + "github.com/ssvlabs/ssv/operator/duties/dutystore" +) + +func TestVoluntaryExitHandler_HandleGenesisDuties(t *testing.T) { + exitCh := make(chan ExitDescriptor) + handler := NewVoluntaryExitHandler(dutystore.NewVoluntaryExit(), exitCh) + + currentSlot := &SafeValue[phase0.Slot]{} + currentSlot.Set(0) + + scheduler, logger, ticker, timeout, cancel, schedulerPool, startFn := setupSchedulerAndMocks(t, []dutyHandler{handler}, currentSlot, goclient.FarFutureEpoch) + startFn() + + blockByNumberCalls := create1to1BlockSlotMapping(scheduler) + assert1to1BlockSlotMapping(t, scheduler) + require.EqualValues(t, 1, blockByNumberCalls.Load()) + + executeDutiesCall := make(chan []*genesisspectypes.Duty) + setExecuteGenesisDutyFunc(scheduler, executeDutiesCall, 1) + + const blockNumber = uint64(1) + + normalExit := ExitDescriptor{ + OwnValidator: true, + PubKey: phase0.BLSPubKey{1, 2, 3}, + ValidatorIndex: phase0.ValidatorIndex(1), + BlockNumber: blockNumber, + } + sameBlockExit := ExitDescriptor{ + OwnValidator: true, + PubKey: phase0.BLSPubKey{4, 5, 6}, + ValidatorIndex: phase0.ValidatorIndex(2), + BlockNumber: normalExit.BlockNumber, + } + newBlockExit := ExitDescriptor{ + OwnValidator: true, + PubKey: phase0.BLSPubKey{1, 2, 3}, + ValidatorIndex: phase0.ValidatorIndex(1), + BlockNumber: normalExit.BlockNumber + 1, + } + pastBlockExit := ExitDescriptor{ + OwnValidator: true, + PubKey: phase0.BLSPubKey{1, 2, 3}, + ValidatorIndex: phase0.ValidatorIndex(1), + BlockNumber: normalExit.BlockNumber + 4, + } + + allDescriptors := []ExitDescriptor{ + normalExit, + sameBlockExit, + newBlockExit, + pastBlockExit, + } + + expectedDuties := expectedExecutedVoluntaryExitGenesisDuties(allDescriptors) + + require.EqualValues(t, 1, blockByNumberCalls.Load()) + exitCh <- normalExit + + t.Run("slot = 0, block = 1 - no execution", func(t *testing.T) { + currentSlot.Set(0) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, nil, executeDutiesCall, timeout) + require.EqualValues(t, 2, blockByNumberCalls.Load()) + }) + + t.Run("slot = 1, block = 1 - no execution", func(t *testing.T) { + currentSlot.Set(phase0.Slot(normalExit.BlockNumber)) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, nil, executeDutiesCall, timeout) + require.EqualValues(t, 2, blockByNumberCalls.Load()) + }) + + t.Run("slot = 4, block = 1 - no execution", func(t *testing.T) { + currentSlot.Set(phase0.Slot(normalExit.BlockNumber) + voluntaryExitSlotsToPostpone - 1) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, nil, executeDutiesCall, timeout) + require.EqualValues(t, 2, blockByNumberCalls.Load()) + }) + + t.Run("slot = 5, block = 1 - executing duty, fetching block number", func(t *testing.T) { + currentSlot.Set(phase0.Slot(normalExit.BlockNumber) + voluntaryExitSlotsToPostpone) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, nil, executeDutiesCall, timeout, expectedDuties[:1]) + require.EqualValues(t, 2, blockByNumberCalls.Load()) + }) + + exitCh <- sameBlockExit + + t.Run("slot = 5, block = 1 - executing another duty, no block number fetch", func(t *testing.T) { + currentSlot.Set(phase0.Slot(sameBlockExit.BlockNumber) + voluntaryExitSlotsToPostpone) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, nil, executeDutiesCall, timeout, expectedDuties[1:2]) + require.EqualValues(t, 2, blockByNumberCalls.Load()) + }) + + exitCh <- newBlockExit + + t.Run("slot = 5, block = 2 - no execution", func(t *testing.T) { + currentSlot.Set(phase0.Slot(normalExit.BlockNumber) + voluntaryExitSlotsToPostpone) + ticker.Send(currentSlot.Get()) + waitForNoActionGenesis(t, logger, nil, executeDutiesCall, timeout) + require.EqualValues(t, 3, blockByNumberCalls.Load()) + }) + + t.Run("slot = 6, block = 1 - executing new duty, fetching block number", func(t *testing.T) { + currentSlot.Set(phase0.Slot(newBlockExit.BlockNumber) + voluntaryExitSlotsToPostpone) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, nil, executeDutiesCall, timeout, expectedDuties[2:3]) + require.EqualValues(t, 3, blockByNumberCalls.Load()) + }) + + exitCh <- pastBlockExit + + t.Run("slot = 10, block = 5 - executing past duty, fetching block number", func(t *testing.T) { + currentSlot.Set(phase0.Slot(pastBlockExit.BlockNumber) + voluntaryExitSlotsToPostpone + 1) + ticker.Send(currentSlot.Get()) + waitForGenesisDutiesExecution(t, logger, nil, executeDutiesCall, timeout, expectedDuties[3:4]) + require.EqualValues(t, 4, blockByNumberCalls.Load()) + }) + + cancel() + close(exitCh) + require.NoError(t, schedulerPool.Wait()) +} + +func expectedExecutedVoluntaryExitGenesisDuties(descriptors []ExitDescriptor) []*genesisspectypes.Duty { + expectedDuties := make([]*genesisspectypes.Duty, 0) + for _, d := range descriptors { + expectedDuties = append(expectedDuties, &genesisspectypes.Duty{ + Type: genesisspectypes.BNRoleVoluntaryExit, + PubKey: d.PubKey, + Slot: phase0.Slot(d.BlockNumber) + voluntaryExitSlotsToPostpone, + ValidatorIndex: d.ValidatorIndex, + }) + } + return expectedDuties +}