Skip to content

Commit

Permalink
network/p2p/gossip: refactor Set.Add to accept multiple elements
Browse files Browse the repository at this point in the history
Allows potential optimized implementations for adding multiple elements to sets of Gossipables.
  • Loading branch information
lebdron committed May 4, 2024
1 parent 2f0216b commit 8ff1cc7
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 88 deletions.
29 changes: 19 additions & 10 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (p *PullGossiper[_]) Gossip(ctx context.Context) error {
return nil
}

func (p *PullGossiper[_]) handleResponse(
func (p *PullGossiper[T]) handleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
Expand All @@ -220,6 +220,7 @@ func (p *PullGossiper[_]) handleResponse(
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)

Expand All @@ -233,20 +234,24 @@ func (p *PullGossiper[_]) handleResponse(
continue
}

gossipID := gossipable.GossipID()
p.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipable.GossipID()),
)
if err := p.set.Add(gossipable); err != nil {

gossipables = append(gossipables, gossipable)
}

errs := p.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
p.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
continue
}
}

Expand Down Expand Up @@ -596,8 +601,12 @@ func (EmptySet[_]) Gossip(context.Context) error {
return nil
}

func (EmptySet[T]) Add(T) error {
return errEmptySetCantAdd
func (EmptySet[T]) Add(gossipables ...T) []error {
errs := make([]error, len(gossipables))
for i := range errs {
errs[i] = errEmptySetCantAdd
}
return errs
}

func (EmptySet[T]) Has(ids.ID) bool {
Expand All @@ -616,8 +625,8 @@ func (FullSet[_]) Gossip(context.Context) error {
return nil
}

func (FullSet[T]) Add(T) error {
return nil
func (FullSet[T]) Add(gossipables ...T) []error {
return make([]error, len(gossipables))
}

func (FullSet[T]) Has(ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/gossip/gossipable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type Marshaller[T Gossipable] interface {

// Set holds a set of known Gossipable items
type Set[T Gossipable] interface {
// Add adds a Gossipable to the set. Returns an error if gossipable was not
// Add adds Gossipables to the set. Returns an error if gossipable was not
// added.
Add(gossipable T) error
Add(gossipables ...T) []error
// Has returns true if the gossipable is in the set.
Has(gossipID ids.ID) bool
// Iterate iterates over elements until [f] returns false
Expand Down
12 changes: 9 additions & 3 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
return MarshalAppResponse(gossipBytes)
}

func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
func (h Handler[T]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
gossip, err := ParseAppGossip(gossipBytes)
if err != nil {
h.log.Debug("failed to unmarshal gossip", zap.Error(err))
return
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)
gossipable, err := h.marshaller.UnmarshalGossip(bytes)
Expand All @@ -113,11 +114,16 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes
continue
}

if err := h.set.Add(gossipable); err != nil {
gossipables = append(gossipables, gossipable)
}

errs := h.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
h.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipable.GossipID()),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
}
Expand Down
22 changes: 13 additions & 9 deletions network/p2p/gossip/test_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ type testSet struct {
onAdd func(tx *testTx)
}

func (t *testSet) Add(gossipable *testTx) error {
if _, ok := t.txs[gossipable.id]; ok {
return fmt.Errorf("%s already present", gossipable.id)
}
func (t *testSet) Add(gossipables ...*testTx) []error {
errs := make([]error, len(gossipables))
for i, gossipable := range gossipables {
if _, ok := t.txs[gossipable.id]; ok {
errs[i] = fmt.Errorf("%s already present", gossipable.id)
continue
}

t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
}
}

return nil
return errs
}

func (t *testSet) Has(gossipID ids.ID) bool {
Expand Down
44 changes: 25 additions & 19 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,33 @@ type gossipMempool struct {
// us and when handling transactions that were pulled from a peer. If this
// returns a nil error while handling push gossip, the p2p SDK will queue the
// transaction to push gossip as well.
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
func (g *gossipMempool) Add(txs ...*txs.Tx) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
continue
}

// Verify the tx at the currently preferred state
if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

errs[i] = g.AddWithoutVerification(tx)
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

// Verify the tx at the currently preferred state
if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

return g.AddWithoutVerification(tx)
return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (n *Network) PullGossip(ctx context.Context) {
// returned.
// If the tx is not added to the mempool, an error will be returned.
func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
if err := n.mempool.Add(tx); err != nil {
return err
if errs := n.mempool.Add(tx); errs[0] != nil {
return errs[0]
}
n.txPushGossiper.Add(tx)
return nil
Expand Down
87 changes: 46 additions & 41 deletions vms/platformvm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,49 +90,54 @@ type gossipMempool struct {
bloom *gossip.BloomFilter
}

func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
func (g *gossipMempool) Add(txns ...*txs.Tx) []error {
errs := make([]error, len(txns))
for i, tx := range txns {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
continue
}

if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

if errs[i] = g.Mempool.Add(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

g.lock.Lock()
defer g.lock.Unlock()

g.bloom.Add(tx)
var reset bool
reset, errs[i] = gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if errs[i] != nil {
continue
}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

if err := g.Mempool.Add(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

g.lock.Lock()
defer g.lock.Unlock()

g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if err != nil {
return err
}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}

g.Mempool.RequestBuildBlock(false)
return nil

return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions vms/platformvm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
return errMempoolDisabledWithPartialSync
}

if err := n.mempool.Add(tx); err != nil {
return err
if errs := n.mempool.Add(tx); errs[0] != nil {
return errs[0]
}
n.txPushGossiper.Add(tx)
return nil
Expand Down

0 comments on commit 8ff1cc7

Please sign in to comment.