Skip to content

Commit

Permalink
SDK Push Gossiper implementation (#2428)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
Signed-off-by: Stephen Buttolph <stephen@avalabs.org>
Co-authored-by: Stephen Buttolph <stephen@avalabs.org>
  • Loading branch information
joshua-kim and StephenButtolph authored Dec 14, 2023
1 parent abf4fbc commit 512f342
Show file tree
Hide file tree
Showing 10 changed files with 626 additions and 138 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213223840-6e78d609ed32
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c h1:bWPdqoi+J6ztfVhEl7iexFSaKyaFlMpIltIMVTpXDQY=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213002358-53424dd5480c/go.mod h1:v8pqR8wC9VuyPTEbI6/wmflXPIAmUr6SUwEKP+hi9iU=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213223840-6e78d609ed32 h1:CZ1N++oMSL6yKV/FcCx/7/2cmpAk3rQse797Xz/6Ro0=
github.com/ava-labs/coreth v0.12.9-rc.9.0.20231213223840-6e78d609ed32/go.mod h1:bHPGzEjcBOLIKGbik9ZzETOhHxnzdLyVX+Q/XvGmGeE=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
232 changes: 194 additions & 38 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package gossip

import (
"context"
"fmt"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -17,13 +19,25 @@ import (
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/buffer"
"github.com/ava-labs/avalanchego/utils/logging"
)

const (
typeLabel = "type"
pushType = "push"
pullType = "pull"
)

var (
_ Gossiper = (*ValidatorGossiper)(nil)
_ Gossiper = (*PullGossiper[testTx, *testTx])(nil)
_ Gossiper = (*NoOpGossiper)(nil)

_ Accumulator[*testTx] = (*PushGossiper[*testTx])(nil)
_ Accumulator[*testTx] = (*NoOpAccumulator[*testTx])(nil)

metricLabels = []string{typeLabel}
)

// Gossiper gossips Gossipables to other nodes
Expand All @@ -32,6 +46,13 @@ type Gossiper interface {
Gossip(ctx context.Context) error
}

// Accumulator allows a caller to accumulate gossipables to be gossiped
type Accumulator[T Gossipable] interface {
Gossiper
// Add queues gossipables to be gossiped
Add(gossipables ...T)
}

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
type GossipableAny[T any] interface {
Expand All @@ -47,6 +68,51 @@ type ValidatorGossiper struct {
Validators p2p.ValidatorSet
}

// Metrics that are tracked across a gossip protocol. A given protocol should
// only use a single instance of Metrics.
type Metrics struct {
sentCount *prometheus.CounterVec
sentBytes *prometheus.CounterVec
receivedCount *prometheus.CounterVec
receivedBytes *prometheus.CounterVec
}

// NewMetrics returns a common set of metrics
func NewMetrics(
metrics prometheus.Registerer,
namespace string,
) (Metrics, error) {
m := Metrics{
sentCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_sent_count",
Help: "amount of gossip sent (n)",
}, metricLabels),
sentBytes: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_sent_bytes",
Help: "amount of gossip sent (bytes)",
}, metricLabels),
receivedCount: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_received_count",
Help: "amount of gossip received (n)",
}, metricLabels),
receivedBytes: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}, metricLabels),
}
err := utils.Err(
metrics.Register(m.sentCount),
metrics.Register(m.sentBytes),
metrics.Register(m.receivedCount),
metrics.Register(m.receivedBytes),
)
return m, err
}

func (v ValidatorGossiper) Gossip(ctx context.Context) error {
if !v.Validators.Has(ctx, v.NodeID) {
return nil
Expand All @@ -55,49 +121,32 @@ func (v ValidatorGossiper) Gossip(ctx context.Context) error {
return v.Gossiper.Gossip(ctx)
}

type Config struct {
Namespace string
PollSize int
}

func NewPullGossiper[T any, U GossipableAny[T]](
config Config,
log logging.Logger,
set Set[U],
client *p2p.Client,
metrics prometheus.Registerer,
) (*PullGossiper[T, U], error) {
p := &PullGossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
receivedN: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_received_n",
Help: "amount of gossip received (n)",
}),
receivedBytes: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: config.Namespace,
Name: "gossip_received_bytes",
Help: "amount of gossip received (bytes)",
}),
metrics Metrics,
pollSize int,
) *PullGossiper[T, U] {
return &PullGossiper[T, U]{
log: log,
set: set,
client: client,
metrics: metrics,
pollSize: pollSize,
labels: prometheus.Labels{
typeLabel: pullType,
},
}

err := utils.Err(
metrics.Register(p.receivedN),
metrics.Register(p.receivedBytes),
)
return p, err
}

type PullGossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
receivedN prometheus.Counter
receivedBytes prometheus.Counter
log logging.Logger
set Set[U]
client *p2p.Client
metrics Metrics
pollSize int
labels prometheus.Labels
}

func (p *PullGossiper[_, _]) Gossip(ctx context.Context) error {
Expand All @@ -115,7 +164,7 @@ func (p *PullGossiper[_, _]) Gossip(ctx context.Context) error {
return err
}

for i := 0; i < p.config.PollSize; i++ {
for i := 0; i < p.pollSize; i++ {
if err := p.client.AppRequestAny(ctx, msgBytes, p.handleResponse); err != nil {
return err
}
Expand Down Expand Up @@ -176,8 +225,107 @@ func (p *PullGossiper[T, U]) handleResponse(
}
}

p.receivedN.Add(float64(len(response.Gossip)))
p.receivedBytes.Add(float64(receivedBytes))
receivedCountMetric, err := p.metrics.receivedCount.GetMetricWith(p.labels)
if err != nil {
p.log.Error("failed to get received count metric", zap.Error(err))
return
}

receivedBytesMetric, err := p.metrics.receivedBytes.GetMetricWith(p.labels)
if err != nil {
p.log.Error("failed to get received bytes metric", zap.Error(err))
return
}

receivedCountMetric.Add(float64(len(response.Gossip)))
receivedBytesMetric.Add(float64(receivedBytes))
}

// NewPushGossiper returns an instance of PushGossiper
func NewPushGossiper[T Gossipable](client *p2p.Client, metrics Metrics, targetGossipSize int) *PushGossiper[T] {
return &PushGossiper[T]{
client: client,
metrics: metrics,
targetGossipSize: targetGossipSize,
labels: prometheus.Labels{
typeLabel: pushType,
},
pending: buffer.NewUnboundedDeque[T](0),
}
}

// PushGossiper broadcasts gossip to peers randomly in the network
type PushGossiper[T Gossipable] struct {
client *p2p.Client
metrics Metrics
targetGossipSize int

labels prometheus.Labels

lock sync.Mutex
pending buffer.Deque[T]
}

// Gossip flushes any queued gossipables
func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()

if p.pending.Len() == 0 {
return nil
}

msg := &sdk.PushGossip{
Gossip: make([][]byte, 0, p.pending.Len()),
}

sentBytes := 0
for sentBytes < p.targetGossipSize {
gossipable, ok := p.pending.PeekLeft()
if !ok {
break
}

bytes, err := gossipable.Marshal()
if err != nil {
// remove this item so we don't get stuck in a loop
_, _ = p.pending.PopLeft()
return err
}

msg.Gossip = append(msg.Gossip, bytes)
sentBytes += len(bytes)
p.pending.PopLeft()
}

msgBytes, err := proto.Marshal(msg)
if err != nil {
return err
}

sentCountMetric, err := p.metrics.sentCount.GetMetricWith(p.labels)
if err != nil {
return fmt.Errorf("failed to get sent count metric: %w", err)
}

sentBytesMetric, err := p.metrics.sentBytes.GetMetricWith(p.labels)
if err != nil {
return fmt.Errorf("failed to get sent bytes metric: %w", err)
}

sentCountMetric.Add(float64(len(msg.Gossip)))
sentBytesMetric.Add(float64(sentBytes))

return p.client.AppGossip(ctx, msgBytes)
}

func (p *PushGossiper[T]) Add(gossipables ...T) {
p.lock.Lock()
defer p.lock.Unlock()

for _, gossipable := range gossipables {
p.pending.PushRight(gossipable)
}
}

// Every calls [Gossip] every [frequency] amount of time.
Expand All @@ -203,3 +351,11 @@ type NoOpGossiper struct{}
func (NoOpGossiper) Gossip(context.Context) error {
return nil
}

type NoOpAccumulator[T Gossipable] struct{}

func (NoOpAccumulator[_]) Gossip(context.Context) error {
return nil
}

func (NoOpAccumulator[T]) Add(...T) {}
Loading

0 comments on commit 512f342

Please sign in to comment.