Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network/p2p/gossip: refactor Set.Add to accept multiple elements #2986

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (p *PullGossiper[_]) Gossip(ctx context.Context) error {
return nil
}

func (p *PullGossiper[_]) handleResponse(
func (p *PullGossiper[T]) handleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
Expand All @@ -220,6 +220,7 @@ func (p *PullGossiper[_]) handleResponse(
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)

Expand All @@ -233,20 +234,24 @@ func (p *PullGossiper[_]) handleResponse(
continue
}

gossipID := gossipable.GossipID()
p.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipable.GossipID()),
)
if err := p.set.Add(gossipable); err != nil {

gossipables = append(gossipables, gossipable)
}

errs := p.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
p.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
continue
}
}

Expand Down Expand Up @@ -596,8 +601,12 @@ func (EmptySet[_]) Gossip(context.Context) error {
return nil
}

func (EmptySet[T]) Add(T) error {
return errEmptySetCantAdd
func (EmptySet[T]) Add(gossipables ...T) []error {
errs := make([]error, len(gossipables))
for i := range errs {
errs[i] = errEmptySetCantAdd
}
return errs
}

func (EmptySet[T]) Has(ids.ID) bool {
Expand All @@ -616,8 +625,8 @@ func (FullSet[_]) Gossip(context.Context) error {
return nil
}

func (FullSet[T]) Add(T) error {
return nil
func (FullSet[T]) Add(gossipables ...T) []error {
return make([]error, len(gossipables))
}

func (FullSet[T]) Has(ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/gossip/gossipable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type Marshaller[T Gossipable] interface {

// Set holds a set of known Gossipable items
type Set[T Gossipable] interface {
// Add adds a Gossipable to the set. Returns an error if gossipable was not
// Add adds Gossipables to the set. Returns an error if gossipable was not
// added.
Add(gossipable T) error
Add(gossipables ...T) []error
// Has returns true if the gossipable is in the set.
Has(gossipID ids.ID) bool
// Iterate iterates over elements until [f] returns false
Expand Down
12 changes: 9 additions & 3 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,15 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
return MarshalAppResponse(gossipBytes)
}

func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
func (h Handler[T]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
gossip, err := ParseAppGossip(gossipBytes)
if err != nil {
h.log.Debug("failed to unmarshal gossip", zap.Error(err))
return
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)
gossipable, err := h.marshaller.UnmarshalGossip(bytes)
Expand All @@ -113,11 +114,16 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes
continue
}

if err := h.set.Add(gossipable); err != nil {
gossipables = append(gossipables, gossipable)
}

errs := h.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
h.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipable.GossipID()),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
}
Expand Down
22 changes: 13 additions & 9 deletions network/p2p/gossip/test_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ type testSet struct {
onAdd func(tx *testTx)
}

func (t *testSet) Add(gossipable *testTx) error {
if _, ok := t.txs[gossipable.id]; ok {
return fmt.Errorf("%s already present", gossipable.id)
}
func (t *testSet) Add(gossipables ...*testTx) []error {
errs := make([]error, len(gossipables))
for i, gossipable := range gossipables {
if _, ok := t.txs[gossipable.id]; ok {
errs[i] = fmt.Errorf("%s already present", gossipable.id)
continue
}

t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
}
}

return nil
return errs
}

func (t *testSet) Has(gossipID ids.ID) bool {
Expand Down
44 changes: 25 additions & 19 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,27 +99,33 @@ type gossipMempool struct {
// us and when handling transactions that were pulled from a peer. If this
// returns a nil error while handling push gossip, the p2p SDK will queue the
// transaction to push gossip as well.
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
func (g *gossipMempool) Add(txs ...*txs.Tx) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
continue
}

// Verify the tx at the currently preferred state
if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

errs[i] = g.AddWithoutVerification(tx)
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

// Verify the tx at the currently preferred state
if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

return g.AddWithoutVerification(tx)
return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (n *Network) PullGossip(ctx context.Context) {
// returned.
// If the tx is not added to the mempool, an error will be returned.
func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
if err := n.mempool.Add(tx); err != nil {
return err
if errs := n.mempool.Add(tx); errs[0] != nil {
return errs[0]
}
n.txPushGossiper.Add(tx)
return nil
Expand Down
87 changes: 46 additions & 41 deletions vms/platformvm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,49 +90,54 @@ type gossipMempool struct {
bloom *gossip.BloomFilter
}

func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
func (g *gossipMempool) Add(txns ...*txs.Tx) []error {
errs := make([]error, len(txns))
for i, tx := range txns {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
continue
}

if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

if errs[i] = g.Mempool.Add(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

g.lock.Lock()
defer g.lock.Unlock()
Copy link
Contributor

@StephenButtolph StephenButtolph May 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is going to work as expected if multiple transactions are provided. The defer will be executed on function return - not when the loop iterates.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, overlooked that, thanks! Regarding X/P-Chain mempools, I'd rather put all the checks and tx verification inside the mempool not to acquire RLock for every lookup. Plus, GetDropReason acquires RW lock on mutex inside LRU cache, which would also be possible to rearrange if we do the verification inside Add. It's possible to pass txVerifier as a dependency to mempool I think.


g.bloom.Add(tx)
var reset bool
reset, errs[i] = gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if errs[i] != nil {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add tests for multiple txs - I think this can continue to the next iteration with the lock held.

Would it make sense to keep the single tx addition as a method so that we can keep using defer g.lock.Unlock()?

}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

if err := g.Mempool.Add(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

g.lock.Lock()
defer g.lock.Unlock()

g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if err != nil {
return err
}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}

g.Mempool.RequestBuildBlock(false)
return nil

return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
4 changes: 2 additions & 2 deletions vms/platformvm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
return errMempoolDisabledWithPartialSync
}

if err := n.mempool.Add(tx); err != nil {
return err
if errs := n.mempool.Add(tx); errs[0] != nil {
return errs[0]
}
n.txPushGossiper.Add(tx)
return nil
Expand Down