Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add active peer probing and a cached addr book #90

Merged
merged 80 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 62 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
3896470
feat: add cached peer book with higher ttls
2color Nov 27, 2024
7dd33ca
feat: initial implementation of active peer probing
2color Nov 27, 2024
0e86ea4
feat: use the cached router
2color Nov 27, 2024
ec2a67a
chore: go mod tidy
2color Nov 27, 2024
fe68140
feat: log probe duration
2color Nov 27, 2024
06c2d0c
chore: log in probe loop
2color Nov 27, 2024
fc76783
fix: update peer state if doesn't exist
2color Nov 27, 2024
e904c3e
fix: add addresses to cached address book
2color Nov 27, 2024
814ae58
fix: wrap with cached router only if available
2color Nov 27, 2024
a4d6456
feat: make everything a little bit better
2color Nov 27, 2024
81feca7
chore: small refinements
2color Nov 28, 2024
e75992f
test: add test for cached addr book
2color Nov 28, 2024
a20a4c3
chore: rename files
2color Nov 28, 2024
c5f1d62
feat: add options to cached addr book
2color Nov 28, 2024
e678be8
feat: add instrumentation
2color Nov 28, 2024
a0965bc
fix: thread safety
2color Nov 28, 2024
d82ad0f
docs: update changelog
2color Nov 28, 2024
a84d5f6
fix: small fixes
2color Nov 28, 2024
9ab02e1
fix: simplify cached router
2color Nov 28, 2024
9658af8
feat(metric): cached_router_peer_addr_lookups
lidel Nov 28, 2024
7cdb5be
Apply suggestions from code review
2color Nov 29, 2024
762136e
Update CHANGELOG.md
2color Nov 29, 2024
2cf46d4
chore: use service name for namespace
2color Nov 29, 2024
a0d5c62
fix: type errors and missing imports
2color Nov 29, 2024
75f1bf2
feat: add queue probe
2color Nov 29, 2024
4cbaa91
Revert "feat: add queue probe"
2color Nov 29, 2024
d038301
chore: simplify composite literal
2color Dec 2, 2024
796e94f
fix: implement custom cache fallback iterator
2color Dec 3, 2024
2e4d12c
fix: add cancel and simplify
2color Dec 3, 2024
811dce8
fix: move select to Val function
2color Dec 3, 2024
b4da9cd
fix: concurrency bug from the ongoingLookups
2color Dec 4, 2024
d00fcb4
chore: clean up comments
2color Dec 4, 2024
6219804
fix: add lint ignores
2color Dec 4, 2024
662f0d4
docs: update changelog
2color Dec 4, 2024
c812cf4
fix: increase bucket sizes for probe duration
2color Dec 4, 2024
8646f38
chore: remove unused peer state fields
2color Dec 4, 2024
46a74a3
feat: enable caching for FindPeer in cached router
2color Dec 4, 2024
d9601e4
fix: handle peer not found case
2color Dec 4, 2024
986b010
Apply suggestions from code review
2color Dec 5, 2024
ecd0757
fix: wait longer during cleanup function
2color Dec 5, 2024
a0443d0
test: remove bitswap record test
2color Dec 5, 2024
22aacd7
refactor: extract connectedness checks to a func
2color Dec 5, 2024
fe372ac
fix: set ttl for both signed and unsigned addrs
2color Dec 5, 2024
03a4078
fix: prevent race condition
2color Dec 5, 2024
84393fd
feat: use 2q-lru cache for peer state
2color Dec 5, 2024
d466dc7
chore: remove return count
2color Dec 5, 2024
8078cb5
test: improve reliability of tests
2color Dec 5, 2024
7decf6c
fix: record failed connections
2color Dec 5, 2024
b536e82
feat: add exponential backoff for probes/peer lookups
2color Dec 5, 2024
7182699
fix: return peers with no addrs that wont probe
2color Dec 5, 2024
b0b24e0
fix: brittle test
2color Dec 6, 2024
697457d
feat: add probed peers counter
2color Dec 6, 2024
7fcf45f
fix: adjust probe duration metric buckets
2color Dec 6, 2024
1718215
fix: prevent race conditions
2color Dec 6, 2024
dc57e9f
feat: increase cache size and add max backoff
2color Dec 6, 2024
c5abeec
fix: omit providers whose peer cannot be found
2color Dec 9, 2024
0cc76f9
chore: remove unused function
2color Dec 10, 2024
f0e0bd4
deps: upgrade go-libp2p
2color Dec 10, 2024
2211aae
fix: avoid using the cache in FindPeers
2color Dec 10, 2024
be5958a
fix: do not return cached results for FindPeers
2color Dec 11, 2024
af7c3a8
refactor: small optimisation
2color Dec 11, 2024
62c0d9f
chore: re-add comment
2color Dec 11, 2024
8b36b0c
Apply suggestions from code review
2color Dec 16, 2024
b58b50d
Apply suggestions from code review
2color Dec 16, 2024
41922af
fix: use separate context for dispatched jobs
2color Dec 16, 2024
06cef21
fix: ensure proper cleanup of cache fallback iter
2color Dec 16, 2024
7a2160a
Update main.go
2color Dec 16, 2024
84bc4f7
fix: formatting
2color Dec 16, 2024
0c28c6b
fix: let consumer handle cleanup
2color Dec 16, 2024
e0a601f
fix: remove from address book when removed from peer state
2color Dec 17, 2024
7f0ec50
fix: use normal lru cache instead of 2Q
2color Dec 17, 2024
2e025eb
fix: update the metric when removing from the peer cache
2color Dec 17, 2024
6b4b40d
fix: increase max backoff to 48 hours
2color Dec 17, 2024
fe7ad54
feat: add env var for recently connected ttl
2color Dec 17, 2024
49efe9b
feat: add env var to control active probing
2color Dec 17, 2024
8ca4d19
fix: bug from closing the iterator twice
2color Dec 17, 2024
317ccb7
docs: update comment
2color Dec 17, 2024
327f9cb
docs: improve changelog
2color Dec 17, 2024
48e1943
test: fix background test
2color Dec 17, 2024
c1ac41b
feat(metrics): track online vs offline probe ratio
lidel Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ The following emojis are used to highlight certain changes:

