Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network/p2p/gossip: refactor Set.Add to accept multiple elements #2986

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,5 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)

replace github.com/ava-labs/coreth => github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557 h1:92JWd4u2pqpO551gXUIZ/qDZu3l7vn8jIxX2qRyyFwM=
github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557/go.mod h1:yMIxezDyB/5moKt8LlATlfwR/Z5cmipY3gUQ1SqHvQ0=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down Expand Up @@ -391,6 +389,8 @@ github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awS
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892 h1:nAWCp3wWuiynwhaKvANTpEXrmCTGw0jaY/luEdLtGrE=
github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892/go.mod h1:yMIxezDyB/5moKt8LlATlfwR/Z5cmipY3gUQ1SqHvQ0=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
Expand Down
29 changes: 19 additions & 10 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (p *PullGossiper[_]) Gossip(ctx context.Context) error {
return nil
}

func (p *PullGossiper[_]) handleResponse(
func (p *PullGossiper[T]) handleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
Expand All @@ -249,6 +249,7 @@ func (p *PullGossiper[_]) handleResponse(
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)

Expand All @@ -262,20 +263,24 @@ func (p *PullGossiper[_]) handleResponse(
continue
}

gossipID := gossipable.GossipID()
p.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipable.GossipID()),
)
if err := p.set.Add(gossipable); err != nil {

gossipables = append(gossipables, gossipable)
}

errs := p.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
p.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipID),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
continue
}
}

Expand Down Expand Up @@ -610,8 +615,12 @@ func (EmptySet[_]) Gossip(context.Context) error {
return nil
}

