Skip to content

Commit

Permalink
push gossiper implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com>
  • Loading branch information
joshua-kim committed Dec 11, 2023
1 parent 02e7588 commit 9b7e287
Show file tree
Hide file tree
Showing 8 changed files with 560 additions and 29 deletions.
90 changes: 90 additions & 0 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package gossip

import (
"context"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -23,6 +24,7 @@ import (
var (
_ Gossiper = (*ValidatorGossiper)(nil)
_ Gossiper = (*PullGossiper[testTx, *testTx])(nil)
_ Gossiper = (*PushGossiper[*testTx])(nil)
)

// Gossiper gossips Gossipables to other nodes
Expand Down Expand Up @@ -179,6 +181,94 @@ func (p *PullGossiper[T, U]) handleResponse(
p.receivedBytes.Add(float64(receivedBytes))
}

// NewPushGossiper returns an instance of PushGossiper
func NewPushGossiper[T Gossipable](client *p2p.Client) *PushGossiper[T] {
return &PushGossiper[T]{
sender: &gossipClient{
client: client,
},
}
}

// PushGossiper broadcasts gossip to peers randomly in the network
type PushGossiper[T Gossipable] struct {
sender gossipSender

lock sync.Mutex
queued []T
}

// Add queues gossipables to be gossiped
func (p *PushGossiper[T]) Add(gossipables ...T) {
p.lock.Lock()
defer p.lock.Unlock()

p.queued = append(p.queued, gossipables...)
}

// Gossip flushes any queued gossipables
func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
p.lock.Lock()
defer p.lock.Unlock()

if len(p.queued) == 0 {
return nil
}

msg := &sdk.PushGossip{
Gossip: make([][]byte, 0, len(p.queued)),
}

for _, tx := range p.queued {
bytes, err := tx.Marshal()
if err != nil {
return err
}

msg.Gossip = append(msg.Gossip, bytes)
}

p.queued = nil

msgBytes, err := proto.Marshal(msg)
if err != nil {
return err
}

return p.sender.sendGossip(ctx, msgBytes)
}

// Subscribe gossips a gossipable whenever one is made available
func Subscribe[T Gossipable](
ctx context.Context,
log logging.Logger,
gossiper *PushGossiper[T],
gossipables <-chan T,
) {
for {
select {
case gossipable, ok := <-gossipables:
if !ok {
log.Debug("shutting down push gossip",
zap.String("reason", "channel closed"),
)
return
}

gossiper.Add(gossipable)

if err := gossiper.Gossip(ctx); err != nil {
log.Warn("push gossip failed", zap.Error(err))
}
case <-ctx.Done():
log.Debug("shutting down push gossip",
zap.String("reason", "context cancelled"),
)
return
}
}
}

// Every calls [Gossip] every [frequency] amount of time.
func Every(ctx context.Context, log logging.Logger, gossiper Gossiper, frequency time.Duration) {
ticker := time.NewTicker(frequency)
Expand Down
Loading

0 comments on commit 9b7e287

Please sign in to comment.