diff --git a/go.mod b/go.mod index 616d28622b3..d6bffe6999e 100644 --- a/go.mod +++ b/go.mod @@ -156,3 +156,5 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect rsc.io/tmplfunc v0.0.3 // indirect ) + +replace github.com/ava-labs/coreth => github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892 diff --git a/go.sum b/go.sum index 41df2f52657..90da0055dde 100644 --- a/go.sum +++ b/go.sum @@ -60,8 +60,6 @@ github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557 h1:92JWd4u2pqpO551gXUIZ/qDZu3l7vn8jIxX2qRyyFwM= -github.com/ava-labs/coreth v0.13.4-0.20240506124912-82b6c4e91557/go.mod h1:yMIxezDyB/5moKt8LlATlfwR/Z5cmipY3gUQ1SqHvQ0= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc= github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= @@ -391,6 +389,8 @@ github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awS github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= +github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892 h1:nAWCp3wWuiynwhaKvANTpEXrmCTGw0jaY/luEdLtGrE= +github.com/lebdron/coreth v0.12.9-rc.9.0.20240508233954-e7b07b849892/go.mod h1:yMIxezDyB/5moKt8LlATlfwR/Z5cmipY3gUQ1SqHvQ0= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index 918f19ca5ba..268ba9e0a1c 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -227,7 +227,7 @@ func (p *PullGossiper[_]) Gossip(ctx context.Context) error { return nil } -func (p *PullGossiper[_]) handleResponse( +func (p *PullGossiper[T]) handleResponse( _ context.Context, nodeID ids.NodeID, responseBytes []byte, @@ -249,6 +249,7 @@ func (p *PullGossiper[_]) handleResponse( } receivedBytes := 0 + gossipables := make([]T, 0, len(gossip)) for _, bytes := range gossip { receivedBytes += len(bytes) @@ -262,20 +263,24 @@ func (p *PullGossiper[_]) handleResponse( continue } - gossipID := gossipable.GossipID() p.log.Debug( "received gossip", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipID), + zap.Stringer("id", gossipable.GossipID()), ) - if err := p.set.Add(gossipable); err != nil { + + gossipables = append(gossipables, gossipable) + } + + errs := p.set.Add(gossipables...) + for i, err := range errs { + if err != nil { p.log.Debug( "failed to add gossip to the known set", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipID), + zap.Stringer("id", gossipables[i].GossipID()), zap.Error(err), ) - continue } } @@ -610,8 +615,12 @@ func (EmptySet[_]) Gossip(context.Context) error { return nil } -func (EmptySet[T]) Add(T) error { - return errEmptySetCantAdd +func (EmptySet[T]) Add(gossipables ...T) []error { + errs := make([]error, len(gossipables)) + for i := range errs { + errs[i] = errEmptySetCantAdd + } + return errs } func (EmptySet[T]) Has(ids.ID) bool { @@ -630,8 +639,8 @@ func (FullSet[_]) Gossip(context.Context) error { return nil } -func (FullSet[T]) Add(T) error { - return nil +func (FullSet[T]) Add(gossipables ...T) []error { + return make([]error, len(gossipables)) } func (FullSet[T]) Has(ids.ID) bool { diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index a1eed80bed3..60896dcdb3b 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -116,8 +116,9 @@ func TestGossiperGossip(t *testing.T) { txs: make(map[ids.ID]*testTx), bloom: responseBloom, } - for _, item := range tt.responder { - require.NoError(responseSet.Add(item)) + errs := responseSet.Add(tt.responder...) + for _, err := range errs { + require.NoError(err) } metrics, err := NewMetrics(prometheus.NewRegistry(), "") @@ -147,8 +148,9 @@ func TestGossiperGossip(t *testing.T) { txs: make(map[ids.ID]*testTx), bloom: bloom, } - for _, item := range tt.requester { - require.NoError(requestSet.Add(item)) + errs = requestSet.Add(tt.requester...) + for _, err := range errs { + require.NoError(err) } requestClient := requestNetwork.NewClient(0x0) diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go index 6af60d666bb..05b6dfad553 100644 --- a/network/p2p/gossip/gossipable.go +++ b/network/p2p/gossip/gossipable.go @@ -18,9 +18,9 @@ type Marshaller[T Gossipable] interface { // Set holds a set of known Gossipable items type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not + // Add adds Gossipables to the set. Returns an error if gossipable was not // added. - Add(gossipable T) error + Add(gossipables ...T) []error // Has returns true if the gossipable is in the set. Has(gossipID ids.ID) bool // Iterate iterates over elements until [f] returns false diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 7f5f7b380ed..45973da5825 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -83,7 +83,7 @@ func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, req return MarshalAppResponse(gossipBytes) } -func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) { +func (h Handler[T]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) { gossip, err := ParseAppGossip(gossipBytes) if err != nil { h.log.Debug("failed to unmarshal gossip", zap.Error(err)) @@ -91,6 +91,7 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes } receivedBytes := 0 + gossipables := make([]T, 0, len(gossip)) for _, bytes := range gossip { receivedBytes += len(bytes) gossipable, err := h.marshaller.UnmarshalGossip(bytes) @@ -102,11 +103,16 @@ func (h Handler[_]) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes continue } - if err := h.set.Add(gossipable); err != nil { + gossipables = append(gossipables, gossipable) + } + + errs := h.set.Add(gossipables...) + for i, err := range errs { + if err != nil { h.log.Debug( "failed to add gossip to the known set", zap.Stringer("nodeID", nodeID), - zap.Stringer("id", gossipable.GossipID()), + zap.Stringer("id", gossipables[i].GossipID()), zap.Error(err), ) } diff --git a/network/p2p/gossip/test_gossip.go b/network/p2p/gossip/test_gossip.go index 7f8782b6591..cdfed5ac3bc 100644 --- a/network/p2p/gossip/test_gossip.go +++ b/network/p2p/gossip/test_gossip.go @@ -42,18 +42,22 @@ type testSet struct { onAdd func(tx *testTx) } -func (t *testSet) Add(gossipable *testTx) error { - if _, ok := t.txs[gossipable.id]; ok { - return fmt.Errorf("%s already present", gossipable.id) - } +func (t *testSet) Add(gossipables ...*testTx) []error { + errs := make([]error, len(gossipables)) + for i, gossipable := range gossipables { + if _, ok := t.txs[gossipable.id]; ok { + errs[i] = fmt.Errorf("%s already present", gossipable.id) + continue + } - t.txs[gossipable.id] = gossipable - t.bloom.Add(gossipable) - if t.onAdd != nil { - t.onAdd(gossipable) + t.txs[gossipable.id] = gossipable + t.bloom.Add(gossipable) + if t.onAdd != nil { + t.onAdd(gossipable) + } } - return nil + return errs } func (t *testSet) Has(gossipID ids.ID) bool { diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index 6ea5f144848..80181bd928d 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -101,27 +101,33 @@ type gossipMempool struct { // us and when handling transactions that were pulled from a peer. If this // returns a nil error while handling push gossip, the p2p SDK will queue the // transaction to push gossip as well. -func (g *gossipMempool) Add(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) +func (g *gossipMempool) Add(txs ...*txs.Tx) []error { + errs := make([]error, len(txs)) + for i, tx := range txs { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + errs[i] = fmt.Errorf("attempted to issue %w: %s ", mempool.ErrDuplicateTx, txID) + continue + } + + if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + continue + } + + // Verify the tx at the currently preferred state + if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + continue + } + + errs[i] = g.AddWithoutVerification(tx) } - if reason := g.Mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - // - // TODO: Should we allow re-verification of the transaction even if it - // failed previously? - return reason - } - - // Verify the tx at the currently preferred state - if err := g.txVerifier.VerifyTx(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - return g.AddWithoutVerification(tx) + return errs } func (g *gossipMempool) Has(txID ids.ID) bool { diff --git a/vms/avm/network/gossip_test.go b/vms/avm/network/gossip_test.go index e84f259cbe3..5fac19e46df 100644 --- a/vms/avm/network/gossip_test.go +++ b/vms/avm/network/gossip_test.go @@ -87,7 +87,7 @@ func TestGossipMempoolAdd(t *testing.T) { TxID: ids.GenerateTestID(), } - require.NoError(mempool.Add(tx)) + require.NoError(mempool.Add(tx)[0]) require.True(mempool.bloom.Has(tx)) } diff --git a/vms/avm/network/network.go b/vms/avm/network/network.go index ed565b1bd57..0a624ee0cbe 100644 --- a/vms/avm/network/network.go +++ b/vms/avm/network/network.go @@ -190,8 +190,8 @@ func (n *Network) PullGossip(ctx context.Context) { // returned. // If the tx is not added to the mempool, an error will be returned. func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { - if err := n.mempool.Add(tx); err != nil { - return err + if errs := n.mempool.Add(tx); errs[0] != nil { + return errs[0] } n.txPushGossiper.Add(tx) return nil diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index c8cfefceed4..8912bb15a91 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -92,49 +92,63 @@ type gossipMempool struct { bloom *gossip.BloomFilter } -func (g *gossipMempool) Add(tx *txs.Tx) error { - txID := tx.ID() - if _, ok := g.Mempool.Get(txID); ok { - return fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx) +func (g *gossipMempool) Add(txns ...*txs.Tx) []error { + errs := make([]error, len(txns)) + hasErr := false + for i, tx := range txns { + txID := tx.ID() + if _, ok := g.Mempool.Get(txID); ok { + errs[i] = fmt.Errorf("tx %s dropped: %w", txID, mempool.ErrDuplicateTx) + hasErr = true + continue + } + + if errs[i] = g.Mempool.GetDropReason(txID); errs[i] != nil { + // If the tx is being dropped - just ignore it + // + // TODO: Should we allow re-verification of the transaction even if it + // failed previously? + hasErr = true + continue + } + + if errs[i] = g.txVerifier.VerifyTx(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + hasErr = true + continue + } + + if errs[i] = g.Mempool.Add(tx); errs[i] != nil { + g.Mempool.MarkDropped(txID, errs[i]) + hasErr = true + continue + } + + g.lock.Lock() + + g.bloom.Add(tx) + var reset bool + reset, errs[i] = gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier) + if errs[i] != nil { + hasErr = true + continue + } + + if reset { + g.log.Debug("resetting bloom filter") + g.Mempool.Iterate(func(tx *txs.Tx) bool { + g.bloom.Add(tx) + return true + }) + } + + g.lock.Unlock() } - - if reason := g.Mempool.GetDropReason(txID); reason != nil { - // If the tx is being dropped - just ignore it - // - // TODO: Should we allow re-verification of the transaction even if it - // failed previously? - return reason - } - - if err := g.txVerifier.VerifyTx(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - if err := g.Mempool.Add(tx); err != nil { - g.Mempool.MarkDropped(txID, err) - return err - } - - g.lock.Lock() - defer g.lock.Unlock() - - g.bloom.Add(tx) - reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, g.Mempool.Len()*bloomChurnMultiplier) - if err != nil { - return err - } - - if reset { - g.log.Debug("resetting bloom filter") - g.Mempool.Iterate(func(tx *txs.Tx) bool { - g.bloom.Add(tx) - return true - }) + if !hasErr { + g.Mempool.RequestBuildBlock(false) } - g.Mempool.RequestBuildBlock(false) - return nil + return errs } func (g *gossipMempool) Has(txID ids.ID) bool { diff --git a/vms/platformvm/network/gossip_test.go b/vms/platformvm/network/gossip_test.go index eea36ed5230..c49b283f0c6 100644 --- a/vms/platformvm/network/gossip_test.go +++ b/vms/platformvm/network/gossip_test.go @@ -49,7 +49,7 @@ func TestGossipMempoolAddVerificationError(t *testing.T) { ) require.NoError(err) - err = gossipMempool.Add(tx) + err = gossipMempool.Add(tx)[0] require.ErrorIs(err, errFoo) require.False(gossipMempool.bloom.Has(tx)) } @@ -83,7 +83,7 @@ func TestGossipMempoolAddError(t *testing.T) { ) require.NoError(err) - err = gossipMempool.Add(tx) + err = gossipMempool.Add(tx)[0] require.ErrorIs(err, errFoo) require.False(gossipMempool.bloom.Has(tx)) } @@ -114,7 +114,7 @@ func TestMempoolDuplicate(t *testing.T) { ) require.NoError(err) - err = gossipMempool.Add(tx) + err = gossipMempool.Add(tx)[0] require.ErrorIs(err, mempool.ErrDuplicateTx) require.False(gossipMempool.bloom.Has(tx)) } @@ -149,6 +149,6 @@ func TestGossipAddBloomFilter(t *testing.T) { ) require.NoError(err) - require.NoError(gossipMempool.Add(tx)) + require.NoError(gossipMempool.Add(tx)[0]) require.True(gossipMempool.bloom.Has(tx)) } diff --git a/vms/platformvm/network/network.go b/vms/platformvm/network/network.go index a43bb4b99aa..172d88fe885 100644 --- a/vms/platformvm/network/network.go +++ b/vms/platformvm/network/network.go @@ -213,8 +213,8 @@ func (n *Network) IssueTxFromRPC(tx *txs.Tx) error { return errMempoolDisabledWithPartialSync } - if err := n.mempool.Add(tx); err != nil { - return err + if errs := n.mempool.Add(tx); errs[0] != nil { + return errs[0] } n.txPushGossiper.Add(tx) return nil