Skip to content

Commit

Permalink
Add manager validator set callbacks (#2950)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Apr 18, 2024
1 parent eca19b7 commit 52dd10f
Show file tree
Hide file tree
Showing 15 changed files with 432 additions and 199 deletions.
8 changes: 4 additions & 4 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (m *manager) createAvalancheChain(
if err != nil {
return nil, fmt.Errorf("error creating peer tracker: %w", err)
}
vdrs.RegisterCallbackListener(ctx.SubnetID, connectedValidators)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators)

peerTracker, err := p2p.NewPeerTracker(
ctx.Log,
Expand Down Expand Up @@ -794,7 +794,7 @@ func (m *manager) createAvalancheChain(

connectedBeacons := tracker.NewPeers()
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
vdrs.RegisterCallbackListener(ctx.SubnetID, startupTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := snowgetter.New(
vmWrappingProposerVM,
Expand Down Expand Up @@ -1107,7 +1107,7 @@ func (m *manager) createSnowmanChain(
if err != nil {
return nil, fmt.Errorf("error creating peer tracker: %w", err)
}
vdrs.RegisterCallbackListener(ctx.SubnetID, connectedValidators)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, connectedValidators)

peerTracker, err := p2p.NewPeerTracker(
ctx.Log,
Expand Down Expand Up @@ -1139,7 +1139,7 @@ func (m *manager) createSnowmanChain(

connectedBeacons := tracker.NewPeers()
startupTracker := tracker.NewStartup(connectedBeacons, (3*bootstrapWeight+3)/4)
beacons.RegisterCallbackListener(ctx.SubnetID, startupTracker)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := snowgetter.New(
vm,
Expand Down
2 changes: 1 addition & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func NewNetwork(
if err != nil {
return nil, fmt.Errorf("initializing ip tracker failed with: %w", err)
}
config.Validators.RegisterCallbackListener(constants.PrimaryNetworkID, ipTracker)
config.Validators.RegisterSetCallbackListener(constants.PrimaryNetworkID, ipTracker)

// Track all default bootstrappers to ensure their current IPs are gossiped
// like validator IPs.
Expand Down
8 changes: 6 additions & 2 deletions node/overridden_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ func (o *overriddenManager) GetMap(ids.ID) map[ids.NodeID]*validators.GetValidat
return o.manager.GetMap(o.subnetID)
}

func (o *overriddenManager) RegisterCallbackListener(_ ids.ID, listener validators.SetCallbackListener) {
o.manager.RegisterCallbackListener(o.subnetID, listener)
func (o *overriddenManager) RegisterCallbackListener(listener validators.ManagerCallbackListener) {
o.manager.RegisterCallbackListener(listener)
}

func (o *overriddenManager) RegisterSetCallbackListener(_ ids.ID, listener validators.SetCallbackListener) {
o.manager.RegisterSetCallbackListener(o.subnetID, listener)
}

func (o *overriddenManager) String() string {
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/avalanche/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *vertex.Te
totalWeight, err := vdrs.TotalWeight(constants.PrimaryNetworkID)
require.NoError(err)
startupTracker := tracker.NewStartup(peerTracker, totalWeight/2+1)
vdrs.RegisterCallbackListener(constants.PrimaryNetworkID, startupTracker)
vdrs.RegisterSetCallbackListener(constants.PrimaryNetworkID, startupTracker)

avaGetHandler, err := getter.New(manager, sender, ctx.Log, time.Second, 2000, ctx.AvalancheRegisterer)
require.NoError(err)
Expand Down
6 changes: 3 additions & 3 deletions snow/engine/snowman/bootstrap/bootstrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *common.SenderTest, *block.Tes
totalWeight, err := vdrs.TotalWeight(ctx.SubnetID)
require.NoError(err)
startupTracker := tracker.NewStartup(tracker.NewPeers(), totalWeight/2+1)
vdrs.RegisterCallbackListener(ctx.SubnetID, startupTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

require.NoError(startupTracker.Connected(context.Background(), peer, version.CurrentApp))

Expand Down Expand Up @@ -126,7 +126,7 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) {
startupAlpha := alpha

startupTracker := tracker.NewStartup(tracker.NewPeers(), startupAlpha)
peers.RegisterCallbackListener(ctx.SubnetID, startupTracker)
peers.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)

snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer)
require.NoError(err)
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestBootstrapNoParseOnNew(t *testing.T) {
totalWeight, err := peers.TotalWeight(ctx.SubnetID)
require.NoError(err)
startupTracker := tracker.NewStartup(tracker.NewPeers(), totalWeight/2+1)
peers.RegisterCallbackListener(ctx.SubnetID, startupTracker)
peers.RegisterSetCallbackListener(ctx.SubnetID, startupTracker)
require.NoError(startupTracker.Connected(context.Background(), peer, version.CurrentApp))

snowGetHandler, err := getter.New(vm, sender, ctx.Log, time.Second, 2000, ctx.Registerer)
Expand Down
30 changes: 15 additions & 15 deletions snow/engine/snowman/syncer/state_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func TestStateSyncingStartsOnlyIfEnoughStakeIsConnected(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -159,7 +159,7 @@ func TestStateSyncLocalSummaryIsIncludedAmongFrontiersIfAvailable(t *testing.T)

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -197,7 +197,7 @@ func TestStateSyncNotFoundOngoingSummaryIsNotIncludedAmongFrontiers(t *testing.T

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -228,7 +228,7 @@ func TestBeaconsAreReachedForFrontiersUponStartup(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -267,7 +267,7 @@ func TestUnRequestedStateSummaryFrontiersAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -357,7 +357,7 @@ func TestMalformedStateSummaryFrontiersAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -426,7 +426,7 @@ func TestLateResponsesFromUnresponsiveFrontiersAreNotRecorded(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -509,7 +509,7 @@ func TestStateSyncIsRestartedIfTooManyFrontierSeedersTimeout(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -598,7 +598,7 @@ func TestVoteRequestsAreSentAsAllFrontierBeaconsResponded(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -669,7 +669,7 @@ func TestUnRequestedVotesAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -786,7 +786,7 @@ func TestVotesForUnknownSummariesAreDropped(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down Expand Up @@ -890,7 +890,7 @@ func TestStateSummaryIsPassedToVMAsMajorityOfVotesIsCastedForIt(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1035,7 +1035,7 @@ func TestVotingIsRestartedIfMajorityIsNotReachedDueToTimeouts(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1141,7 +1141,7 @@ func TestStateSyncIsStoppedIfEnoughVotesAreCastedWithNoClearMajority(t *testing.

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, fullVM, sender := buildTestsObjects(t, ctx, startup, beacons, alpha)

Expand Down Expand Up @@ -1286,7 +1286,7 @@ func TestStateSyncIsDoneOnceVMNotifies(t *testing.T) {

peers := tracker.NewPeers()
startup := tracker.NewStartup(peers, startupAlpha)
beacons.RegisterCallbackListener(ctx.SubnetID, startup)
beacons.RegisterSetCallbackListener(ctx.SubnetID, startup)

syncer, _, _ := buildTestsObjects(t, ctx, startup, beacons, (totalWeight+1)/2)

Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func New(config Config) (*Transitive, error) {
}

acceptedFrontiers := tracker.NewAccepted()
config.Validators.RegisterCallbackListener(config.Ctx.SubnetID, acceptedFrontiers)
config.Validators.RegisterSetCallbackListener(config.Ctx.SubnetID, acceptedFrontiers)

factory := poll.NewEarlyTermNoTraversalFactory(
config.Params.AlphaPreference,
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/snowman/transitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func setup(t *testing.T, engCfg Config) (ids.NodeID, validators.Manager, *common
require.NoError(vals.AddStaker(engCfg.Ctx.SubnetID, vdr, nil, ids.Empty, 1))
require.NoError(engCfg.ConnectedValidators.Connected(context.Background(), vdr, version.CurrentApp))

vals.RegisterCallbackListener(engCfg.Ctx.SubnetID, engCfg.ConnectedValidators)
vals.RegisterSetCallbackListener(engCfg.Ctx.SubnetID, engCfg.ConnectedValidators)

sender := &common.SenderTest{T: t}
engCfg.Sender = sender
Expand Down
2 changes: 1 addition & 1 deletion snow/networking/handler/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestHealthCheckSubnet(t *testing.T) {
require.NoError(err)

peerTracker := commontracker.NewPeers()
vdrs.RegisterCallbackListener(ctx.SubnetID, peerTracker)
vdrs.RegisterSetCallbackListener(ctx.SubnetID, peerTracker)

sb := subnets.New(
ctx.NodeID,
Expand Down
35 changes: 28 additions & 7 deletions snow/validators/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ var (
ErrMissingValidators = errors.New("missing validators")
)

type ManagerCallbackListener interface {
OnValidatorAdded(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64)
OnValidatorRemoved(subnetID ids.ID, nodeID ids.NodeID, weight uint64)
OnValidatorWeightChanged(subnetID ids.ID, nodeID ids.NodeID, oldWeight, newWeight uint64)
}

type SetCallbackListener interface {
OnValidatorAdded(nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64)
OnValidatorRemoved(nodeID ids.NodeID, weight uint64)
Expand Down Expand Up @@ -88,9 +94,13 @@ type Manager interface {
// Map of the validators in this subnet
GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput

// When a validator's weight changes, or a validator is added/removed,
// this listener is called.
RegisterCallbackListener(subnetID ids.ID, listener SetCallbackListener)
// When a validator is added, removed, or its weight changes, the listener
// will be notified of the event.
RegisterCallbackListener(listener ManagerCallbackListener)

// When a validator is added, removed, or its weight changes on [subnetID],
// the listener will be notified of the event.
RegisterSetCallbackListener(subnetID ids.ID, listener SetCallbackListener)
}

// NewManager returns a new, empty manager
Expand All @@ -105,7 +115,8 @@ type manager struct {

// Key: Subnet ID
// Value: The validators that validate the subnet
subnetToVdrs map[ids.ID]*vdrSet
subnetToVdrs map[ids.ID]*vdrSet
callbackListeners []ManagerCallbackListener
}

func (m *manager) AddStaker(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKey, txID ids.ID, weight uint64) error {
Expand All @@ -118,7 +129,7 @@ func (m *manager) AddStaker(subnetID ids.ID, nodeID ids.NodeID, pk *bls.PublicKe

set, exists := m.subnetToVdrs[subnetID]
if !exists {
set = newSet()
set = newSet(subnetID, m.callbackListeners)
m.subnetToVdrs[subnetID] = set
}

Expand Down Expand Up @@ -264,13 +275,23 @@ func (m *manager) GetMap(subnetID ids.ID) map[ids.NodeID]*GetValidatorOutput {
return set.Map()
}

func (m *manager) RegisterCallbackListener(subnetID ids.ID, listener SetCallbackListener) {
func (m *manager) RegisterCallbackListener(listener ManagerCallbackListener) {
m.lock.Lock()
defer m.lock.Unlock()

m.callbackListeners = append(m.callbackListeners, listener)
for _, set := range m.subnetToVdrs {
set.RegisterManagerCallbackListener(listener)
}
}

func (m *manager) RegisterSetCallbackListener(subnetID ids.ID, listener SetCallbackListener) {
m.lock.Lock()
defer m.lock.Unlock()

set, exists := m.subnetToVdrs[subnetID]
if !exists {
set = newSet()
set = newSet(subnetID, m.callbackListeners)
m.subnetToVdrs[subnetID] = set
}

Expand Down
Loading

0 comments on commit 52dd10f

Please sign in to comment.