Skip to content

Commit

Permalink
Increase gossip size on first push
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Feb 29, 2024
1 parent d1312cb commit 4347aff
Show file tree
Hide file tree
Showing 10 changed files with 326 additions and 188 deletions.
133 changes: 75 additions & 58 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,22 +263,19 @@ func NewPushGossiper[T Gossipable](
mempool Set[T],
client *p2p.Client,
metrics Metrics,
numValidators int,
numNonValidators int,
numPeers int,
gossipParams BranchingFactor,
regossipParams BranchingFactor,
discardedSize int,
targetGossipSize int,
maxRegossipFrequency time.Duration,
) (*PushGossiper[T], error) {
if err := gossipParams.Verify(); err != nil {
return nil, fmt.Errorf("invalid gossip params: %w", err)
}
if err := regossipParams.Verify(); err != nil {
return nil, fmt.Errorf("invalid regossip params: %w", err)
}
switch {
case numValidators < 0:
return nil, ErrInvalidNumValidators
case numNonValidators < 0:
return nil, ErrInvalidNumNonValidators
case numPeers < 0:
return nil, ErrInvalidNumPeers
case max(numValidators, numNonValidators, numPeers) == 0:
return nil, ErrInvalidNumToGossip
case discardedSize < 0:
return nil, ErrInvalidDiscardedSize
case targetGossipSize < 0:
Expand All @@ -292,9 +289,8 @@ func NewPushGossiper[T Gossipable](
set: mempool,
client: client,
metrics: metrics,
numValidators: numValidators,
numNonValidators: numNonValidators,
numPeers: numPeers,
gossipParams: gossipParams,
regossipParams: regossipParams,
targetGossipSize: targetGossipSize,
maxRegossipFrequency: maxRegossipFrequency,

Expand All @@ -312,9 +308,8 @@ type PushGossiper[T Gossipable] struct {
client *p2p.Client
metrics Metrics

numValidators int
numNonValidators int
numPeers int
gossipParams BranchingFactor
regossipParams BranchingFactor
targetGossipSize int
maxRegossipFrequency time.Duration

Expand All @@ -326,6 +321,27 @@ type PushGossiper[T Gossipable] struct {
discarded *cache.LRU[ids.ID, struct{}] // discarded attempts to avoid overgossiping transactions that are frequently dropped
}

type BranchingFactor struct {
Validators int
NonValidators int
Peers int
}

func (b *BranchingFactor) Verify() error {
switch {
case b.Validators < 0:
return ErrInvalidNumValidators
case b.NonValidators < 0:
return ErrInvalidNumNonValidators
case b.Peers < 0:
return ErrInvalidNumPeers
case max(b.Validators, b.NonValidators, b.Peers) == 0:
return ErrInvalidNumToGossip
default:
return nil
}
}

type tracking struct {
addedTime float64 // unix nanoseconds
lastGossiped time.Time
Expand All @@ -348,46 +364,48 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
return nil
}

var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
err := p.gossip(
ctx,
now,
p.gossipParams,
p.toGossip,
p.toRegossip,
&cache.Empty[ids.ID, struct{}]{}, // Don't mark dropped unsent transactions as discarded
)
if err != nil {
return fmt.Errorf("unexpected error during gossip: %w", err)
}

// Iterate over all unsent gossipables.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.toGossip.PopLeft()
if !ok {
break
}

// Ensure item is still in the set before we gossip.
gossipID := gossipable.GossipID()
tracking := p.tracking[gossipID]
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
continue
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
err = p.gossip(
ctx,
now,
p.regossipParams,
p.toRegossip,
p.toRegossip,
p.discarded, // Mark dropped sent transactions as discarded
)
if err != nil {
return fmt.Errorf("unexpected error during regossip: %w", err)
}
return nil
}

maxLastGossipTimeToRegossip := now.Add(-p.maxRegossipFrequency)
func (p *PushGossiper[T]) gossip(
ctx context.Context,
now time.Time,
gossipParams BranchingFactor,
toGossip buffer.Deque[T],
toRegossip buffer.Deque[T],
discarded cache.Cacher[ids.ID, struct{}],
) error {
var (
sentBytes = 0
gossip = make([][]byte, 0, defaultGossipableCount)
maxLastGossipTimeToRegossip = now.Add(-p.maxRegossipFrequency)
)

// Iterate over all previously sent gossipables to fill any remaining space
// in the gossip batch.
for sentBytes < p.targetGossipSize {
gossipable, ok := p.toRegossip.PopLeft()
gossipable, ok := toGossip.PopLeft()
if !ok {
break
}
Expand All @@ -398,29 +416,28 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
if !p.set.Has(gossipID) {
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
p.discarded.Put(gossipID, struct{}{}) // only add to discarded if previously sent
discarded.Put(gossipID, struct{}{}) // Cache that the item was dropped
continue
}

// Ensure we don't attempt to send a gossipable too frequently.
if maxLastGossipTimeToRegossip.Before(tracking.lastGossiped) {
// Put the gossipable on the front of the queue to keep items sorted
// by last issuance time.
p.toRegossip.PushLeft(gossipable)
toGossip.PushLeft(gossipable)
break
}

bytes, err := p.marshaller.MarshalGossip(gossipable)
if err != nil {
// Should never happen because we've already sent this once.
delete(p.tracking, gossipID)
p.addedTimeSum -= tracking.addedTime
return err
}

gossip = append(gossip, bytes)
sentBytes += len(bytes)
p.toRegossip.PushRight(gossipable)
toRegossip.PushRight(gossipable)
tracking.lastGossiped = now
}

Expand Down Expand Up @@ -448,9 +465,9 @@ func (p *PushGossiper[T]) Gossip(ctx context.Context) error {
return p.client.AppGossip(
ctx,
msgBytes,
p.numValidators,
p.numNonValidators,
p.numPeers,
gossipParams.Validators,
gossipParams.NonValidators,
gossipParams.Peers,
)
}

Expand Down
Loading

0 comments on commit 4347aff

Please sign in to comment.