From f5c1f9e64d97e6ef73a97c3c56247b1181bc4f72 Mon Sep 17 00:00:00 2001 From: Andrei Lebedev Date: Fri, 3 May 2024 21:16:54 +1000 Subject: [PATCH] network/p2p/gossip: refactor Set.Add to accept multiple elements Allows potential optimized implementations for adding multiple elements to sets of Gossipables. --- network/p2p/gossip/gossip.go | 29 ++++++---- network/p2p/gossip/gossipable.go | 4 +- network/p2p/gossip/handler.go | 12 +++-- network/p2p/gossip/test_gossip.go | 22 ++++---- vms/avm/network/gossip.go | 44 +++++++++------- vms/avm/network/network.go | 4 +- vms/platformvm/network/gossip.go | 88 +++++++++++++++++-------------- vms/platformvm/network/network.go | 4 +- 8 files changed, 119 insertions(+), 88 deletions(-) diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index 2aeb06e1f4f4..2369de48501c 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -198,7 +198,7 @@ func (p *PullGossiper[_]) Gossip(ctx context.Context) error { return nil } -func (p *PullGossiper[_]) handleResponse( +func (p *PullGossiper[T]) handleResponse( _ context.Context, nodeID ids.NodeID, responseBytes []byte, @@ -220,6 +220,7 @@ func (p *PullGossiper[_]) handleResponse( } receivedBytes := 0 + gossipables := make([]T, 0, len(gossip)) for _, bytes := range gossip { receivedBytes += len(bytes) @@ -233,20 +234,24 @@ func (p *PullGossiper[_]) handleResponse( continue } - gossipID := gossipable.GossipID() p.log.Debug( "received gossip", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipID), + zap.Stringer("id", gossipable.GossipID()), ) - if err := p.set.Add(gossipable); err != nil { + + gossipables = append(gossipables, gossipable) + } + + errs := p.set.Add(gossipables...) + for i, err := range errs { + if err != nil { p.log.Debug( "failed to add gossip to the known set", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipID), + zap.Stringer("id", gossipables[i].GossipID()), zap.Error(err), ) - continue } } @@ -596,8 +601,12 @@ func (EmptySet[_]) Gossip(context.Context) error { return nil } -func (EmptySet[T]) Add(T) error { - return errEmptySetCantAdd +func (EmptySet[T]) Add(gossipables ...T) []error { + errs := make([]error, len(gossipables)) + for i := range errs { + errs[i] = errEmptySetCantAdd + } + return errs } func (EmptySet[T]) Has(ids.ID) bool { @@ -616,8 +625,8 @@ func (FullSet[_]) Gossip(context.Context) error { return nil } -func (FullSet[T]) Add(T) error { - return nil +func (FullSet[T]) Add(gossipables ...T) []error { + return make([]error, len(gossipables)) } func (FullSet[T]) Has(ids.ID) bool { diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go index 6af60d666bb6..05b6dfad553e 100644 --- a/network/p2p/gossip/gossipable.go +++ b/network/p2p/gossip/gossipable.go @@ -18,9 +18,9 @@ type Marshaller[T Gossipable] interface { // Set holds a set of known Gossipable items type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not + // Add adds Gossipables to the set. Returns an error if gossipable was not // added. - Add(gossipable T) error + Add(gossipables ...T) []error // Has returns true if the gossipable is in the set. Has(gossipID ids.ID) bool // Iterate iterates over elements until [f] returns false diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 5c125864cc62..f687a6dbc610 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -94,7 +94,7 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req return MarshalAppResponse(gossipBytes) } -func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) { +func (h Handler[T]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) { gossip, err := ParseAppGossip(gossipBytes) if err != nil { h.log.Debug("failed to unmarshal gossip", zap.Error(err)) @@ -102,6 +102,7 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes } receivedBytes := 0 + gossipables := make([]T, 0, len(gossip)) for _, bytes := range gossip { receivedBytes += len(bytes) gossipable, err := h.marshaller.UnmarshalGossip(bytes) @@ -113,11 +114,16 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes continue } - if err := h.set.Add(gossipable); err != nil { + gossipables = append(gossipables, gossipable) + } + + errs := h.set.Add(gossipables...) + for i, err := range errs { + if err != nil { h.log.Debug( "failed to add gossip to the known set", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipable.GossipID()), + zap.Stringer("id", gossipables[i].GossipID()), zap.Error(err), ) } diff --git a/network/p2p/gossip/test_gossip.go b/network/p2p/gossip/test_gossip.go index 7f8782b65916..cdfed5ac3bc3 100644 --- a/network/p2p/gossip/test_gossip.go +++ b/network/p2p/gossip/test_gossip.go @@ -42,18 +42,22 @@ type testSet struct { onAdd func(tx *testTx) } -func (t *testSet) Add(gossipable *testTx) error { - if _, ok := t.txs[gossipable.id]; ok { - return fmt.Errorf("%s already present", gossipable.id) - } +func (t *testSet) Add(gossipables ...*testTx) []error { + errs := make([]error, len(gossipables)) + for i, gossipable := range gossipables { + if _, ok := t.txs[gossipable.id]; ok { + errs[i] = fmt.Errorf("%s already present", gossipable.id) + continue + } - t.txs[gossipable.id] = gossipable - t.bloom.Add(gossipable) - if t.onAdd != nil { - t.onAdd(gossipable) + t.txs[gossipable.id] = gossipable + t.bloom.Add(gossipable) + if t.onAdd != nil { + t.onAdd(gossipable) + } } - return nil + return errs } func (t *testSet) Has(gossipID ids.ID) bool { diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index 2d3ab40bf7bb..5fb7ba9a0a70 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -99,27 +99,33 @@ type gossipMempool struct { // us and when handling transactions that were pulled from a peer. If this // returns a nil error while handling push gossip, the p2p SDK will queue the // transaction to push gossip as well. -func (g *gossipMempool) Add(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) +func (g *gossipMempool) Add(txs ...*txs.Tx) []error { + errs := make([]error, len(txs)) + for i, tx := range txs { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + errs[i] = fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) + continue + } + + if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + continue + } + + // Verify the tx at the currently preferred state + if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + continue + } + + errs[i] = g.AddWithoutVerification(tx) } - if reason := g.Mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - // - // TODO: Should we allow re-verification of the transaction even if it - // failed previously? - return reason - } - - // Verify the tx at the currently preferred state - if err := g.txVerifier.VerifyTx(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - return g.AddWithoutVerification(tx) + return errs } func (g *gossipMempool) Has(txID ids.ID) bool { diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index ed565b1bd578..0a624ee0cbe3 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -190,8 +190,8 @@ func (n *Network) PullGossip(ctx context.Context) { // returned. // If the tx is not added to the mempool, an error will be returned. func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { - if err := n.mempool.Add(tx); err != nil { - return err + if errs := n.mempool.Add(tx); errs[0] != nil { + return errs[0] } n.txPushGossiper.Add(tx) return nil diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index 4ef98de351ba..e1bd5c258f2c 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -90,49 +90,55 @@ type gossipMempool struct { bloom *gossip.BloomFilter } -func (g *gossipMempool) Add(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx) +func (g *gossipMempool) Add(txns ...*txs.Tx) []error { + errs := make([]error, len(txns)) + for i, tx := range txns { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + errs[i] = fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx) + continue + } + + if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + continue + } + + if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + continue + } + + if errs[i] = g.Mempool.Add(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + continue + } + + g.lock.Lock() + defer g.lock.Unlock() + + g.bloom.Add(tx) + var reset bool + reset, errs[i] = gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier) + if errs[i] != nil { + continue + } + + if reset { + g.log.Debug("resetting bloom filter") + g.Mempool.Iterate(func(tx *txs.Tx) bool { + g.bloom.Add(tx) + return true + }) + } + + g.Mempool.RequestBuildBlock(false) } - if reason := g.Mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - // - // TODO: Should we allow re-verification of the transaction even if it - // failed previously? - return reason - } - - if err := g.txVerifier.VerifyTx(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - if err := g.Mempool.Add(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - g.lock.Lock() - defer g.lock.Unlock() - - g.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier) - if err != nil { - return err - } - - if reset { - g.log.Debug("resetting bloom filter") - g.Mempool.Iterate(func(tx *txs.Tx) bool { - g.bloom.Add(tx) - return true - }) - } - - g.Mempool.RequestBuildBlock(false) - return nil + return errs } func (g *gossipMempool) Has(txID ids.ID) bool { diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index a43bb4b99aa1..172d88fe885a 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -213,8 +213,8 @@ func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { return errMempoolDisabledWithPartialSync } - if err := n.mempool.Add(tx); err != nil { - return err + if errs := n.mempool.Add(tx); errs[0] != nil { + return errs[0] } n.txPushGossiper.Add(tx) return nil