func (EmptySet[T]) Add(T) error {
return errEmptySetCantAdd
func (EmptySet[T]) Add(gossipables ...T) []error {
errs := make([]error, len(gossipables))
for i := range errs {
errs[i] = errEmptySetCantAdd
}
return errs
}

func (EmptySet[T]) Has(ids.ID) bool {
Expand All @@ -630,8 +639,8 @@ func (FullSet[_]) Gossip(context.Context) error {
return nil
}

func (FullSet[T]) Add(T) error {
return nil
func (FullSet[T]) Add(gossipables ...T) []error {
return make([]error, len(gossipables))
}

func (FullSet[T]) Has(ids.ID) bool {
Expand Down
10 changes: 6 additions & 4 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ func TestGossiperGossip(t *testing.T) {
txs: make(map[ids.ID]*testTx),
bloom: responseBloom,
}
for _, item := range tt.responder {
require.NoError(responseSet.Add(item))
errs := responseSet.Add(tt.responder...)
for _, err := range errs {
require.NoError(err)
}

metrics, err := NewMetrics(prometheus.NewRegistry(), "")
Expand Down Expand Up @@ -147,8 +148,9 @@ func TestGossiperGossip(t *testing.T) {
txs: make(map[ids.ID]*testTx),
bloom: bloom,
}
for _, item := range tt.requester {
require.NoError(requestSet.Add(item))
errs = requestSet.Add(tt.requester...)
for _, err := range errs {
require.NoError(err)
}

requestClient := requestNetwork.NewClient(0x0)
Expand Down
4 changes: 2 additions & 2 deletions network/p2p/gossip/gossipable.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ type Marshaller[T Gossipable] interface {

// Set holds a set of known Gossipable items
type Set[T Gossipable] interface {
// Add adds a Gossipable to the set. Returns an error if gossipable was not
// Add adds Gossipables to the set. Returns an error if gossipable was not
// added.
Add(gossipable T) error
Add(gossipables ...T) []error
// Has returns true if the gossipable is in the set.
Has(gossipID ids.ID) bool
// Iterate iterates over elements until [f] returns false
Expand Down
12 changes: 9 additions & 3 deletions network/p2p/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,15 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req
return MarshalAppResponse(gossipBytes)
}

func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
func (h Handler[T]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) {
gossip, err := ParseAppGossip(gossipBytes)
if err != nil {
h.log.Debug("failed to unmarshal gossip", zap.Error(err))
return
}

receivedBytes := 0
gossipables := make([]T, 0, len(gossip))
for _, bytes := range gossip {
receivedBytes += len(bytes)
gossipable, err := h.marshaller.UnmarshalGossip(bytes)
Expand All @@ -102,11 +103,16 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes
continue
}

if err := h.set.Add(gossipable); err != nil {
gossipables = append(gossipables, gossipable)
}

errs := h.set.Add(gossipables...)
for i, err := range errs {
if err != nil {
h.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Stringer("id", gossipable.GossipID()),
zap.Stringer("id", gossipables[i].GossipID()),
zap.Error(err),
)
}
Expand Down
22 changes: 13 additions & 9 deletions network/p2p/gossip/test_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,22 @@ type testSet struct {
onAdd func(tx *testTx)
}

func (t *testSet) Add(gossipable *testTx) error {
if _, ok := t.txs[gossipable.id]; ok {
return fmt.Errorf("%s already present", gossipable.id)
}
func (t *testSet) Add(gossipables ...*testTx) []error {
errs := make([]error, len(gossipables))
for i, gossipable := range gossipables {
if _, ok := t.txs[gossipable.id]; ok {
errs[i] = fmt.Errorf("%s already present", gossipable.id)
continue
}

t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
t.txs[gossipable.id] = gossipable
t.bloom.Add(gossipable)
if t.onAdd != nil {
t.onAdd(gossipable)
}
}

return nil
return errs
}

func (t *testSet) Has(gossipID ids.ID) bool {
Expand Down
44 changes: 25 additions & 19 deletions vms/avm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,27 +101,33 @@ type gossipMempool struct {
// us and when handling transactions that were pulled from a peer. If this
// returns a nil error while handling push gossip, the p2p SDK will queue the
// transaction to push gossip as well.
func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
func (g *gossipMempool) Add(txs ...*txs.Tx) []error {
errs := make([]error, len(txs))
for i, tx := range txs {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID)
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
continue
}

// Verify the tx at the currently preferred state
if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
continue
}

errs[i] = g.AddWithoutVerification(tx)
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

// Verify the tx at the currently preferred state
if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

return g.AddWithoutVerification(tx)
return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
2 changes: 1 addition & 1 deletion vms/avm/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestGossipMempoolAdd(t *testing.T) {
TxID: ids.GenerateTestID(),
}

require.NoError(mempool.Add(tx))
require.NoError(mempool.Add(tx)[0])
require.True(mempool.bloom.Has(tx))
}

Expand Down
4 changes: 2 additions & 2 deletions vms/avm/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ func (n *Network) PullGossip(ctx context.Context) {
// returned.
// If the tx is not added to the mempool, an error will be returned.
func (n *Network) IssueTxFromRPC(tx *txs.Tx) error {
if err := n.mempool.Add(tx); err != nil {
return err
if errs := n.mempool.Add(tx); errs[0] != nil {
return errs[0]
}
n.txPushGossiper.Add(tx)
return nil
Expand Down
94 changes: 54 additions & 40 deletions vms/platformvm/network/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,49 +92,63 @@ type gossipMempool struct {
bloom *gossip.BloomFilter
}

func (g *gossipMempool) Add(tx *txs.Tx) error {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
func (g *gossipMempool) Add(txns ...*txs.Tx) []error {
errs := make([]error, len(txns))
hasErr := false
for i, tx := range txns {
txID := tx.ID()
if _, ok := g.Mempool.Get(txID); ok {
errs[i] = fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx)
hasErr = true
continue
}

if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
hasErr = true
continue
}

if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
hasErr = true
continue
}

if errs[i] = g.Mempool.Add(tx); errs[i] != nil {
g.Mempool.MarkDropped(txID, errs[i])
hasErr = true
continue
}

g.lock.Lock()

g.bloom.Add(tx)
var reset bool
reset, errs[i] = gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if errs[i] != nil {
hasErr = true
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add tests for multiple txs - I think this can continue to the next iteration with the lock held.

Would it make sense to keep the single tx addition as a method so that we can keep using defer g.lock.Unlock()?

}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
}

g.lock.Unlock()
}

if reason := g.Mempool.GetDropReason(txID); reason != nil {
// If the tx is being dropped - just ignore it
//
// TODO: Should we allow re-verification of the transaction even if it
// failed previously?
return reason
}

if err := g.txVerifier.VerifyTx(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

if err := g.Mempool.Add(tx); err != nil {
g.Mempool.MarkDropped(txID, err)
return err
}

g.lock.Lock()
defer g.lock.Unlock()

g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier)
if err != nil {
return err
}

if reset {
g.log.Debug("resetting bloom filter")
g.Mempool.Iterate(func(tx *txs.Tx) bool {
g.bloom.Add(tx)
return true
})
if !hasErr {
g.Mempool.RequestBuildBlock(false)
}

g.Mempool.RequestBuildBlock(false)
return nil
return errs
}

func (g *gossipMempool) Has(txID ids.ID) bool {
Expand Down
Loading