Skip to content

Commit

Permalink
fix: (discovery) incorrect domain type check and stale ENR on fork (#…
Browse files Browse the repository at this point in the history
…1656)

* fix: (discovery) incorrect domain type check and stale ENR on fork

* fixes

* changes

* domain type

* fix tests

* fix tests

* revert domain type

* fixed tests err msg
  • Loading branch information
moshe-blox authored Aug 27, 2024
1 parent 219b99a commit 44b5fd3
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 42 deletions.
6 changes: 5 additions & 1 deletion network/discovery/dv5_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ func (dvs *DiscV5Service) Advertise(ctx context.Context, ns string, opt ...disco
return opts.Ttl, nil
}

if err := dvs.RegisterSubnets(logger, subnet); err != nil {
updated, err := dvs.RegisterSubnets(logger, subnet)
if err != nil {
return 0, err
}
if updated {
go dvs.PublishENR(logger)
}

return opts.Ttl, nil
}
Expand Down
61 changes: 39 additions & 22 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

var (
defaultDiscoveryInterval = time.Second
defaultDiscoveryInterval = time.Millisecond * 100
publishENRTimeout = time.Minute

publishStateReady = int32(0)
Expand Down Expand Up @@ -152,9 +152,14 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
if err != nil {
return errors.Wrap(err, "could not read domain type")
}
if nodeDomainType != dvs.domainType.DomainType() &&
nodeDomainType != dvs.domainType.NextDomainType() {
return fmt.Errorf("mismatched domain type: %x", nodeDomainType)
nodeNextDomainType, err := records.GetDomainTypeEntry(e.Node.Record(), records.KeyNextDomainType)
if err != nil && !errors.Is(err, records.ErrEntryNotFound) {
return errors.Wrap(err, "could not read domain type")
}
if dvs.domainType.DomainType() != nodeDomainType &&
dvs.domainType.DomainType() != nodeNextDomainType {
return fmt.Errorf("mismatched domain type: neither %x nor %x match %x",
nodeDomainType, nodeNextDomainType, dvs.domainType.DomainType())
}

// Get the peer's subnets, skipping if it has none.
Expand Down Expand Up @@ -258,50 +263,62 @@ func (dvs *DiscV5Service) discover(ctx context.Context, handler HandleNewPeer, i
}

// RegisterSubnets adds the given subnets and publish the updated node record
func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...int) error {
func (dvs *DiscV5Service) RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) {
if len(subnets) == 0 {
return nil
return false, nil
}
updated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil)
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), subnets, nil)
if err != nil {
return errors.Wrap(err, "could not update ENR")
return false, errors.Wrap(err, "could not update ENR")
}
if updated != nil {
dvs.subnets = updated
if updatedSubnets != nil {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
go dvs.publishENR(logger)
return true, nil
}
return nil
return false, nil
}

// DeregisterSubnets removes the given subnets and publish the updated node record
func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...int) error {
func (dvs *DiscV5Service) DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) {
logger = logger.Named(logging.NameDiscoveryService)

if len(subnets) == 0 {
return nil
return false, nil
}
updated, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets)
updatedSubnets, err := records.UpdateSubnets(dvs.dv5Listener.LocalNode(), commons.Subnets(), nil, subnets)
if err != nil {
return errors.Wrap(err, "could not update ENR")
return false, errors.Wrap(err, "could not update ENR")
}
if updated != nil {
dvs.subnets = updated
if updatedSubnets != nil {
dvs.subnets = updatedSubnets
logger.Debug("updated subnets", fields.UpdatedENRLocalNode(dvs.dv5Listener.LocalNode()))
go dvs.publishENR(logger)
return true, nil
}
return nil
return false, nil
}

