Skip to content

Commit

Permalink
Add Validator Gossiper (#2015)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Sep 13, 2023
1 parent e5f676a commit d1ad965
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 46 deletions.
103 changes: 65 additions & 38 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,53 @@ import (
"github.com/ava-labs/avalanchego/utils/wrappers"
)

var (
_ Gossiper = (*ValidatorGossiper)(nil)
_ Gossiper = (*PullGossiper[testTx, *testTx])(nil)
)

// Gossiper gossips Gossipables to other nodes
type Gossiper interface {
// Gossip runs a cycle of gossip. Returns an error if we failed to gossip.
Gossip(ctx context.Context) error
}

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
type GossipableAny[T any] interface {
*T
Gossipable
}

// ValidatorGossiper only calls [Gossip] if the given node is a validator
type ValidatorGossiper struct {
Gossiper

NodeID ids.NodeID
Validators p2p.ValidatorSet
}

func (v ValidatorGossiper) Gossip(ctx context.Context) error {
if !v.Validators.Has(ctx, v.NodeID) {
return nil
}

return v.Gossiper.Gossip(ctx)
}

type Config struct {
Namespace string
Frequency time.Duration
PollSize int
}

func NewGossiper[T any, U GossipableAny[T]](
func NewPullGossiper[T any, U GossipableAny[T]](
config Config,
log logging.Logger,
set Set[U],
client *p2p.Client,
metrics prometheus.Registerer,
) (*Gossiper[T, U], error) {
g := &Gossiper[T, U]{
) (*PullGossiper[T, U], error) {
p := &PullGossiper[T, U]{
config: config,
log: log,
set: set,
Expand All @@ -59,14 +85,14 @@ func NewGossiper[T any, U GossipableAny[T]](

errs := wrappers.Errs{}
errs.Add(
metrics.Register(g.receivedN),
metrics.Register(g.receivedBytes),
metrics.Register(p.receivedN),
metrics.Register(p.receivedBytes),
)

return g, errs.Err
return p, errs.Err
}

type Gossiper[T any, U GossipableAny[T]] struct {
type PullGossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
Expand All @@ -75,25 +101,8 @@ type Gossiper[T any, U GossipableAny[T]] struct {
receivedBytes prometheus.Counter
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
gossipTicker := time.NewTicker(g.config.Frequency)
defer gossipTicker.Stop()

for {
select {
case <-gossipTicker.C:
if err := g.gossip(ctx); err != nil {
g.log.Warn("failed to gossip", zap.Error(err))
}
case <-ctx.Done():
g.log.Debug("shutting down gossip")
return
}
}
}

func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
bloom, salt, err := g.set.GetFilter()
func (p *PullGossiper[_, _]) Gossip(ctx context.Context) error {
bloom, salt, err := p.set.GetFilter()
if err != nil {
return err
}
Expand All @@ -107,23 +116,23 @@ func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
return err
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil {
for i := 0; i < p.config.PollSize; i++ {
if err := p.client.AppRequestAny(ctx, msgBytes, p.handleResponse); err != nil {
return err
}
}

return nil
}

func (g *Gossiper[T, U]) handleResponse(
func (p *PullGossiper[T, U]) handleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
if err != nil {
g.log.Debug(
p.log.Debug(
"failed gossip request",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
Expand All @@ -133,7 +142,7 @@ func (g *Gossiper[T, U]) handleResponse(

response := &sdk.PullGossipResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
g.log.Debug("failed to unmarshal gossip response", zap.Error(err))
p.log.Debug("failed to unmarshal gossip response", zap.Error(err))
return
}

Expand All @@ -143,7 +152,7 @@ func (g *Gossiper[T, U]) handleResponse(

gossipable := U(new(T))
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
p.log.Debug(
"failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
Expand All @@ -152,13 +161,13 @@ func (g *Gossiper[T, U]) handleResponse(
}

hash := gossipable.GetID()
g.log.Debug(
p.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
)
if err := g.set.Add(gossipable); err != nil {
g.log.Debug(
if err := p.set.Add(gossipable); err != nil {
p.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", hash),
Expand All @@ -168,6 +177,24 @@ func (g *Gossiper[T, U]) handleResponse(
}
}

g.receivedN.Add(float64(len(response.Gossip)))
g.receivedBytes.Add(float64(receivedBytes))
p.receivedN.Add(float64(len(response.Gossip)))
p.receivedBytes.Add(float64(receivedBytes))
}

// Every calls [Gossip] every [frequency] amount of time.
func Every(ctx context.Context, log logging.Logger, gossiper Gossiper, frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()

for {
select {
case <-ticker.C:
if err := gossiper.Gossip(ctx); err != nil {
log.Warn("failed to gossip", zap.Error(err))
}
case <-ctx.Done():
log.Debug("shutting down gossip")
return
}
}
}
85 changes: 77 additions & 8 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ import (
"github.com/ava-labs/avalanchego/utils/set"
)

var (
_ p2p.ValidatorSet = (*testValidatorSet)(nil)
_ Gossiper = (*testGossiper)(nil)
)

func TestGossiperShutdown(t *testing.T) {
require := require.New(t)

config := Config{Frequency: time.Second}
metrics := prometheus.NewRegistry()
gossiper, err := NewGossiper[testTx](
config,
gossiper, err := NewPullGossiper[testTx](
Config{},
logging.NoLog{},
nil,
nil,
Expand All @@ -41,7 +45,7 @@ func TestGossiperShutdown(t *testing.T) {
wg.Add(1)

go func() {
gossiper.Gossip(ctx)
Every(ctx, logging.NoLog{}, gossiper, time.Second)
wg.Done()
}()

Expand Down Expand Up @@ -166,10 +170,9 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(err)

config := Config{
Frequency: 500 * time.Millisecond,
PollSize: 1,
PollSize: 1,
}
gossiper, err := NewGossiper[testTx, *testTx](
gossiper, err := NewPullGossiper[testTx, *testTx](
config,
logging.NoLog{},
requestSet,
Expand All @@ -182,7 +185,7 @@ func TestGossiperGossip(t *testing.T) {
received.Add(tx)
}

require.NoError(gossiper.gossip(context.Background()))
require.NoError(gossiper.Gossip(context.Background()))
<-gossiped

require.Len(requestSet.set, tt.expectedLen)
Expand All @@ -196,3 +199,69 @@ func TestGossiperGossip(t *testing.T) {
})
}
}

func TestEvery(*testing.T) {
ctx, cancel := context.WithCancel(context.Background())
calls := 0
gossiper := &testGossiper{
gossipF: func(context.Context) error {
if calls >= 10 {
cancel()
return nil
}

calls++
return nil
},
}

go Every(ctx, logging.NoLog{}, gossiper, time.Millisecond)
<-ctx.Done()
}

func TestValidatorGossiper(t *testing.T) {
require := require.New(t)

nodeID := ids.GenerateTestNodeID()

validators := testValidatorSet{
validators: set.Of(nodeID),
}

calls := 0
gossiper := ValidatorGossiper{
Gossiper: &testGossiper{
gossipF: func(context.Context) error {
calls++
return nil
},
},
NodeID: nodeID,
Validators: validators,
}

// we are a validator, so we should request gossip
require.NoError(gossiper.Gossip(context.Background()))
require.Equal(1, calls)

// we are not a validator, so we should not request gossip
validators.validators = set.Set[ids.NodeID]{}
require.NoError(gossiper.Gossip(context.Background()))
require.Equal(2, calls)
}

type testGossiper struct {
gossipF func(ctx context.Context) error
}

func (t *testGossiper) Gossip(ctx context.Context) error {
return t.gossipF(ctx)
}

type testValidatorSet struct {
validators set.Set[ids.NodeID]
}

func (t testValidatorSet) Has(_ context.Context, nodeID ids.NodeID) bool {
return t.validators.Contains(nodeID)
}

0 comments on commit d1ad965

Please sign in to comment.