### Added

2color marked this conversation as resolved.
Show resolved Hide resolved
- Default caching of peer addresses for 48h to match [provider record expiration on Amino DHT](https://github.com/libp2p/go-libp2p-kad-dht/blob/v0.28.1/amino/defaults.go#L40-L43). Someguy will return cached addresses for peers without multiaddrs in `FindProviders` if there are no addresses for a provider. This can be enabled via `SOMEGUY_CACHED_ADDR_BOOK=true|false` (enabled by default)
- Added a new `cachedAddrBook` implementation that caches peer addresses by subscribing to Identify events and probes those peers in the background.
- Added a new `cachedRouter` that uses `cachedAddrBook` to retrieve cached addresses for peers without multiaddrs. If a Peer is encountered with no cached addresses, `FindPeer` is dispatched in the background.
2color marked this conversation as resolved.
Show resolved Hide resolved

### Changed

### Removed
Expand Down
311 changes: 311 additions & 0 deletions cached_addr_book.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,311 @@
package main

import (
"context"
"io"
"sync"
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/ipfs/boxo/routing/http/types"
"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

const (
Subsystem = "cached_addr_book"
// The TTL to keep recently connected peers for. Same as [amino.DefaultProvideValidity] in go-libp2p-kad-dht
RecentlyConnectedAddrTTL = amino.DefaultProvideValidity

// Connected peers don't expire until they disconnect
ConnectedAddrTTL = peerstore.ConnectedAddrTTL

// How long to wait since last connection before probing a peer again
PeerProbeThreshold = time.Hour
lidel marked this conversation as resolved.
Show resolved Hide resolved

// How often to run the probe peers loop
ProbeInterval = time.Minute * 15

// How many concurrent probes to run at once
MaxConcurrentProbes = 20

// How long to wait for a connect in a probe to complete.
// The worst case is a peer behind a relay, so we use the relay connect timeout.
ConnectTimeout = relay.ConnectTimeout

// How many peers to cache in the peer state cache
// 1_000_000 is 10x the default number of signed peer records cached by the memory address book.
PeerCacheSize = 1_000_000

// Maximum backoff duration for probing a peer
MaxBackoffDuration = time.Hour * 24
2color marked this conversation as resolved.
Show resolved Hide resolved
)

var (
probeDurationHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "probe_duration_seconds",
Namespace: name,
Subsystem: Subsystem,
Help: "Duration of peer probing operations in seconds",
// Buckets probe durations from 5s to 15 minutes
Buckets: []float64{5, 10, 30, 60, 120, 300, 600, 900},
})

probedPeersCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "probed_peers",
Subsystem: Subsystem,
Namespace: name,
Help: "Number of peers probed",
})

peerStateSize = promauto.NewGauge(prometheus.GaugeOpts{
Name: "peer_state_size",
Subsystem: Subsystem,
Namespace: name,
Help: "Number of peers object currently in the peer state",
})
)