// publishENR publishes the new ENR across the network
func (dvs *DiscV5Service) publishENR(logger *zap.Logger) {
// PublishENR publishes the ENR with the current domain type across the network
func (dvs *DiscV5Service) PublishENR(logger *zap.Logger) {
ctx, done := context.WithTimeout(dvs.ctx, publishENRTimeout)
defer done()
if !atomic.CompareAndSwapInt32(&dvs.publishState, publishStateReady, publishStatePending) {
// pending
logger.Debug("pending publish ENR")
return
}

err := records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyDomainType, dvs.domainType.DomainType())
if err != nil {
logger.Error("could not set domain type", zap.Error(err))
return
}
err = records.SetDomainTypeEntry(dvs.dv5Listener.LocalNode(), records.KeyNextDomainType, dvs.domainType.NextDomainType())
if err != nil {
logger.Error("could not set next domain type", zap.Error(err))
return
}

defer atomic.StoreInt32(&dvs.publishState, publishStateReady)
dvs.discover(ctx, func(e PeerEvent) {
metricPublishEnrPings.Inc()
Expand Down
49 changes: 41 additions & 8 deletions network/discovery/dv5_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (td *TestDomainTypeProvider) DomainType() spectypes.DomainType {
}

func (td *TestDomainTypeProvider) NextDomainType() spectypes.DomainType {
return spectypes.DomainType{0x1, 0x2, 0x3, 0x4}
return spectypes.DomainType{0x1, 0x2, 0x3, 0x5}
}

func (td *TestDomainTypeProvider) DomainTypeAtEpoch(epoch phase0.Epoch) spectypes.DomainType {
Expand All @@ -55,10 +55,38 @@ func TestCheckPeer(t *testing.T) {
expectedError: errors.New("could not read domain type: not found"),
},
{
name: "domain type mismatch",
name: "missing domain type but has next domain type",
domainType: nil,
nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5},
subnets: mySubnets,
expectedError: errors.New("could not read domain type: not found"),
},
{
name: "domain type mismatch",
domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5},
nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x6},
subnets: mySubnets,
expectedError: errors.New("mismatched domain type: neither 01020305 nor 01020306 match 01020304"),
},
{
name: "domain type mismatch (missing next domain type)",
domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x5},
subnets: mySubnets,
expectedError: errors.New("mismatched domain type: 01020305"),
expectedError: errors.New("mismatched domain type: neither 01020305 nor 00000000 match 01020304"),
},
{
name: "only next domain type matches",
domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x3},
nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4},
subnets: mySubnets,
expectedError: nil,
},
{
name: "both domain types match",
domainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4},
nextDomainType: &spectypes.DomainType{0x1, 0x2, 0x3, 0x4},
subnets: mySubnets,
expectedError: nil,
},
{
name: "missing subnets",
Expand Down Expand Up @@ -112,6 +140,10 @@ func TestCheckPeer(t *testing.T) {
err := records.SetDomainTypeEntry(localNode, records.KeyDomainType, *test.domainType)
require.NoError(t, err)
}
if test.nextDomainType != nil {
err := records.SetDomainTypeEntry(localNode, records.KeyNextDomainType, *test.nextDomainType)
require.NoError(t, err)
}
if test.subnets != nil {
err := records.SetSubnetsEntry(localNode, test.subnets)
require.NoError(t, err)
Expand Down Expand Up @@ -147,11 +179,12 @@ func TestCheckPeer(t *testing.T) {
}

type checkPeerTest struct {
name string
domainType *spectypes.DomainType
subnets []byte
localNode *enode.LocalNode
expectedError error
name string
domainType *spectypes.DomainType
nextDomainType *spectypes.DomainType
subnets []byte
localNode *enode.LocalNode
expectedError error
}

func mockSubnets(active ...int) []byte {
Expand Down
12 changes: 8 additions & 4 deletions network/discovery/local_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,19 @@ func (md *localDiscovery) FindPeers(ctx context.Context, ns string, opt ...disco
}

// RegisterSubnets implements Service
func (md *localDiscovery) RegisterSubnets(logger *zap.Logger, subnets ...int) error {
func (md *localDiscovery) RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) {
// TODO
return nil
return false, nil
}

// DeregisterSubnets implements Service
func (md *localDiscovery) DeregisterSubnets(logger *zap.Logger, subnets ...int) error {
func (md *localDiscovery) DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error) {
// TODO
return false, nil
}

func (md *localDiscovery) PublishENR(logger *zap.Logger) {
// TODO
return nil
}

// discoveryNotifee gets notified when we find a new peer via mDNS discovery
Expand Down
5 changes: 3 additions & 2 deletions network/discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ type Options struct {
type Service interface {
discovery.Discovery
io.Closer
RegisterSubnets(logger *zap.Logger, subnets ...int) error
DeregisterSubnets(logger *zap.Logger, subnets ...int) error
RegisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error)
DeregisterSubnets(logger *zap.Logger, subnets ...int) (updated bool, err error)
Bootstrap(logger *zap.Logger, handler HandleNewPeer) error
PublishENR(logger *zap.Logger)
}

// NewService creates new discovery.Service
Expand Down
14 changes: 11 additions & 3 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"context"
"errors"
"fmt"
ma "github.com/multiformats/go-multiaddr"
"strings"
"sync/atomic"
"time"

ma "github.com/multiformats/go-multiaddr"

"github.com/cornelk/hashmap"
"github.com/libp2p/go-libp2p/core/connmgr"
connmgrcore "github.com/libp2p/go-libp2p/core/connmgr"
Expand Down Expand Up @@ -380,16 +381,20 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
return self
})

// Register/unregister subnets for discovery.
var errs error
var hasAdded, hasRemoved bool
if len(addedSubnets) > 0 {
err := n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...)
var err error
hasAdded, err = n.disc.RegisterSubnets(logger.Named(logging.NameDiscoveryService), addedSubnets...)
if err != nil {
logger.Debug("could not register subnets", zap.Error(err))
errs = errors.Join(errs, err)
}
}
if len(removedSubnets) > 0 {
err := n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...)
var err error
hasRemoved, err = n.disc.DeregisterSubnets(logger.Named(logging.NameDiscoveryService), removedSubnets...)
if err != nil {
logger.Debug("could not unregister subnets", zap.Error(err))
errs = errors.Join(errs, err)
Expand All @@ -405,6 +410,9 @@ func (n *p2pNetwork) UpdateSubnets(logger *zap.Logger) {
}
}
}
if hasAdded || hasRemoved {
go n.disc.PublishENR(logger.Named(logging.NameDiscoveryService))
}

