-
Notifications
You must be signed in to change notification settings - Fork 671
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add gossip package to p2p SDK (#1958)
- Loading branch information
1 parent
ecf6b4f
commit 484a72f
Showing
9 changed files
with
915 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package gossip | ||
|
||
import ( | ||
"crypto/rand" | ||
"encoding/binary" | ||
"hash" | ||
|
||
bloomfilter "github.com/holiman/bloomfilter/v2" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
) | ||
|
||
var _ hash.Hash64 = (*hasher)(nil) | ||
|
||
// NewBloomFilter returns a new instance of a bloom filter with at most | ||
// [maxExpectedElements] elements anticipated at any moment, and a false | ||
// positive probability of [falsePositiveProbability]. | ||
func NewBloomFilter( | ||
maxExpectedElements uint64, | ||
falsePositiveProbability float64, | ||
) (*BloomFilter, error) { | ||
bloom, err := bloomfilter.NewOptimal( | ||
maxExpectedElements, | ||
falsePositiveProbability, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
salt, err := randomSalt() | ||
return &BloomFilter{ | ||
Bloom: bloom, | ||
Salt: salt, | ||
}, err | ||
} | ||
|
||
type BloomFilter struct { | ||
Bloom *bloomfilter.Filter | ||
// Salt is provided to eventually unblock collisions in Bloom. It's possible | ||
// that conflicting Gossipable items collide in the bloom filter, so a salt | ||
// is generated to eventually resolve collisions. | ||
Salt ids.ID | ||
} | ||
|
||
func (b *BloomFilter) Add(gossipable Gossipable) { | ||
h := gossipable.GetID() | ||
salted := &hasher{ | ||
hash: h[:], | ||
salt: b.Salt, | ||
} | ||
b.Bloom.Add(salted) | ||
} | ||
|
||
func (b *BloomFilter) Has(gossipable Gossipable) bool { | ||
h := gossipable.GetID() | ||
salted := &hasher{ | ||
hash: h[:], | ||
salt: b.Salt, | ||
} | ||
return b.Bloom.Contains(salted) | ||
} | ||
|
||
// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a target false | ||
// positive probability. Returns true if the bloom filter was reset. | ||
func ResetBloomFilterIfNeeded( | ||
bloomFilter *BloomFilter, | ||
falsePositiveProbability float64, | ||
) (bool, error) { | ||
if bloomFilter.Bloom.FalsePosititveProbability() < falsePositiveProbability { | ||
return false, nil | ||
} | ||
|
||
newBloom, err := bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K()) | ||
if err != nil { | ||
return false, err | ||
} | ||
salt, err := randomSalt() | ||
if err != nil { | ||
return false, err | ||
} | ||
|
||
bloomFilter.Bloom = newBloom | ||
bloomFilter.Salt = salt | ||
return true, nil | ||
} | ||
|
||
func randomSalt() (ids.ID, error) { | ||
salt := ids.ID{} | ||
_, err := rand.Read(salt[:]) | ||
return salt, err | ||
} | ||
|
||
type hasher struct { | ||
hash []byte | ||
salt ids.ID | ||
} | ||
|
||
func (h *hasher) Write(p []byte) (n int, err error) { | ||
h.hash = append(h.hash, p...) | ||
return len(p), nil | ||
} | ||
|
||
func (h *hasher) Sum(b []byte) []byte { | ||
h.hash = append(h.hash, b...) | ||
return h.hash | ||
} | ||
|
||
func (h *hasher) Reset() { | ||
h.hash = ids.Empty[:] | ||
} | ||
|
||
func (*hasher) BlockSize() int { | ||
return ids.IDLen | ||
} | ||
|
||
func (h *hasher) Sum64() uint64 { | ||
salted := ids.ID{} | ||
for i := 0; i < len(h.hash) && i < ids.IDLen; i++ { | ||
salted[i] = h.hash[i] ^ h.salt[i] | ||
} | ||
|
||
return binary.BigEndian.Uint64(salted[:]) | ||
} | ||
|
||
func (h *hasher) Size() int { | ||
return len(h.hash) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package gossip | ||
|
||
import ( | ||
"testing" | ||
|
||
bloomfilter "github.com/holiman/bloomfilter/v2" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
) | ||
|
||
func TestBloomFilterRefresh(t *testing.T) { | ||
tests := []struct { | ||
name string | ||
falsePositiveProbability float64 | ||
add []*testTx | ||
expected []*testTx | ||
}{ | ||
{ | ||
name: "no refresh", | ||
falsePositiveProbability: 1, | ||
add: []*testTx{ | ||
{id: ids.ID{0}}, | ||
}, | ||
expected: []*testTx{ | ||
{id: ids.ID{0}}, | ||
}, | ||
}, | ||
{ | ||
name: "refresh", | ||
falsePositiveProbability: 0.1, | ||
add: []*testTx{ | ||
{id: ids.ID{0}}, | ||
{id: ids.ID{1}}, | ||
}, | ||
expected: []*testTx{ | ||
{id: ids.ID{1}}, | ||
}, | ||
}, | ||
} | ||
|
||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
require := require.New(t) | ||
b, err := bloomfilter.New(10, 1) | ||
require.NoError(err) | ||
bloom := BloomFilter{ | ||
Bloom: b, | ||
} | ||
|
||
for _, item := range tt.add { | ||
_, err = ResetBloomFilterIfNeeded(&bloom, tt.falsePositiveProbability) | ||
require.NoError(err) | ||
bloom.Add(item) | ||
} | ||
|
||
require.Equal(uint64(len(tt.expected)), bloom.Bloom.N()) | ||
|
||
for _, expected := range tt.expected { | ||
require.True(bloom.Has(expected)) | ||
} | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. | ||
// See the file LICENSE for licensing terms. | ||
|
||
package gossip | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/ava-labs/avalanchego/ids" | ||
"github.com/ava-labs/avalanchego/network/p2p" | ||
"github.com/ava-labs/avalanchego/proto/pb/sdk" | ||
"github.com/ava-labs/avalanchego/utils/logging" | ||
) | ||
|
||
// GossipableAny exists to help create non-nil pointers to a concrete Gossipable | ||
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go | ||
type GossipableAny[T any] interface { | ||
*T | ||
Gossipable | ||
} | ||
|
||
type Config struct { | ||
Frequency time.Duration | ||
PollSize int | ||
} | ||
|
||
func NewGossiper[T any, U GossipableAny[T]]( | ||
config Config, | ||
log logging.Logger, | ||
set Set[U], | ||
client *p2p.Client, | ||
) *Gossiper[T, U] { | ||
return &Gossiper[T, U]{ | ||
config: config, | ||
log: log, | ||
set: set, | ||
client: client, | ||
} | ||
} | ||
|
||
type Gossiper[T any, U GossipableAny[T]] struct { | ||
config Config | ||
log logging.Logger | ||
set Set[U] | ||
client *p2p.Client | ||
} | ||
|
||
func (g *Gossiper[_, _]) Gossip(ctx context.Context) { | ||
gossipTicker := time.NewTicker(g.config.Frequency) | ||
defer gossipTicker.Stop() | ||
|
||
for { | ||
select { | ||
case <-gossipTicker.C: | ||
if err := g.gossip(ctx); err != nil { | ||
g.log.Warn("failed to gossip", zap.Error(err)) | ||
} | ||
case <-ctx.Done(): | ||
g.log.Debug("shutting down gossip") | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (g *Gossiper[_, _]) gossip(ctx context.Context) error { | ||
bloom, salt, err := g.set.GetFilter() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
request := &sdk.PullGossipRequest{ | ||
Filter: bloom, | ||
Salt: salt, | ||
} | ||
msgBytes, err := proto.Marshal(request) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
for i := 0; i < g.config.PollSize; i++ { | ||
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (g *Gossiper[T, U]) handleResponse( | ||
nodeID ids.NodeID, | ||
responseBytes []byte, | ||
err error, | ||
) { | ||
if err != nil { | ||
g.log.Debug( | ||
"failed gossip request", | ||
zap.Stringer("nodeID", nodeID), | ||
zap.Error(err), | ||
) | ||
return | ||
} | ||
|
||
response := &sdk.PullGossipResponse{} | ||
if err := proto.Unmarshal(responseBytes, response); err != nil { | ||
g.log.Debug("failed to unmarshal gossip response", zap.Error(err)) | ||
return | ||
} | ||
|
||
for _, bytes := range response.Gossip { | ||
gossipable := U(new(T)) | ||
if err := gossipable.Unmarshal(bytes); err != nil { | ||
g.log.Debug( | ||
"failed to unmarshal gossip", | ||
zap.Stringer("nodeID", nodeID), | ||
zap.Error(err), | ||
) | ||
continue | ||
} | ||
|
||
hash := gossipable.GetID() | ||
g.log.Debug( | ||
"received gossip", | ||
zap.Stringer("nodeID", nodeID), | ||
zap.Stringer("id", hash), | ||
) | ||
if err := g.set.Add(gossipable); err != nil { | ||
g.log.Debug( | ||
"failed to add gossip to the known set", | ||
zap.Stringer("nodeID", nodeID), | ||
zap.Stringer("id", hash), | ||
zap.Error(err), | ||
) | ||
continue | ||
} | ||
} | ||
} |
Oops, something went wrong.