type peerState struct {
lastConnTime time.Time // last time we successfully connected to this peer
lastFailedConnTime time.Time // last time we failed to find or connect to this peer
connectFailures uint // number of times we've failed to connect to this peer
}

type cachedAddrBook struct {
addrBook peerstore.AddrBook // memory address book
peerCache *lru.TwoQueueCache[peer.ID, peerState] // LRU cache with additional metadata about peer
isProbing atomic.Bool
allowPrivateIPs bool // for testing
}

type AddrBookOption func(*cachedAddrBook) error

func WithAllowPrivateIPs() AddrBookOption {
return func(cab *cachedAddrBook) error {
cab.allowPrivateIPs = true
return nil
}
}

func newCachedAddrBook(opts ...AddrBookOption) (*cachedAddrBook, error) {
peerCache, err := lru.New2Q[peer.ID, peerState](PeerCacheSize)
if err != nil {
return nil, err
}

Check warning on line 106 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L105-L106

Added lines #L105 - L106 were not covered by tests

cab := &cachedAddrBook{
peerCache: peerCache,
addrBook: pstoremem.NewAddrBook(),
}

for _, opt := range opts {
err := opt(cab)
if err != nil {
return nil, err
}

Check warning on line 117 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L116-L117

Added lines #L116 - L117 were not covered by tests
}
return cab, nil
}

func (cab *cachedAddrBook) background(ctx context.Context, host host.Host) {
sub, err := host.EventBus().Subscribe([]interface{}{
&event.EvtPeerIdentificationCompleted{},
&event.EvtPeerConnectednessChanged{},
})
if err != nil {
logger.Errorf("failed to subscribe to peer identification events: %v", err)
return
}
defer sub.Close()

probeTicker := time.NewTicker(ProbeInterval)
defer probeTicker.Stop()

for {
select {
case <-ctx.Done():
cabCloser, ok := cab.addrBook.(io.Closer)
if ok {
errClose := cabCloser.Close()
if errClose != nil {
logger.Warnf("failed to close addr book: %v", errClose)
}

Check warning on line 144 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L122-L144

Added lines #L122 - L144 were not covered by tests
}
return
case ev := <-sub.Out():
switch ev := ev.(type) {
case event.EvtPeerIdentificationCompleted:
pState, exists := cab.peerCache.Get(ev.Peer)
2color marked this conversation as resolved.
Show resolved Hide resolved
if !exists {
pState = peerState{}
}
pState.lastConnTime = time.Now()
pState.lastFailedConnTime = time.Time{} // reset failed connection time
pState.connectFailures = 0 // reset connect failures on successful connection
cab.peerCache.Add(ev.Peer, pState)
peerStateSize.Set(float64(cab.peerCache.Len())) // update metric

ttl := getTTL(host.Network().Connectedness(ev.Peer))
if ev.SignedPeerRecord != nil {
logger.Debug("Caching signed peer record")
cab, ok := peerstore.GetCertifiedAddrBook(cab.addrBook)
if ok {
_, err := cab.ConsumePeerRecord(ev.SignedPeerRecord, ttl)
if err != nil {
logger.Warnf("failed to consume signed peer record: %v", err)
}

Check warning on line 168 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L146-L168

Added lines #L146 - L168 were not covered by tests
}
} else {
logger.Debug("No signed peer record, caching listen addresses")
// We don't have a signed peer record, so we use the listen addresses
cab.addrBook.AddAddrs(ev.Peer, ev.ListenAddrs, ttl)
}
case event.EvtPeerConnectednessChanged:
// If the peer is not connected or limited, we update the TTL
if !hasValidConnectedness(ev.Connectedness) {
cab.addrBook.UpdateAddrs(ev.Peer, ConnectedAddrTTL, RecentlyConnectedAddrTTL)
}

Check warning on line 179 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L170-L179

Added lines #L170 - L179 were not covered by tests
}
case <-probeTicker.C:
if cab.isProbing.Load() {
logger.Debug("Skipping peer probe, still running")
continue

Check warning on line 184 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L181-L184

Added lines #L181 - L184 were not covered by tests
}
logger.Debug("Starting to probe peers")
cab.isProbing.Store(true)
go cab.probePeers(ctx, host)

Check warning on line 188 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L186-L188

Added lines #L186 - L188 were not covered by tests
}
}
}