allSubs, _ := records.Subnets{}.FromString(records.AllSubnets)
subnetsList := records.SharedSubnets(allSubs, n.activeSubnets, 0)
Expand Down
4 changes: 3 additions & 1 deletion network/p2p/p2p_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func (n *p2pNetwork) setupDiscovery(logger *zap.Logger) error {
discV5Opts.Subnets = n.fixedSubnets
logger = logger.With(zap.String("subnets", records.Subnets(n.fixedSubnets).String()))
}
logger.Info("discovery: using discv5", zap.Strings("bootnodes", discV5Opts.Bootnodes))
logger.Info("discovery: using discv5",
zap.Strings("bootnodes", discV5Opts.Bootnodes),
zap.String("ip", discV5Opts.IP))
} else {
logger.Info("discovery: using mdns (local)")
}
Expand Down
6 changes: 5 additions & 1 deletion networkconfig/holesky-stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ var HoleskyStage = NetworkConfig{
RegistryContractAddr: "0x0d33801785340072C452b994496B19f196b7eE15",
AlanForkEpoch: 99999999,
Bootnodes: []string{
"enr:-Ja4QDYHVgUs9NvlMqq93ot6VNqbmrIlMrwKnq4X3DPRgyUNB4ospDp8ubMvsf-KsgqY8rzpZKy4GbE1DLphabpRBc-GAY_diLjngmlkgnY0gmlwhDQrLYqJc2VjcDI1NmsxoQKnAiuSlgSR8asjCH0aYoVKM8uPbi4noFuFHZHaAHqknYNzc3YBg3RjcIITiYN1ZHCCD6E",
// Public bootnode:
// "enr:-Ja4QDYHVgUs9NvlMqq93ot6VNqbmrIlMrwKnq4X3DPRgyUNB4ospDp8ubMvsf-KsgqY8rzpZKy4GbE1DLphabpRBc-GAY_diLjngmlkgnY0gmlwhDQrLYqJc2VjcDI1NmsxoQKnAiuSlgSR8asjCH0aYoVKM8uPbi4noFuFHZHaAHqknYNzc3YBg3RjcIITiYN1ZHCCD6E",

// Private bootnode:
"enr:-Ja4QEF4O52Pl9pxfF1y_gBzmtp9s2-ncWoN2-VPebEvjibaGJH7VpYGZKdCVws_gIAkjjatn67rXhGuItCfcp2kv4SGAZGOmChHgmlkgnY0gmlwhAoqckWJc2VjcDI1NmsxoQP_bBE-ZYvaXKBR3dRYMN5K_lZP-q-YsBzDZEtxH_4T_YNzc3YBg3RjcIITioN1ZHCCD6I",
},
}

0 comments on commit 44b5fd3

Please sign in to comment.