// Loops over all peers with addresses and probes them if they haven't been probed recently
func (cab *cachedAddrBook) probePeers(ctx context.Context, host host.Host) {
defer cab.isProbing.Store(false)

start := time.Now()
defer func() {
duration := time.Since(start).Seconds()
probeDurationHistogram.Observe(duration)
logger.Debugf("Finished probing peers in %s", duration)
}()

var wg sync.WaitGroup
// semaphore channel to limit the number of concurrent probes
semaphore := make(chan struct{}, MaxConcurrentProbes)

for i, p := range cab.addrBook.PeersWithAddrs() {
if hasValidConnectedness(host.Network().Connectedness(p)) {
continue // don't probe connected peers

Check warning on line 210 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L210

Added line #L210 was not covered by tests
}

if !cab.ShouldProbePeer(p) {
continue

Check warning on line 214 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L214

Added line #L214 was not covered by tests
}

addrs := cab.addrBook.Addrs(p)

if !cab.allowPrivateIPs {
addrs = ma.FilterAddrs(addrs, manet.IsPublicAddr)
}

Check warning on line 221 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L220-L221

Added lines #L220 - L221 were not covered by tests

if len(addrs) == 0 {
continue // no addresses to probe

Check warning on line 224 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L224

Added line #L224 was not covered by tests
}

wg.Add(1)
semaphore <- struct{}{}
go func() {
defer func() {
<-semaphore // Release semaphore
wg.Done()
}()
probedPeersCounter.Inc()
ctx, cancel := context.WithTimeout(ctx, ConnectTimeout)
defer cancel()
logger.Debugf("Probe %d: PeerID: %s, Addrs: %v", i+1, p, addrs)
// if connect succeeds and identify runs, the background loop will take care of updating the peer state and cache
err := host.Connect(ctx, peer.AddrInfo{
ID: p,
Addrs: addrs,
})
if err != nil {
logger.Debugf("failed to connect to peer %s: %v", p, err)
cab.RecordFailedConnection(p)
}
}()
}
wg.Wait()
}

// Returns the cached addresses for a peer, incrementing the return count
func (cab *cachedAddrBook) GetCachedAddrs(p peer.ID) []types.Multiaddr {
cachedAddrs := cab.addrBook.Addrs(p)

if len(cachedAddrs) == 0 {
return nil
}

result := make([]types.Multiaddr, 0, len(cachedAddrs)) // convert to local Multiaddr type 🙃
for _, addr := range cachedAddrs {
result = append(result, types.Multiaddr{Multiaddr: addr})
}
return result
}

// Update the peer cache with information about a failed connection
// This should be called when a connection attempt to a peer fails
func (cab *cachedAddrBook) RecordFailedConnection(p peer.ID) {
pState, exists := cab.peerCache.Get(p)
if !exists {
pState = peerState{}
}
pState.lastFailedConnTime = time.Now()
pState.connectFailures++
cab.peerCache.Add(p, pState)
}
2color marked this conversation as resolved.
Show resolved Hide resolved

// Returns true if we should probe a peer (either by dialing known addresses or by dispatching a FindPeer)
// based on the last failed connection time and connection failures
func (cab *cachedAddrBook) ShouldProbePeer(p peer.ID) bool {
pState, exists := cab.peerCache.Get(p)
2color marked this conversation as resolved.
Show resolved Hide resolved
if !exists {
return true // default to probing if the peer is not in the cache
}
sukunrt marked this conversation as resolved.
Show resolved Hide resolved

var backoffDuration time.Duration
if pState.connectFailures > 0 {
// Calculate backoff only if we have failures
// this is effectively 2^(connectFailures - 1) * PeerProbeThreshold
// A single failure results in a 1 hour backoff and each additional failure doubles the backoff up to 24 hours
backoffDuration = PeerProbeThreshold * time.Duration(1<<(pState.connectFailures-1))
backoffDuration = min(backoffDuration, MaxBackoffDuration) // clamp to max backoff duration
} else {
backoffDuration = PeerProbeThreshold
}

// Only dispatch if we've waited long enough based on the backoff
return time.Since(pState.lastFailedConnTime) > backoffDuration
}

func hasValidConnectedness(connectedness network.Connectedness) bool {
return connectedness == network.Connected || connectedness == network.Limited
}

func getTTL(connectedness network.Connectedness) time.Duration {
if hasValidConnectedness(connectedness) {
return ConnectedAddrTTL
}
return RecentlyConnectedAddrTTL

Check warning on line 310 in cached_addr_book.go

View check run for this annotation

Codecov / codecov/patch

cached_addr_book.go#L306-L310

Added lines #L306 - L310 were not covered by tests
}
Loading
Loading