From 96ba84eedbf6c1eb85de3a1caa5af03b1531e4c5 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 11:21:05 -0400 Subject: [PATCH 01/30] update utils.ResolveHostIP --- internal/utils/net.go | 25 ++++++------------------- 1 file changed, 6 insertions(+), 19 deletions(-) diff --git a/internal/utils/net.go b/internal/utils/net.go index 0b4706609..8f929e1b6 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -36,7 +36,7 @@ func init() { } } -func ResolveHostIP(ctx context.Context, hostIP string) (subnets []string, private bool, _ error) { +func ResolveHostIP(ctx context.Context, hostIP string) (ips []string, private bool, _ error) { // resolve host address host, _, err := net.SplitHostPort(hostIP) if err != nil { @@ -52,30 +52,17 @@ func ResolveHostIP(ctx context.Context, hostIP string) (subnets []string, privat return nil, false, fmt.Errorf("%w: %+v", ErrHostTooManyAddresses, addrs) } - // parse out subnets + // get ips for _, address := range addrs { private = private || isPrivateIP(address.IP) - // figure out the IP range - ipRange := ipv6FilterRange - if address.IP.To4() != nil { - ipRange = ipv4FilterRange - } - - // parse the subnet - cidr := fmt.Sprintf("%s/%d", address.String(), ipRange) - _, ipnet, err := net.ParseCIDR(cidr) - if err != nil { - continue - } - // add it - subnets = append(subnets, ipnet.String()) + ips = append(ips, address.IP.String()) } - // sort the subnets - sort.Slice(subnets, func(i, j int) bool { - return subnets[i] < subnets[j] + // sort the ips + sort.Slice(ips, func(i, j int) bool { + return ips[i] < ips[j] }) return } From 8a9c4a62f1d4a5e95c373836173db8e783ae7eaa Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 11:31:31 -0400 Subject: [PATCH 02/30] rename Subnets -> ResolvedAddresses --- api/host.go | 36 +++++++++++++++--------------- autopilot/contractor/contractor.go | 10 ++++----- autopilot/contractor/hostset.go | 31 +++++++++++-------------- internal/test/host.go | 10 ++++----- stores/hostdb_test.go | 30 ++++++++++++------------- stores/sql/main.go | 10 ++++----- worker/worker.go | 6 ++--- 7 files changed, 64 insertions(+), 69 deletions(-) diff --git a/api/host.go b/api/host.go index 0dfe6da81..b1a59dca9 100644 --- a/api/host.go +++ b/api/host.go @@ -148,18 +148,18 @@ func (opts HostsForScanningOptions) Apply(values url.Values) { type ( Host struct { - KnownSince time.Time `json:"knownSince"` - LastAnnouncement time.Time `json:"lastAnnouncement"` - PublicKey types.PublicKey `json:"publicKey"` - NetAddress string `json:"netAddress"` - PriceTable HostPriceTable `json:"priceTable"` - Settings rhpv2.HostSettings `json:"settings"` - Interactions HostInteractions `json:"interactions"` - Scanned bool `json:"scanned"` - Blocked bool `json:"blocked"` - Checks map[string]HostCheck `json:"checks"` - StoredData uint64 `json:"storedData"` - Subnets []string `json:"subnets"` + KnownSince time.Time `json:"knownSince"` + LastAnnouncement time.Time `json:"lastAnnouncement"` + PublicKey types.PublicKey `json:"publicKey"` + NetAddress string `json:"netAddress"` + PriceTable HostPriceTable `json:"priceTable"` + Settings rhpv2.HostSettings `json:"settings"` + Interactions HostInteractions `json:"interactions"` + Scanned bool `json:"scanned"` + Blocked bool `json:"blocked"` + Checks map[string]HostCheck `json:"checks"` + StoredData uint64 `json:"storedData"` + ResolvedAddresses []string `json:"ResolvedAddresses"` } HostAddress struct { @@ -181,12 +181,12 @@ type ( } HostScan struct { - HostKey types.PublicKey `json:"hostKey"` - PriceTable rhpv3.HostPriceTable - Settings rhpv2.HostSettings - Subnets []string - Success bool - Timestamp time.Time + HostKey types.PublicKey `json:"hostKey"` + PriceTable rhpv3.HostPriceTable + Settings rhpv2.HostSettings + ResolvedAddresses []string + Success bool + Timestamp time.Time } HostPriceTable struct { diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index 837b9d20a..9440137dd 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -980,7 +980,7 @@ func performContractChecks(ctx *mCtx, alerter alerts.Alerter, bus Bus, w Worker, } // extend logger - logger = logger.With("subnets", host.Subnets). + logger = logger.With("addresses", host.ResolvedAddresses). With("blocked", host.Blocked) // check if host is blocked @@ -1205,7 +1205,7 @@ func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, // prepare a gouging checker logger := logger.With("hostKey", candidate.host.PublicKey). With("remainingBudget", remainingFunds). - With("subnets", candidate.host.Subnets) + With("addresses", candidate.host.ResolvedAddresses) // perform gouging checks on the fly to ensure the host is not gouging its prices if breakdown := gc.Check(nil, &candidate.host.PriceTable.HostPriceTable); breakdown.Gouging() { @@ -1213,7 +1213,7 @@ func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, continue } - // check if we already have a contract with a host on that subnet + // check if we already have a contract with a host on that address if ctx.ShouldFilterRedundantIPs() && ipFilter.HasRedundantIP(candidate.host) { logger.Info("host has redundant IP") continue @@ -1357,8 +1357,8 @@ func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, chur // STEP 2: perform contract maintenance ipFilter := &hostSet{ - logger: logger.Named("ipFilter"), - subnetToHostKey: make(map[string]string), + logger: logger.Named("ipFilter"), + addressToHostKey: make(map[string]string), } keptContracts, churnReasons, err := performContractChecks(ctx, alerter, bus, w, cc, cr, ipFilter, logger, &remaining) if err != nil { diff --git a/autopilot/contractor/hostset.go b/autopilot/contractor/hostset.go index bc3932767..015ac37e8 100644 --- a/autopilot/contractor/hostset.go +++ b/autopilot/contractor/hostset.go @@ -1,43 +1,38 @@ package contractor import ( - "errors" - "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) -var ( - errHostTooManySubnets = errors.New("host has more than two subnets") -) - type ( hostSet struct { - subnetToHostKey map[string]string + addressToHostKey map[string]string logger *zap.SugaredLogger } ) func (hs *hostSet) HasRedundantIP(host api.Host) bool { - // validate host subnets - if len(host.Subnets) == 0 { - hs.logger.Errorf("host %v has no subnet, treating its IP %v as redundant", host.PublicKey, host.NetAddress) + // validate host addresses + if len(host.ResolvedAddresses) == 0 { + hs.logger.Errorf("host %v has no address, treating its IP %v as redundant", host.PublicKey, host.NetAddress) return true - } else if len(host.Subnets) > 2 { - hs.logger.Errorf("host %v has more than 2 subnets, treating its IP %v as redundant", host.PublicKey, errHostTooManySubnets) + } else if len(host.ResolvedAddresses) > 2 { + hs.logger.Errorf("host %v has more than 2 addresses, treating its IP %v as redundant", host.PublicKey, utils.ErrHostTooManyAddresses) return true } - // check if we know about this subnet + // check if we know about this address var knownHost string - for _, subnet := range host.Subnets { - if knownHost = hs.subnetToHostKey[subnet]; knownHost != "" { + for _, address := range host.ResolvedAddresses { + if knownHost = hs.addressToHostKey[address]; knownHost != "" { break } } - // if we know about the subnet, the host is redundant if it's not the same + // if we know about the address, the host is redundant if it's not the same if knownHost != "" { return host.PublicKey.String() != knownHost } @@ -45,7 +40,7 @@ func (hs *hostSet) HasRedundantIP(host api.Host) bool { } func (hs *hostSet) Add(host api.Host) { - for _, subnet := range host.Subnets { - hs.subnetToHostKey[subnet] = host.PublicKey.String() + for _, subnet := range host.ResolvedAddresses { + hs.addressToHostKey[subnet] = host.PublicKey.String() } } diff --git a/internal/test/host.go b/internal/test/host.go index e95d1d3d1..18301be3a 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -35,11 +35,11 @@ func NewHost(hk types.PublicKey, pt rhpv3.HostPriceTable, settings rhpv2.HostSet SuccessfulInteractions: 2, FailedInteractions: 0, }, - PublicKey: hk, - PriceTable: api.HostPriceTable{HostPriceTable: pt, Expiry: time.Now().Add(time.Minute)}, - Settings: settings, - Scanned: true, - Subnets: []string{"38.135.51.0/24"}, + PublicKey: hk, + PriceTable: api.HostPriceTable{HostPriceTable: pt, Expiry: time.Now().Add(time.Minute)}, + Settings: settings, + Scanned: true, + ResolvedAddresses: []string{"38.135.51.0/24"}, } } diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index c3002e4cd..5f5390e07 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -436,9 +436,9 @@ func TestRecordScan(t *testing.T) { t.Fatal("mismatch") } - // The host shouldn't have any subnets. - if len(host.Subnets) != 0 { - t.Fatal("unexpected", host.Subnets, len(host.Subnets)) + // The host shouldn't have any addresses. + if len(host.ResolvedAddresses) != 0 { + t.Fatal("unexpected", host.ResolvedAddresses, len(host.ResolvedAddresses)) } // Fetch the host directly to get the creation time. @@ -451,12 +451,12 @@ func TestRecordScan(t *testing.T) { // Record a scan. firstScanTime := time.Now().UTC() - subnets := []string{"212.1.96.0/24", "38.135.51.0/24"} + resolvedAddresses := []string{"212.1.96.0/24", "38.135.51.0/24"} settings := rhpv2.HostSettings{NetAddress: "host.com"} pt := rhpv3.HostPriceTable{ HostBlockHeight: 123, } - if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, firstScanTime, settings, pt, true, subnets)}); err != nil { + if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, firstScanTime, settings, pt, true, resolvedAddresses)}); err != nil { t.Fatal(err) } host, err = ss.Host(ctx, hk) @@ -474,8 +474,8 @@ func TestRecordScan(t *testing.T) { t.Fatal(err) } - // The host should have the subnets. - if !reflect.DeepEqual(host.Subnets, subnets) { + // The host should have the addresses. + if !reflect.DeepEqual(host.ResolvedAddresses, resolvedAddresses) { t.Fatal("mismatch") } @@ -535,7 +535,7 @@ func TestRecordScan(t *testing.T) { } // The host should still have the subnets. - if !reflect.DeepEqual(host.Subnets, subnets) { + if !reflect.DeepEqual(host.ResolvedAddresses, resolvedAddresses) { t.Fatal("mismatch") } @@ -1110,14 +1110,14 @@ func TestSQLHostBlocklistBasic(t *testing.T) { } // newTestScan returns a host interaction with given parameters. -func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, success bool, subnets []string) api.HostScan { +func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, success bool, resolvedAddresses []string) api.HostScan { return api.HostScan{ - HostKey: hk, - PriceTable: pt, - Settings: settings, - Subnets: subnets, - Success: success, - Timestamp: scanTime, + HostKey: hk, + PriceTable: pt, + Settings: settings, + ResolvedAddresses: resolvedAddresses, + Success: success, + Timestamp: scanTime, } } diff --git a/stores/sql/main.go b/stores/sql/main.go index 8802966be..9e3c65cd5 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1755,7 +1755,7 @@ func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error scan.Success, now, now, // price_table_expiry scan.Success, // successful_interactions !scan.Success, // failed_interactions - len(scan.Subnets) > 0, strings.Join(scan.Subnets, ","), + len(scan.ResolvedAddresses) > 0, strings.Join(scan.ResolvedAddresses, ","), PublicKey(scan.HostKey), ) if err != nil { @@ -2121,20 +2121,20 @@ func SearchHosts(ctx context.Context, tx sql.Tx, autopilot, filterMode, usabilit var h api.Host var hostID int64 var pte dsql.NullTime - var subnets string + var resolvedAddresses string err := rows.Scan(&hostID, &h.KnownSince, &h.LastAnnouncement, (*PublicKey)(&h.PublicKey), &h.NetAddress, (*PriceTable)(&h.PriceTable.HostPriceTable), &pte, (*HostSettings)(&h.Settings), &h.Interactions.TotalScans, (*UnixTimeNS)(&h.Interactions.LastScan), &h.Interactions.LastScanSuccess, &h.Interactions.SecondToLastScanSuccess, &h.Interactions.Uptime, &h.Interactions.Downtime, &h.Interactions.SuccessfulInteractions, &h.Interactions.FailedInteractions, &h.Interactions.LostSectors, - &h.Scanned, &subnets, &h.Blocked, + &h.Scanned, &resolvedAddresses, &h.Blocked, ) if err != nil { return nil, fmt.Errorf("failed to scan host: %w", err) } - if subnets != "" { - h.Subnets = strings.Split(subnets, ",") + if resolvedAddresses != "" { + h.ResolvedAddresses = strings.Split(resolvedAddresses, ",") } h.PriceTable.Expiry = pte.Time h.StoredData = storedDataMap[hostID] diff --git a/worker/worker.go b/worker/worker.go index fb4f8c49d..7e3aa1e4f 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1491,9 +1491,9 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty // Otherwise scans that time out won't be recorded. scanErr := w.bus.RecordHostScans(ctx, []api.HostScan{ { - HostKey: hostKey, - PriceTable: pt, - Subnets: subnets, + HostKey: hostKey, + PriceTable: pt, + ResolvedAddresses: subnets, // NOTE: A scan is considered successful if both fetching the price // table and the settings succeeded. Right now scanning can't fail From ff98bdcd495515adbef190effb572feaf8ba6a10 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 11:53:32 -0400 Subject: [PATCH 03/30] add migration for subnets -> resolved_addresses --- internal/sql/migrations.go | 6 ++++++ .../main/migration_00014_hosts_resolvedaddresses.sql | 3 +++ .../main/migration_00014_hosts_resolvedaddresses.sql | 3 +++ 3 files changed, 12 insertions(+) create mode 100644 stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql create mode 100644 stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql diff --git a/internal/sql/migrations.go b/internal/sql/migrations.go index cfb735de2..c0b567e99 100644 --- a/internal/sql/migrations.go +++ b/internal/sql/migrations.go @@ -193,6 +193,12 @@ var ( return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00013_coreutils_wallet", log) }, }, + { + ID: "00014_hosts_resolvedaddresses", + Migrate: func(tx Tx) error { + return performMigration(ctx, tx, migrationsFs, dbIdentifier, "00014_hosts_resolvedaddresses", log) + }, + }, } } MetricsMigrations = func(ctx context.Context, migrationsFs embed.FS, log *zap.SugaredLogger) []Migration { diff --git a/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql b/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql new file mode 100644 index 000000000..f67e46e2c --- /dev/null +++ b/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql @@ -0,0 +1,3 @@ +ALTER TABLE hosts DROP COLUMN subnets; +ALTER TABLE hosts ADD resolved_addresses varchar(255); + diff --git a/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql b/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql new file mode 100644 index 000000000..f67e46e2c --- /dev/null +++ b/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql @@ -0,0 +1,3 @@ +ALTER TABLE hosts DROP COLUMN subnets; +ALTER TABLE hosts ADD resolved_addresses varchar(255); + From f3596be1532e5269d2df6c0497240360d1d1601d Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 12:21:42 -0400 Subject: [PATCH 04/30] add custom dialer --- worker/dialer.go | 98 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 98 insertions(+) create mode 100644 worker/dialer.go diff --git a/worker/dialer.go b/worker/dialer.go new file mode 100644 index 000000000..c8c1b2ac9 --- /dev/null +++ b/worker/dialer.go @@ -0,0 +1,98 @@ +package worker + +import ( + "context" + "fmt" + "net" + "sync" +) + +// Cache to store resolved IPs +type HostCache struct { + mu sync.RWMutex + cache map[string]string // hostname -> IP address +} + +func NewHostCache() *HostCache { + return &HostCache{ + cache: make(map[string]string), + } +} + +func (hc *HostCache) Get(hostname string) (string, bool) { + hc.mu.RLock() + defer hc.mu.RUnlock() + ip, ok := hc.cache[hostname] + return ip, ok +} + +func (hc *HostCache) Set(hostname, ip string) { + hc.mu.Lock() + defer hc.mu.Unlock() + hc.cache[hostname] = ip +} + +func (hc *HostCache) Clear(hostname string) { + hc.mu.Lock() + defer hc.mu.Unlock() + delete(hc.cache, hostname) +} + +// CustomDialer implements a custom net.Dialer with a fallback mechanism +type CustomDialer struct { + Cache *HostCache + + Bus Bus + Dialer net.Dialer +} + +func NewCustomDialer(bus Bus, dialer net.Dialer) *CustomDialer { + return &CustomDialer{ + Cache: NewHostCache(), + + Bus: bus, + Dialer: dialer, + } +} + +func (d *CustomDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { + host, port, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } + + // Try to resolve IP + ipAddr, err := net.ResolveIPAddr("ip", host) + if err == nil { + // Cache the resolved IP and dial + d.Cache.Set(host, ipAddr.String()) + return d.Dialer.DialContext(ctx, network, net.JoinHostPort(ipAddr.String(), port)) + } + + // If resolution fails, check the cache + if cachedIP, ok := d.Cache.Get(host); ok { + conn, err := d.Dialer.DialContext(ctx, network, net.JoinHostPort(cachedIP, port)) + if err == nil { + return conn, nil + } + // Clear the cache if the cached IP doesn't work + d.Cache.Clear(host) + } + + // Attempt to resolve using the bus + // hostInfo, err := d.Bus.Host(ctx, host) + // if err != nil { + // return nil, err + // } + + // for _, addr := range hostInfo.ResolvedAddresses { + // conn, err := d.Dialer.DialContext(ctx, network, net.JoinHostPort(addr, port)) + // if err == nil { + // // Update cache on successful dial + // d.Cache.Set(host, addr) + // return conn, nil + // } + // } + + return nil, fmt.Errorf("failed to dial %s with all methods", address) +} From b95120a30b1f1ec8d46a169c0cec3720dd924b30 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 12:32:06 -0400 Subject: [PATCH 05/30] add dialer to Worker --- worker/dialer.go | 26 +++++++++++++------------- worker/worker.go | 3 +++ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/worker/dialer.go b/worker/dialer.go index c8c1b2ac9..a502841bc 100644 --- a/worker/dialer.go +++ b/worker/dialer.go @@ -8,54 +8,54 @@ import ( ) // Cache to store resolved IPs -type HostCache struct { +type hostCache struct { mu sync.RWMutex cache map[string]string // hostname -> IP address } -func NewHostCache() *HostCache { - return &HostCache{ +func NewhostCache() *hostCache { + return &hostCache{ cache: make(map[string]string), } } -func (hc *HostCache) Get(hostname string) (string, bool) { +func (hc *hostCache) Get(hostname string) (string, bool) { hc.mu.RLock() defer hc.mu.RUnlock() ip, ok := hc.cache[hostname] return ip, ok } -func (hc *HostCache) Set(hostname, ip string) { +func (hc *hostCache) Set(hostname, ip string) { hc.mu.Lock() defer hc.mu.Unlock() hc.cache[hostname] = ip } -func (hc *HostCache) Clear(hostname string) { +func (hc *hostCache) Clear(hostname string) { hc.mu.Lock() defer hc.mu.Unlock() delete(hc.cache, hostname) } -// CustomDialer implements a custom net.Dialer with a fallback mechanism -type CustomDialer struct { - Cache *HostCache +// fallbackDialer implements a custom net.Dialer with a fallback mechanism +type fallbackDialer struct { + Cache *hostCache Bus Bus Dialer net.Dialer } -func NewCustomDialer(bus Bus, dialer net.Dialer) *CustomDialer { - return &CustomDialer{ - Cache: NewHostCache(), +func newFallbackDialer(bus Bus, dialer net.Dialer) *fallbackDialer { + return &fallbackDialer{ + Cache: NewhostCache(), Bus: bus, Dialer: dialer, } } -func (d *CustomDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { +func (d *fallbackDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { host, port, err := net.SplitHostPort(address) if err != nil { return nil, err diff --git a/worker/worker.go b/worker/worker.go index 7e3aa1e4f..76438a749 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -8,6 +8,7 @@ import ( "io" "math" "math/big" + "net" "net/http" "os" "runtime" @@ -214,6 +215,7 @@ type worker struct { uploadManager *uploadManager accounts *accounts + dialer *fallbackDialer cache iworker.WorkerCache priceTables *priceTables transportPoolV3 *transportPoolV3 @@ -1305,6 +1307,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, cache: iworker.NewCache(b, l), + dialer: newFallbackDialer(b, net.Dialer{}), eventSubscriber: iworker.NewEventSubscriber(a, b, l, 10*time.Second), id: id, bus: b, From fd49ecde0679a36b02be40e2c207fa8f53e98d4c Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 12:54:14 -0400 Subject: [PATCH 06/30] use new dialer in worker --- worker/net.go | 11 ----------- worker/rhpv2.go | 2 +- worker/rhpv3.go | 20 ++++++++++++-------- 3 files changed, 13 insertions(+), 20 deletions(-) delete mode 100644 worker/net.go diff --git a/worker/net.go b/worker/net.go deleted file mode 100644 index 8401062ab..000000000 --- a/worker/net.go +++ /dev/null @@ -1,11 +0,0 @@ -package worker - -import ( - "context" - "net" -) - -func dial(ctx context.Context, hostIP string) (net.Conn, error) { - conn, err := (&net.Dialer{}).DialContext(ctx, "tcp", hostIP) - return conn, err -} diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 048e42962..1df72fafa 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -635,7 +635,7 @@ func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevis } func (w *worker) withTransportV2(ctx context.Context, hostKey types.PublicKey, hostIP string, fn func(*rhpv2.Transport) error) (err error) { - conn, err := dial(ctx, hostIP) + conn, err := w.dialer.DialContext(ctx, "tcp", hostIP) if err != nil { return err } diff --git a/worker/rhpv3.go b/worker/rhpv3.go index f95f596d3..0500e3a0b 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -135,6 +135,7 @@ type transportV3 struct { mu sync.Mutex hostKey types.PublicKey siamuxAddr string + dialer *fallbackDialer t *rhpv3.Transport } @@ -174,7 +175,7 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) { t.mu.Lock() if t.t == nil { start := time.Now() - newTransport, err := dialTransport(ctx, t.siamuxAddr, t.hostKey) + newTransport, err := dialTransport(ctx, t.dialer, t.siamuxAddr, t.hostKey) if err != nil { t.mu.Unlock() return nil, fmt.Errorf("DialStream: %w: %w (%v)", errDialTransport, err, time.Since(start)) @@ -211,19 +212,21 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) { // transportPoolV3 is a pool of rhpv3.Transports which allows for reusing them. type transportPoolV3 struct { - mu sync.Mutex - pool map[string]*transportV3 + mu sync.Mutex + dialer *fallbackDialer + pool map[string]*transportV3 } -func newTransportPoolV3() *transportPoolV3 { +func newTransportPoolV3(dialer *fallbackDialer) *transportPoolV3 { return &transportPoolV3{ - pool: make(map[string]*transportV3), + dialer: dialer, + pool: make(map[string]*transportV3), } } -func dialTransport(ctx context.Context, siamuxAddr string, hostKey types.PublicKey) (*rhpv3.Transport, error) { +func dialTransport(ctx context.Context, dialer *fallbackDialer, siamuxAddr string, hostKey types.PublicKey) (*rhpv3.Transport, error) { // Dial host. - conn, err := dial(ctx, siamuxAddr) + conn, err := dialer.DialContext(ctx, "tcp", siamuxAddr) if err != nil { return nil, err } @@ -253,6 +256,7 @@ func (p *transportPoolV3) withTransportV3(ctx context.Context, hostKey types.Pub t = &transportV3{ hostKey: hostKey, siamuxAddr: siamuxAddr, + dialer: p.dialer, } p.pool[siamuxAddr] = t } @@ -410,7 +414,7 @@ func (w *worker) initTransportPool() { if w.transportPoolV3 != nil { panic("transport pool already initialized") // developer error } - w.transportPoolV3 = newTransportPoolV3() + w.transportPoolV3 = newTransportPoolV3(w.dialer) } // ForHost returns an account to use for a given host. If the account From ebd48952bc6542dd360bd8b7e9df644f5b573fe2 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 13:04:17 -0400 Subject: [PATCH 07/30] use bus in fallback dialer --- worker/dialer.go | 34 ++++++++++++++++++---------------- worker/rhpv2.go | 2 +- worker/rhpv3.go | 2 +- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/worker/dialer.go b/worker/dialer.go index a502841bc..21f7cabe2 100644 --- a/worker/dialer.go +++ b/worker/dialer.go @@ -5,6 +5,8 @@ import ( "fmt" "net" "sync" + + "go.sia.tech/core/types" ) // Cache to store resolved IPs @@ -55,7 +57,7 @@ func newFallbackDialer(bus Bus, dialer net.Dialer) *fallbackDialer { } } -func (d *fallbackDialer) DialContext(ctx context.Context, network, address string) (net.Conn, error) { +func (d *fallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) { host, port, err := net.SplitHostPort(address) if err != nil { return nil, err @@ -66,12 +68,12 @@ func (d *fallbackDialer) DialContext(ctx context.Context, network, address strin if err == nil { // Cache the resolved IP and dial d.Cache.Set(host, ipAddr.String()) - return d.Dialer.DialContext(ctx, network, net.JoinHostPort(ipAddr.String(), port)) + return d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) } // If resolution fails, check the cache if cachedIP, ok := d.Cache.Get(host); ok { - conn, err := d.Dialer.DialContext(ctx, network, net.JoinHostPort(cachedIP, port)) + conn, err := d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(cachedIP, port)) if err == nil { return conn, nil } @@ -80,19 +82,19 @@ func (d *fallbackDialer) DialContext(ctx context.Context, network, address strin } // Attempt to resolve using the bus - // hostInfo, err := d.Bus.Host(ctx, host) - // if err != nil { - // return nil, err - // } - - // for _, addr := range hostInfo.ResolvedAddresses { - // conn, err := d.Dialer.DialContext(ctx, network, net.JoinHostPort(addr, port)) - // if err == nil { - // // Update cache on successful dial - // d.Cache.Set(host, addr) - // return conn, nil - // } - // } + hostInfo, err := d.Bus.Host(ctx, hk) + if err != nil { + return nil, err + } + + for _, addr := range hostInfo.ResolvedAddresses { + conn, err := d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(addr, port)) + if err == nil { + // Update cache on successful dial + d.Cache.Set(host, addr) + return conn, nil + } + } return nil, fmt.Errorf("failed to dial %s with all methods", address) } diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 1df72fafa..5387a096c 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -635,7 +635,7 @@ func (w *worker) fetchContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevis } func (w *worker) withTransportV2(ctx context.Context, hostKey types.PublicKey, hostIP string, fn func(*rhpv2.Transport) error) (err error) { - conn, err := w.dialer.DialContext(ctx, "tcp", hostIP) + conn, err := w.dialer.Dial(ctx, hostKey, hostIP) if err != nil { return err } diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 0500e3a0b..d80d47134 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -226,7 +226,7 @@ func newTransportPoolV3(dialer *fallbackDialer) *transportPoolV3 { func dialTransport(ctx context.Context, dialer *fallbackDialer, siamuxAddr string, hostKey types.PublicKey) (*rhpv3.Transport, error) { // Dial host. - conn, err := dialer.DialContext(ctx, "tcp", siamuxAddr) + conn, err := dialer.Dial(ctx, hostKey, siamuxAddr) if err != nil { return nil, err } From 31924b133095a9c75c71f8e53a102975d1ca0a15 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 13:10:06 -0400 Subject: [PATCH 08/30] update subnets in tests --- internal/test/host.go | 2 +- stores/hostdb_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/test/host.go b/internal/test/host.go index 18301be3a..824a12370 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -39,7 +39,7 @@ func NewHost(hk types.PublicKey, pt rhpv3.HostPriceTable, settings rhpv2.HostSet PriceTable: api.HostPriceTable{HostPriceTable: pt, Expiry: time.Now().Add(time.Minute)}, Settings: settings, Scanned: true, - ResolvedAddresses: []string{"38.135.51.0/24"}, + ResolvedAddresses: []string{"38.135.51.0"}, } } diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index 5f5390e07..fd225b84a 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -451,7 +451,7 @@ func TestRecordScan(t *testing.T) { // Record a scan. firstScanTime := time.Now().UTC() - resolvedAddresses := []string{"212.1.96.0/24", "38.135.51.0/24"} + resolvedAddresses := []string{"212.1.96.0", "38.135.51.0"} settings := rhpv2.HostSettings{NetAddress: "host.com"} pt := rhpv3.HostPriceTable{ HostBlockHeight: 123, From 73631dc2a249a589641f22a495c54a1d68cc489f Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 9 Aug 2024 13:19:16 -0400 Subject: [PATCH 09/30] fix lint --- api/host.go | 2 +- internal/utils/net.go | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/api/host.go b/api/host.go index b1a59dca9..282dbb8b8 100644 --- a/api/host.go +++ b/api/host.go @@ -159,7 +159,7 @@ type ( Blocked bool `json:"blocked"` Checks map[string]HostCheck `json:"checks"` StoredData uint64 `json:"storedData"` - ResolvedAddresses []string `json:"ResolvedAddresses"` + ResolvedAddresses []string `json:"resolvedAddresses"` } HostAddress struct { diff --git a/internal/utils/net.go b/internal/utils/net.go index 8f929e1b6..1330eb2b0 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -8,11 +8,6 @@ import ( "sort" ) -const ( - ipv4FilterRange = 24 - ipv6FilterRange = 32 -) - var ( privateSubnets []*net.IPNet From 2bfc8ee29aa241f50db7eabada92acdb32797bc0 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 11:59:28 -0400 Subject: [PATCH 10/30] use subnet in hostset --- autopilot/contractor/contractor.go | 4 ++-- autopilot/contractor/hostset.go | 31 +++++++++++++++++------------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index 9440137dd..b4a9bbd19 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -1357,8 +1357,8 @@ func performContractMaintenance(ctx *mCtx, alerter alerts.Alerter, bus Bus, chur // STEP 2: perform contract maintenance ipFilter := &hostSet{ - logger: logger.Named("ipFilter"), - addressToHostKey: make(map[string]string), + logger: logger.Named("ipFilter"), + subnetToHostKey: make(map[string]string), } keptContracts, churnReasons, err := performContractChecks(ctx, alerter, bus, w, cc, cr, ipFilter, logger, &remaining) if err != nil { diff --git a/autopilot/contractor/hostset.go b/autopilot/contractor/hostset.go index 015ac37e8..bc3932767 100644 --- a/autopilot/contractor/hostset.go +++ b/autopilot/contractor/hostset.go @@ -1,38 +1,43 @@ package contractor import ( + "errors" + "go.sia.tech/renterd/api" - "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) +var ( + errHostTooManySubnets = errors.New("host has more than two subnets") +) + type ( hostSet struct { - addressToHostKey map[string]string + subnetToHostKey map[string]string logger *zap.SugaredLogger } ) func (hs *hostSet) HasRedundantIP(host api.Host) bool { - // validate host addresses - if len(host.ResolvedAddresses) == 0 { - hs.logger.Errorf("host %v has no address, treating its IP %v as redundant", host.PublicKey, host.NetAddress) + // validate host subnets + if len(host.Subnets) == 0 { + hs.logger.Errorf("host %v has no subnet, treating its IP %v as redundant", host.PublicKey, host.NetAddress) return true - } else if len(host.ResolvedAddresses) > 2 { - hs.logger.Errorf("host %v has more than 2 addresses, treating its IP %v as redundant", host.PublicKey, utils.ErrHostTooManyAddresses) + } else if len(host.Subnets) > 2 { + hs.logger.Errorf("host %v has more than 2 subnets, treating its IP %v as redundant", host.PublicKey, errHostTooManySubnets) return true } - // check if we know about this address + // check if we know about this subnet var knownHost string - for _, address := range host.ResolvedAddresses { - if knownHost = hs.addressToHostKey[address]; knownHost != "" { + for _, subnet := range host.Subnets { + if knownHost = hs.subnetToHostKey[subnet]; knownHost != "" { break } } - // if we know about the address, the host is redundant if it's not the same + // if we know about the subnet, the host is redundant if it's not the same if knownHost != "" { return host.PublicKey.String() != knownHost } @@ -40,7 +45,7 @@ func (hs *hostSet) HasRedundantIP(host api.Host) bool { } func (hs *hostSet) Add(host api.Host) { - for _, subnet := range host.ResolvedAddresses { - hs.addressToHostKey[subnet] = host.PublicKey.String() + for _, subnet := range host.Subnets { + hs.subnetToHostKey[subnet] = host.PublicKey.String() } } From b7d50f04bcb8f6e8047f2909cb029a53ec4721f6 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:00:40 -0400 Subject: [PATCH 11/30] remove unnecessary exports in dialer --- worker/dialer.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/worker/dialer.go b/worker/dialer.go index 21f7cabe2..e41ba1ace 100644 --- a/worker/dialer.go +++ b/worker/dialer.go @@ -15,7 +15,7 @@ type hostCache struct { cache map[string]string // hostname -> IP address } -func NewhostCache() *hostCache { +func newHostCache() *hostCache { return &hostCache{ cache: make(map[string]string), } @@ -42,18 +42,18 @@ func (hc *hostCache) Clear(hostname string) { // fallbackDialer implements a custom net.Dialer with a fallback mechanism type fallbackDialer struct { - Cache *hostCache + cache *hostCache - Bus Bus - Dialer net.Dialer + bus Bus + dialer net.Dialer } func newFallbackDialer(bus Bus, dialer net.Dialer) *fallbackDialer { return &fallbackDialer{ - Cache: NewhostCache(), + cache: newHostCache(), - Bus: bus, - Dialer: dialer, + bus: bus, + dialer: dialer, } } @@ -67,31 +67,31 @@ func (d *fallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s ipAddr, err := net.ResolveIPAddr("ip", host) if err == nil { // Cache the resolved IP and dial - d.Cache.Set(host, ipAddr.String()) - return d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) + d.cache.Set(host, ipAddr.String()) + return d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) } // If resolution fails, check the cache - if cachedIP, ok := d.Cache.Get(host); ok { - conn, err := d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(cachedIP, port)) + if cachedIP, ok := d.cache.Get(host); ok { + conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(cachedIP, port)) if err == nil { return conn, nil } // Clear the cache if the cached IP doesn't work - d.Cache.Clear(host) + d.cache.Clear(host) } // Attempt to resolve using the bus - hostInfo, err := d.Bus.Host(ctx, hk) + hostInfo, err := d.bus.Host(ctx, hk) if err != nil { return nil, err } for _, addr := range hostInfo.ResolvedAddresses { - conn, err := d.Dialer.DialContext(ctx, "tcp", net.JoinHostPort(addr, port)) + conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(addr, port)) if err == nil { // Update cache on successful dial - d.Cache.Set(host, addr) + d.cache.Set(host, addr) return conn, nil } } From 317b7eef308fc6ecf9df756eea80360aaa898aa7 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:11:58 -0400 Subject: [PATCH 12/30] move dialer to internal/worker --- {worker => internal/worker}/dialer.go | 17 +++++++++++------ worker/rhpv3.go | 10 ++++++---- worker/worker.go | 4 ++-- 3 files changed, 19 insertions(+), 12 deletions(-) rename {worker => internal/worker}/dialer.go (82%) diff --git a/worker/dialer.go b/internal/worker/dialer.go similarity index 82% rename from worker/dialer.go rename to internal/worker/dialer.go index e41ba1ace..32043f086 100644 --- a/worker/dialer.go +++ b/internal/worker/dialer.go @@ -7,6 +7,7 @@ import ( "sync" "go.sia.tech/core/types" + "go.sia.tech/renterd/api" ) // Cache to store resolved IPs @@ -40,16 +41,20 @@ func (hc *hostCache) Clear(hostname string) { delete(hc.cache, hostname) } -// fallbackDialer implements a custom net.Dialer with a fallback mechanism -type fallbackDialer struct { +type DialerBus interface { + Host(ctx context.Context, hostKey types.PublicKey) (api.Host, error) +} + +// FallbackDialer implements a custom net.Dialer with a fallback mechanism +type FallbackDialer struct { cache *hostCache - bus Bus + bus DialerBus dialer net.Dialer } -func newFallbackDialer(bus Bus, dialer net.Dialer) *fallbackDialer { - return &fallbackDialer{ +func NewFallbackDialer(bus DialerBus, dialer net.Dialer) *FallbackDialer { + return &FallbackDialer{ cache: newHostCache(), bus: bus, @@ -57,7 +62,7 @@ func newFallbackDialer(bus Bus, dialer net.Dialer) *fallbackDialer { } } -func (d *fallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) { +func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) { host, port, err := net.SplitHostPort(address) if err != nil { return nil, err diff --git a/worker/rhpv3.go b/worker/rhpv3.go index d80d47134..d7051a71a 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -13,6 +13,8 @@ import ( "sync" "time" + iworker "go.sia.tech/renterd/internal/worker" + rhpv2 "go.sia.tech/core/rhp/v2" rhpv3 "go.sia.tech/core/rhp/v3" "go.sia.tech/core/types" @@ -135,7 +137,7 @@ type transportV3 struct { mu sync.Mutex hostKey types.PublicKey siamuxAddr string - dialer *fallbackDialer + dialer *iworker.FallbackDialer t *rhpv3.Transport } @@ -213,18 +215,18 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) { // transportPoolV3 is a pool of rhpv3.Transports which allows for reusing them. type transportPoolV3 struct { mu sync.Mutex - dialer *fallbackDialer + dialer *iworker.FallbackDialer pool map[string]*transportV3 } -func newTransportPoolV3(dialer *fallbackDialer) *transportPoolV3 { +func newTransportPoolV3(dialer *iworker.FallbackDialer) *transportPoolV3 { return &transportPoolV3{ dialer: dialer, pool: make(map[string]*transportV3), } } -func dialTransport(ctx context.Context, dialer *fallbackDialer, siamuxAddr string, hostKey types.PublicKey) (*rhpv3.Transport, error) { +func dialTransport(ctx context.Context, dialer *iworker.FallbackDialer, siamuxAddr string, hostKey types.PublicKey) (*rhpv3.Transport, error) { // Dial host. conn, err := dialer.Dial(ctx, hostKey, siamuxAddr) if err != nil { diff --git a/worker/worker.go b/worker/worker.go index 76438a749..e3988a865 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -215,7 +215,7 @@ type worker struct { uploadManager *uploadManager accounts *accounts - dialer *fallbackDialer + dialer *iworker.FallbackDialer cache iworker.WorkerCache priceTables *priceTables transportPoolV3 *transportPoolV3 @@ -1307,7 +1307,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, cache: iworker.NewCache(b, l), - dialer: newFallbackDialer(b, net.Dialer{}), + dialer: iworker.NewFallbackDialer(b, net.Dialer{}), eventSubscriber: iworker.NewEventSubscriber(a, b, l, 10*time.Second), id: id, bus: b, From 4142a776d341cecc5a26430f614fe78ed7015839 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:12:42 -0400 Subject: [PATCH 13/30] Clear -> Delete --- internal/worker/dialer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 32043f086..71ad729ac 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -35,7 +35,7 @@ func (hc *hostCache) Set(hostname, ip string) { hc.cache[hostname] = ip } -func (hc *hostCache) Clear(hostname string) { +func (hc *hostCache) Delete(hostname string) { hc.mu.Lock() defer hc.mu.Unlock() delete(hc.cache, hostname) @@ -82,8 +82,8 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s if err == nil { return conn, nil } - // Clear the cache if the cached IP doesn't work - d.cache.Clear(host) + // Delete the cache if the cached IP doesn't work + d.cache.Delete(host) } // Attempt to resolve using the bus From 94e43ca21c4dbb3774cb6a7930c8ca0938a142cf Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:13:29 -0400 Subject: [PATCH 14/30] move dialer above mutex --- worker/rhpv3.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/worker/rhpv3.go b/worker/rhpv3.go index d7051a71a..852bb01a7 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -214,9 +214,10 @@ func (t *transportV3) DialStream(ctx context.Context) (*streamV3, error) { // transportPoolV3 is a pool of rhpv3.Transports which allows for reusing them. type transportPoolV3 struct { - mu sync.Mutex dialer *iworker.FallbackDialer - pool map[string]*transportV3 + + mu sync.Mutex + pool map[string]*transportV3 } func newTransportPoolV3(dialer *iworker.FallbackDialer) *transportPoolV3 { From 30ed63273b5e27a78b14f3105ef25cd26930fd6c Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:18:43 -0400 Subject: [PATCH 15/30] log when using fallbacks --- internal/worker/dialer.go | 7 ++++++- worker/worker.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 71ad729ac..0c47f0468 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -8,6 +8,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" + "go.uber.org/zap" ) // Cache to store resolved IPs @@ -50,14 +51,16 @@ type FallbackDialer struct { cache *hostCache bus DialerBus + logger *zap.SugaredLogger dialer net.Dialer } -func NewFallbackDialer(bus DialerBus, dialer net.Dialer) *FallbackDialer { +func NewFallbackDialer(bus DialerBus, logger *zap.Logger, dialer net.Dialer) *FallbackDialer { return &FallbackDialer{ cache: newHostCache(), bus: bus, + logger: logger.Sugar().Named("fallbackdialer"), dialer: dialer, } } @@ -78,6 +81,7 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s // If resolution fails, check the cache if cachedIP, ok := d.cache.Get(host); ok { + d.logger.Warn("Failed to resolve host, using cached IP", zap.String("host", host)) conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(cachedIP, port)) if err == nil { return conn, nil @@ -87,6 +91,7 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s } // Attempt to resolve using the bus + d.logger.Warn("Cache not available or cached IP stale, retrieving host resolved addresses from bus", zap.String("host", host)) hostInfo, err := d.bus.Host(ctx, hk) if err != nil { return nil, err diff --git a/worker/worker.go b/worker/worker.go index e3988a865..641985cae 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1307,7 +1307,7 @@ func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlush allowPrivateIPs: allowPrivateIPs, contractLockingDuration: contractLockingDuration, cache: iworker.NewCache(b, l), - dialer: iworker.NewFallbackDialer(b, net.Dialer{}), + dialer: iworker.NewFallbackDialer(b, l, net.Dialer{}), eventSubscriber: iworker.NewEventSubscriber(a, b, l, 10*time.Second), id: id, bus: b, From 267a302b4d4b18a749d1d57ee1334341953de0f2 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:20:49 -0400 Subject: [PATCH 16/30] only cache after successful dial --- internal/worker/dialer.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 0c47f0468..1741b7926 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -74,9 +74,13 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s // Try to resolve IP ipAddr, err := net.ResolveIPAddr("ip", host) if err == nil { - // Cache the resolved IP and dial + // Dial and cache the resolved IP + conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) + if err != nil { + return nil, err + } d.cache.Set(host, ipAddr.String()) - return d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) + return conn, nil } // If resolution fails, check the cache From 3e26c33e998ec713812e63041c7c9d2a4ad8ccde Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:25:56 -0400 Subject: [PATCH 17/30] update hostdb test to check subnets --- stores/hostdb_test.go | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go index fd225b84a..1fe6c3ec3 100644 --- a/stores/hostdb_test.go +++ b/stores/hostdb_test.go @@ -439,6 +439,8 @@ func TestRecordScan(t *testing.T) { // The host shouldn't have any addresses. if len(host.ResolvedAddresses) != 0 { t.Fatal("unexpected", host.ResolvedAddresses, len(host.ResolvedAddresses)) + } else if len(host.Subnets) != 0 { + t.Fatal("unexpected", host.Subnets, len(host.Subnets)) } // Fetch the host directly to get the creation time. @@ -452,11 +454,12 @@ func TestRecordScan(t *testing.T) { // Record a scan. firstScanTime := time.Now().UTC() resolvedAddresses := []string{"212.1.96.0", "38.135.51.0"} + subnets := []string{"212.1.96.0/24", "38.135.51.0/24"} settings := rhpv2.HostSettings{NetAddress: "host.com"} pt := rhpv3.HostPriceTable{ HostBlockHeight: 123, } - if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, firstScanTime, settings, pt, true, resolvedAddresses)}); err != nil { + if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, firstScanTime, settings, pt, true, resolvedAddresses, subnets)}); err != nil { t.Fatal(err) } host, err = ss.Host(ctx, hk) @@ -476,7 +479,9 @@ func TestRecordScan(t *testing.T) { // The host should have the addresses. if !reflect.DeepEqual(host.ResolvedAddresses, resolvedAddresses) { - t.Fatal("mismatch") + t.Fatal("resolved addresses mismatch") + } else if !reflect.DeepEqual(host.Subnets, subnets) { + t.Fatal("subnets mismatch") } // We expect no uptime or downtime from only a single scan. @@ -506,7 +511,7 @@ func TestRecordScan(t *testing.T) { // subnets this time. secondScanTime := firstScanTime.Add(time.Hour) pt.HostBlockHeight = 456 - if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, secondScanTime, settings, pt, true, nil)}); err != nil { + if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, secondScanTime, settings, pt, true, nil, nil)}); err != nil { t.Fatal(err) } host, err = ss.Host(ctx, hk) @@ -536,12 +541,14 @@ func TestRecordScan(t *testing.T) { // The host should still have the subnets. if !reflect.DeepEqual(host.ResolvedAddresses, resolvedAddresses) { - t.Fatal("mismatch") + t.Fatal("resolved addresses mismatch") + } else if !reflect.DeepEqual(host.Subnets, subnets) { + t.Fatal("subnets mismatch") } // Record another scan 2 hours after the second one. This time it fails. thirdScanTime := secondScanTime.Add(2 * time.Hour) - if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, thirdScanTime, settings, pt, false, nil)}); err != nil { + if err := ss.RecordHostScans(ctx, []api.HostScan{newTestScan(hk, thirdScanTime, settings, pt, false, nil, nil)}); err != nil { t.Fatal(err) } host, err = ss.Host(ctx, hk) @@ -599,8 +606,8 @@ func TestRemoveHosts(t *testing.T) { pt := rhpv3.HostPriceTable{} t1 := now.Add(-time.Minute * 120) // 2 hours ago t2 := now.Add(-time.Minute * 90) // 1.5 hours ago (30min downtime) - hi1 := newTestScan(hk, t1, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil) - hi2 := newTestScan(hk, t2, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil) + hi1 := newTestScan(hk, t1, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil, nil) + hi2 := newTestScan(hk, t2, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil, nil) // record interactions if err := ss.RecordHostScans(context.Background(), []api.HostScan{hi1, hi2}); err != nil { @@ -630,7 +637,7 @@ func TestRemoveHosts(t *testing.T) { // record interactions t3 := now.Add(-time.Minute * 60) // 1 hour ago (60min downtime) - hi3 := newTestScan(hk, t3, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil) + hi3 := newTestScan(hk, t3, rhpv2.HostSettings{NetAddress: "host.com"}, pt, false, nil, nil) if err := ss.RecordHostScans(context.Background(), []api.HostScan{hi3}); err != nil { t.Fatal(err) } @@ -1110,12 +1117,13 @@ func TestSQLHostBlocklistBasic(t *testing.T) { } // newTestScan returns a host interaction with given parameters. -func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, success bool, resolvedAddresses []string) api.HostScan { +func newTestScan(hk types.PublicKey, scanTime time.Time, settings rhpv2.HostSettings, pt rhpv3.HostPriceTable, success bool, resolvedAddresses, subnets []string) api.HostScan { return api.HostScan{ HostKey: hk, PriceTable: pt, Settings: settings, ResolvedAddresses: resolvedAddresses, + Subnets: subnets, Success: success, Timestamp: scanTime, } From 5865fca6a9d2249e1452ce6ece73111e71723b1e Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 12:57:10 -0400 Subject: [PATCH 18/30] add hostset test --- autopilot/contractor/hostset_test.go | 86 ++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 autopilot/contractor/hostset_test.go diff --git a/autopilot/contractor/hostset_test.go b/autopilot/contractor/hostset_test.go new file mode 100644 index 000000000..01e7280be --- /dev/null +++ b/autopilot/contractor/hostset_test.go @@ -0,0 +1,86 @@ +package contractor + +import ( + "testing" + + "go.sia.tech/core/types" + "go.sia.tech/renterd/api" + "go.uber.org/zap" +) + +func TestHostSet(t *testing.T) { + logger, _ := zap.NewDevelopment() + sugarLogger := logger.Sugar() + + hs := &hostSet{ + subnetToHostKey: make(map[string]string), + logger: sugarLogger, + } + + // Host with no subnets + host1 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "192.168.1.1", + Subnets: []string{}, + } + if !hs.HasRedundantIP(host1) { + t.Fatalf("Expected host with no subnets to be considered redundant") + } + + // Host with more than 2 subnets + host2 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "192.168.1.2", + Subnets: []string{"192.168.1.0/24", "10.0.0.0/24", "172.16.0.0/24"}, + } + if !hs.HasRedundantIP(host2) { + t.Fatalf("Expected host with more than 2 subnets to be considered redundant") + } + + // New host with unique subnet + host3 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "192.168.2.3", + Subnets: []string{"192.168.2.0/24"}, + } + if hs.HasRedundantIP(host3) { + t.Fatal("Expected new host with unique subnet to not be considered redundant") + } + hs.Add(host3) + + // New host with same subnet but different public key + host4 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "192.168.2.4", + Subnets: []string{"192.168.2.0/24"}, + } + if !hs.HasRedundantIP(host4) { + t.Fatal("Expected host with same subnet but different public key to be considered redundant") + } + + // Same host from before + if hs.HasRedundantIP(host3) { + t.Fatal("Expected same host to not be considered redundant") + } + + // Host with two valid subnets + host5 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "192.168.3.5", + Subnets: []string{"192.168.3.0/24", "10.0.0.0/24"}, + } + if hs.HasRedundantIP(host5) { + t.Fatal("Expected host with two valid subnets to not be considered redundant") + } + hs.Add(host5) + + // New host with one overlapping subnet + host6 := api.Host{ + PublicKey: types.GeneratePrivateKey().PublicKey(), + NetAddress: "10.0.0.1", + Subnets: []string{"10.0.0.0/24", "172.16.0.0/24"}, + } + if !hs.HasRedundantIP(host6) { + t.Fatal("Expected host with one overlapping subnet to be considered redundant") + } +} \ No newline at end of file From 3be459e7158670da86f4184fff22a209f8cf47d2 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Mon, 12 Aug 2024 14:12:16 -0400 Subject: [PATCH 19/30] keep subnets field --- api/host.go | 2 ++ autopilot/contractor/hostset_test.go | 2 +- internal/utils/net.go | 32 ++++++++++++++++++++++++++++ stores/sql/main.go | 5 +++++ worker/worker.go | 10 +++++++-- 5 files changed, 48 insertions(+), 3 deletions(-) diff --git a/api/host.go b/api/host.go index 282dbb8b8..180231700 100644 --- a/api/host.go +++ b/api/host.go @@ -160,6 +160,7 @@ type ( Checks map[string]HostCheck `json:"checks"` StoredData uint64 `json:"storedData"` ResolvedAddresses []string `json:"resolvedAddresses"` + Subnets []string `json:"subnets"` } HostAddress struct { @@ -185,6 +186,7 @@ type ( PriceTable rhpv3.HostPriceTable Settings rhpv2.HostSettings ResolvedAddresses []string + Subnets []string Success bool Timestamp time.Time } diff --git a/autopilot/contractor/hostset_test.go b/autopilot/contractor/hostset_test.go index 01e7280be..48021e1f7 100644 --- a/autopilot/contractor/hostset_test.go +++ b/autopilot/contractor/hostset_test.go @@ -83,4 +83,4 @@ func TestHostSet(t *testing.T) { if !hs.HasRedundantIP(host6) { t.Fatal("Expected host with one overlapping subnet to be considered redundant") } -} \ No newline at end of file +} diff --git a/internal/utils/net.go b/internal/utils/net.go index 1330eb2b0..ea3bee758 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -8,6 +8,11 @@ import ( "sort" ) +const ( + ipv4FilterRange = 24 + ipv6FilterRange = 32 +) + var ( privateSubnets []*net.IPNet @@ -31,6 +36,33 @@ func init() { } } +func AddressesToSubnets(resolvedAddresses []string) ([]string, error) { + var subnets []string + for _, addr := range resolvedAddresses { + parsed := net.ParseIP(addr) + if parsed == nil { + return nil, errors.New("failed to parse address") + } + + // figure out the IP range + ipRange := ipv6FilterRange + if parsed.To4() != nil { + ipRange = ipv4FilterRange + } + + // parse the subnet + cidr := fmt.Sprintf("%s/%d", parsed.String(), ipRange) + _, ipnet, err := net.ParseCIDR(cidr) + if err != nil { + return nil, err + } + + subnets = append(subnets, ipnet.String()) + } + + return subnets, nil +} + func ResolveHostIP(ctx context.Context, hostIP string) (ips []string, private bool, _ error) { // resolve host address host, _, err := net.SplitHostPort(hostIP) diff --git a/stores/sql/main.go b/stores/sql/main.go index 9e3c65cd5..b32ddb1c6 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -22,6 +22,7 @@ import ( "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/sql" + "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" "go.sia.tech/renterd/webhooks" "lukechampine.com/frand" @@ -2135,6 +2136,10 @@ func SearchHosts(ctx context.Context, tx sql.Tx, autopilot, filterMode, usabilit if resolvedAddresses != "" { h.ResolvedAddresses = strings.Split(resolvedAddresses, ",") + h.Subnets, err = utils.AddressesToSubnets(h.ResolvedAddresses) + if err != nil { + return nil, fmt.Errorf("failed to convert addresses to subnets: %w", err) + } } h.PriceTable.Expiry = pte.Time h.StoredData = storedDataMap[hostID] diff --git a/worker/worker.go b/worker/worker.go index 641985cae..3b4f26858 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1452,13 +1452,18 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty // resolve host ip, don't scan if the host is on a private network or if it // resolves to more than two addresses of the same type, if it fails for // another reason the host scan won't have subnets - subnets, private, err := utils.ResolveHostIP(ctx, hostIP) + resolvedAddresses, private, err := utils.ResolveHostIP(ctx, hostIP) if errors.Is(err, utils.ErrHostTooManyAddresses) { return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, err } else if private && !w.allowPrivateIPs { return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork } + subnets, err := utils.AddressesToSubnets(resolvedAddresses) + if err != nil { + return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, fmt.Errorf("failed to convert addresses to subnets: %w", err) + } + // scan: first try settings, pt, duration, err := scan() if err != nil { @@ -1496,7 +1501,8 @@ func (w *worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty { HostKey: hostKey, PriceTable: pt, - ResolvedAddresses: subnets, + ResolvedAddresses: resolvedAddresses, + Subnets: subnets, // NOTE: A scan is considered successful if both fetching the price // table and the settings succeeded. Right now scanning can't fail From 24129af8317e70cc07655ba6f7e891f312f22dd6 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Wed, 14 Aug 2024 10:26:15 -0400 Subject: [PATCH 20/30] keep subnets in internal test host --- internal/test/host.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/test/host.go b/internal/test/host.go index 824a12370..c08a11723 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -40,6 +40,7 @@ func NewHost(hk types.PublicKey, pt rhpv3.HostPriceTable, settings rhpv2.HostSet Settings: settings, Scanned: true, ResolvedAddresses: []string{"38.135.51.0"}, + Subnets: []string{"38.135.51.0/24"}, } } From a8e166521218756a68f0ac280950b09fdd6d6919 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Wed, 14 Aug 2024 10:32:54 -0400 Subject: [PATCH 21/30] remove ResolveIPAddr step in dialer and just use the address off the net.Conn --- internal/worker/dialer.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 1741b7926..926312bef 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -66,23 +66,18 @@ func NewFallbackDialer(bus DialerBus, logger *zap.Logger, dialer net.Dialer) *Fa } func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) { + // Dial and cache the resolved IP if dial successful + conn, err := d.dialer.DialContext(ctx, "tcp", address) + if err == nil { + d.cache.Set(host, conn.RemoteAddr().String()) + return conn, nil + } + host, port, err := net.SplitHostPort(address) if err != nil { return nil, err } - // Try to resolve IP - ipAddr, err := net.ResolveIPAddr("ip", host) - if err == nil { - // Dial and cache the resolved IP - conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(ipAddr.String(), port)) - if err != nil { - return nil, err - } - d.cache.Set(host, ipAddr.String()) - return conn, nil - } - // If resolution fails, check the cache if cachedIP, ok := d.cache.Get(host); ok { d.logger.Warn("Failed to resolve host, using cached IP", zap.String("host", host)) From c9504db1080bab58b21ed622643b5a3b6b6c9b1f Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Wed, 14 Aug 2024 10:34:31 -0400 Subject: [PATCH 22/30] update schema --- internal/worker/dialer.go | 10 +++++----- stores/sql/mysql/migrations/main/schema.sql | 2 +- stores/sql/sqlite/migrations/main/schema.sql | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 926312bef..3378b27ca 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -66,6 +66,11 @@ func NewFallbackDialer(bus DialerBus, logger *zap.Logger, dialer net.Dialer) *Fa } func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address string) (net.Conn, error) { + host, port, err := net.SplitHostPort(address) + if err != nil { + return nil, err + } + // Dial and cache the resolved IP if dial successful conn, err := d.dialer.DialContext(ctx, "tcp", address) if err == nil { @@ -73,11 +78,6 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s return conn, nil } - host, port, err := net.SplitHostPort(address) - if err != nil { - return nil, err - } - // If resolution fails, check the cache if cachedIP, ok := d.cache.Get(host); ok { d.logger.Warn("Failed to resolve host, using cached IP", zap.String("host", host)) diff --git a/stores/sql/mysql/migrations/main/schema.sql b/stores/sql/mysql/migrations/main/schema.sql index f400e9010..51a5c5629 100644 --- a/stores/sql/mysql/migrations/main/schema.sql +++ b/stores/sql/mysql/migrations/main/schema.sql @@ -97,7 +97,7 @@ CREATE TABLE `hosts` ( `lost_sectors` bigint unsigned DEFAULT NULL, `last_announcement` datetime(3) DEFAULT NULL, `net_address` varchar(191) DEFAULT NULL, - `subnets` varchar(255) NOT NULL DEFAULT '', + `resolved_addresses` varchar(255) NOT NULL DEFAULT '', PRIMARY KEY (`id`), UNIQUE KEY `public_key` (`public_key`), KEY `idx_hosts_public_key` (`public_key`), diff --git a/stores/sql/sqlite/migrations/main/schema.sql b/stores/sql/sqlite/migrations/main/schema.sql index af8ffaa33..647e6cfdd 100644 --- a/stores/sql/sqlite/migrations/main/schema.sql +++ b/stores/sql/sqlite/migrations/main/schema.sql @@ -12,7 +12,7 @@ CREATE INDEX `idx_archived_contracts_state` ON `archived_contracts`(`state`); CREATE INDEX `idx_archived_contracts_renewed_from` ON `archived_contracts`(`renewed_from`); -- dbHost -CREATE TABLE `hosts` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`public_key` blob NOT NULL UNIQUE,`settings` text,`price_table` text,`price_table_expiry` datetime,`total_scans` integer,`last_scan` integer,`last_scan_success` numeric,`second_to_last_scan_success` numeric,`scanned` numeric,`uptime` integer,`downtime` integer,`recent_downtime` integer,`recent_scan_failures` integer,`successful_interactions` real,`failed_interactions` real,`lost_sectors` integer,`last_announcement` datetime,`net_address` text,`subnets` text NOT NULL DEFAULT ''); +CREATE TABLE `hosts` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`public_key` blob NOT NULL UNIQUE,`settings` text,`price_table` text,`price_table_expiry` datetime,`total_scans` integer,`last_scan` integer,`last_scan_success` numeric,`second_to_last_scan_success` numeric,`scanned` numeric,`uptime` integer,`downtime` integer,`recent_downtime` integer,`recent_scan_failures` integer,`successful_interactions` real,`failed_interactions` real,`lost_sectors` integer,`last_announcement` datetime,`net_address` text,`resolved_addresses` text NOT NULL DEFAULT ''); CREATE INDEX `idx_hosts_recent_scan_failures` ON `hosts`(`recent_scan_failures`); CREATE INDEX `idx_hosts_recent_downtime` ON `hosts`(`recent_downtime`); CREATE INDEX `idx_hosts_scanned` ON `hosts`(`scanned`); From 2eea686d929e7c50d0c3084b5c711b6e1d57ea6a Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Wed, 14 Aug 2024 12:59:12 -0400 Subject: [PATCH 23/30] migrations changes --- .../migrations/main/migration_00014_hosts_resolvedaddresses.sql | 2 +- .../migrations/main/migration_00014_hosts_resolvedaddresses.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql b/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql index f67e46e2c..c7657d407 100644 --- a/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql +++ b/stores/sql/mysql/migrations/main/migration_00014_hosts_resolvedaddresses.sql @@ -1,3 +1,3 @@ ALTER TABLE hosts DROP COLUMN subnets; -ALTER TABLE hosts ADD resolved_addresses varchar(255); +ALTER TABLE hosts ADD resolved_addresses varchar(255) NOT NULL DEFAULT ''; diff --git a/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql b/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql index f67e46e2c..126d9d75f 100644 --- a/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql +++ b/stores/sql/sqlite/migrations/main/migration_00014_hosts_resolvedaddresses.sql @@ -1,3 +1,3 @@ ALTER TABLE hosts DROP COLUMN subnets; -ALTER TABLE hosts ADD resolved_addresses varchar(255); +ALTER TABLE hosts ADD resolved_addresses TEXT NOT NULL DEFAULT ''; From dc7585804d653f0d2ae61ef56ab84067e4711b12 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Wed, 14 Aug 2024 14:02:21 -0400 Subject: [PATCH 24/30] update field names in query --- build/meta.go | 4 ++-- stores/sql/main.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/build/meta.go b/build/meta.go index e45d2af90..49aeb39eb 100644 --- a/build/meta.go +++ b/build/meta.go @@ -9,8 +9,8 @@ import ( ) const ( - commit = "?" - version = "?" + commit = "?" + version = "?" buildTime = 0 ) diff --git a/stores/sql/main.go b/stores/sql/main.go index b32ddb1c6..0dba1fbfe 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1732,7 +1732,7 @@ func RecordHostScans(ctx context.Context, tx sql.Tx, scans []api.HostScan) error price_table_expiry = CASE WHEN ? AND (price_table_expiry IS NULL OR ? > price_table_expiry) THEN ? ELSE price_table_expiry END, successful_interactions = CASE WHEN ? THEN successful_interactions + 1 ELSE successful_interactions END, failed_interactions = CASE WHEN ? THEN failed_interactions + 1 ELSE failed_interactions END, - subnets = CASE WHEN ? THEN ? ELSE subnets END + resolved_addresses = CASE WHEN ? THEN ? ELSE resolved_addresses END WHERE public_key = ? `) if err != nil { @@ -2107,7 +2107,7 @@ func SearchHosts(ctx context.Context, tx sql.Tx, autopilot, filterMode, usabilit SELECT h.id, h.created_at, h.last_announcement, h.public_key, h.net_address, h.price_table, h.price_table_expiry, h.settings, h.total_scans, h.last_scan, h.last_scan_success, h.second_to_last_scan_success, h.uptime, h.downtime, h.successful_interactions, h.failed_interactions, COALESCE(h.lost_sectors, 0), - h.scanned, h.subnets, %s + h.scanned, h.resolved_addresses, %s FROM hosts h %s %s From 8c32308c42357d4c915d20df51b3b24f6e1ea849 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Fri, 16 Aug 2024 11:35:36 +0200 Subject: [PATCH 25/30] contractor: add compat code to HasRedundantIP --- autopilot/contractor/hostset.go | 29 +++++++++++++++++++---- autopilot/contractor/hostset_test.go | 35 +++++++++++----------------- internal/test/host.go | 2 +- worker/worker.go | 6 ----- 4 files changed, 39 insertions(+), 33 deletions(-) diff --git a/autopilot/contractor/hostset.go b/autopilot/contractor/hostset.go index bc3932767..0ae4ac6c2 100644 --- a/autopilot/contractor/hostset.go +++ b/autopilot/contractor/hostset.go @@ -1,9 +1,12 @@ package contractor import ( + "context" "errors" + "time" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/internal/utils" "go.uber.org/zap" ) @@ -20,18 +23,31 @@ type ( ) func (hs *hostSet) HasRedundantIP(host api.Host) bool { + // compat code for hosts that have been scanned before ResolvedAddresses + // were introduced + if len(host.ResolvedAddresses) == 0 { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + host.ResolvedAddresses, _, _ = utils.ResolveHostIP(ctx, host.NetAddress) + } + + subnets, err := utils.AddressesToSubnets(host.ResolvedAddresses) + if err != nil { + hs.logger.Errorf("failed to parse host %v subnets: %v", host.PublicKey, err) + return true + } // validate host subnets - if len(host.Subnets) == 0 { + if len(subnets) == 0 { hs.logger.Errorf("host %v has no subnet, treating its IP %v as redundant", host.PublicKey, host.NetAddress) return true - } else if len(host.Subnets) > 2 { + } else if len(subnets) > 2 { hs.logger.Errorf("host %v has more than 2 subnets, treating its IP %v as redundant", host.PublicKey, errHostTooManySubnets) return true } // check if we know about this subnet var knownHost string - for _, subnet := range host.Subnets { + for _, subnet := range subnets { if knownHost = hs.subnetToHostKey[subnet]; knownHost != "" { break } @@ -45,7 +61,12 @@ func (hs *hostSet) HasRedundantIP(host api.Host) bool { } func (hs *hostSet) Add(host api.Host) { - for _, subnet := range host.Subnets { + subnets, err := utils.AddressesToSubnets(host.ResolvedAddresses) + if err != nil { + hs.logger.Errorf("failed to parse host %v subnets: %v", host.PublicKey, err) + return + } + for _, subnet := range subnets { hs.subnetToHostKey[subnet] = host.PublicKey.String() } } diff --git a/autopilot/contractor/hostset_test.go b/autopilot/contractor/hostset_test.go index 48021e1f7..fbd1bb2e2 100644 --- a/autopilot/contractor/hostset_test.go +++ b/autopilot/contractor/hostset_test.go @@ -9,19 +9,15 @@ import ( ) func TestHostSet(t *testing.T) { - logger, _ := zap.NewDevelopment() - sugarLogger := logger.Sugar() - hs := &hostSet{ subnetToHostKey: make(map[string]string), - logger: sugarLogger, + logger: zap.NewNop().Sugar(), } // Host with no subnets host1 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "192.168.1.1", - Subnets: []string{}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{}, } if !hs.HasRedundantIP(host1) { t.Fatalf("Expected host with no subnets to be considered redundant") @@ -29,9 +25,8 @@ func TestHostSet(t *testing.T) { // Host with more than 2 subnets host2 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "192.168.1.2", - Subnets: []string{"192.168.1.0/24", "10.0.0.0/24", "172.16.0.0/24"}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{"192.168.1.1", "10.0.0.1", "172.16.0.1"}, } if !hs.HasRedundantIP(host2) { t.Fatalf("Expected host with more than 2 subnets to be considered redundant") @@ -39,9 +34,8 @@ func TestHostSet(t *testing.T) { // New host with unique subnet host3 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "192.168.2.3", - Subnets: []string{"192.168.2.0/24"}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{"192.168.2.1"}, } if hs.HasRedundantIP(host3) { t.Fatal("Expected new host with unique subnet to not be considered redundant") @@ -50,9 +44,8 @@ func TestHostSet(t *testing.T) { // New host with same subnet but different public key host4 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "192.168.2.4", - Subnets: []string{"192.168.2.0/24"}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{"192.168.2.1"}, } if !hs.HasRedundantIP(host4) { t.Fatal("Expected host with same subnet but different public key to be considered redundant") @@ -65,9 +58,8 @@ func TestHostSet(t *testing.T) { // Host with two valid subnets host5 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "192.168.3.5", - Subnets: []string{"192.168.3.0/24", "10.0.0.0/24"}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{"192.168.3.1", "10.0.0.1"}, } if hs.HasRedundantIP(host5) { t.Fatal("Expected host with two valid subnets to not be considered redundant") @@ -76,9 +68,8 @@ func TestHostSet(t *testing.T) { // New host with one overlapping subnet host6 := api.Host{ - PublicKey: types.GeneratePrivateKey().PublicKey(), - NetAddress: "10.0.0.1", - Subnets: []string{"10.0.0.0/24", "172.16.0.0/24"}, + PublicKey: types.GeneratePrivateKey().PublicKey(), + ResolvedAddresses: []string{"10.0.0.1", "172.16.0.1"}, } if !hs.HasRedundantIP(host6) { t.Fatal("Expected host with one overlapping subnet to be considered redundant") diff --git a/internal/test/host.go b/internal/test/host.go index c08a11723..f5128b8fc 100644 --- a/internal/test/host.go +++ b/internal/test/host.go @@ -39,7 +39,7 @@ func NewHost(hk types.PublicKey, pt rhpv3.HostPriceTable, settings rhpv2.HostSet PriceTable: api.HostPriceTable{HostPriceTable: pt, Expiry: time.Now().Add(time.Minute)}, Settings: settings, Scanned: true, - ResolvedAddresses: []string{"38.135.51.0"}, + ResolvedAddresses: []string{"38.135.51.1"}, Subnets: []string{"38.135.51.0/24"}, } } diff --git a/worker/worker.go b/worker/worker.go index 0ed1e03ee..fe17f748d 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1437,11 +1437,6 @@ func (w *Worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, api.ErrHostOnPrivateNetwork } - subnets, err := utils.AddressesToSubnets(resolvedAddresses) - if err != nil { - return rhpv2.HostSettings{}, rhpv3.HostPriceTable{}, 0, fmt.Errorf("failed to convert addresses to subnets: %w", err) - } - // scan: first try settings, pt, duration, err := scan() if err != nil { @@ -1480,7 +1475,6 @@ func (w *Worker) scanHost(ctx context.Context, timeout time.Duration, hostKey ty HostKey: hostKey, PriceTable: pt, ResolvedAddresses: resolvedAddresses, - Subnets: subnets, // NOTE: A scan is considered successful if both fetching the price // table and the settings succeeded. Right now scanning can't fail From 50f500e8bd78c5f73741a0b8b500d9f217d795a8 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 16 Aug 2024 10:35:19 -0400 Subject: [PATCH 26/30] api: add json tags for HostScan and HostPriceTableUpdate --- api/host.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/api/host.go b/api/host.go index 180231700..d932229d6 100644 --- a/api/host.go +++ b/api/host.go @@ -182,13 +182,13 @@ type ( } HostScan struct { - HostKey types.PublicKey `json:"hostKey"` - PriceTable rhpv3.HostPriceTable - Settings rhpv2.HostSettings - ResolvedAddresses []string - Subnets []string - Success bool - Timestamp time.Time + HostKey types.PublicKey `json:"hostKey"` + PriceTable rhpv3.HostPriceTable `json:"priceTable"` + Settings rhpv2.HostSettings `json:"settings"` + ResolvedAddresses []string `json:"resolvedAddresses"` + Subnets []string `json:"subnets"` + Success bool `json:"success"` + Timestamp time.Time `json:"timestamp"` } HostPriceTable struct { @@ -198,9 +198,9 @@ type ( HostPriceTableUpdate struct { HostKey types.PublicKey `json:"hostKey"` - Success bool - Timestamp time.Time - PriceTable HostPriceTable + Success bool `json:"success"` + Timestamp time.Time `json:"timestamp"` + PriceTable HostPriceTable `json:"priceTable"` } HostCheck struct { From d211e4ca67028edb0f2bbb735a61da52d6735877 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 16 Aug 2024 10:38:33 -0400 Subject: [PATCH 27/30] contractor: update copy pasted comment --- autopilot/contractor/contractor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go index b4a9bbd19..9c069e334 100644 --- a/autopilot/contractor/contractor.go +++ b/autopilot/contractor/contractor.go @@ -1202,7 +1202,7 @@ func performContractFormations(ctx *mCtx, bus Bus, w Worker, cr contractReviser, } gc := ctx.GougingChecker(cs) - // prepare a gouging checker + // prepare a logger logger := logger.With("hostKey", candidate.host.PublicKey). With("remainingBudget", remainingFunds). With("addresses", candidate.host.ResolvedAddresses) From 428653e1bc5606e60d258026b22d2035ac5d98e6 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 16 Aug 2024 10:40:48 -0400 Subject: [PATCH 28/30] utils: make error messages more specific in AddressesToSubnets --- internal/utils/net.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/utils/net.go b/internal/utils/net.go index ea3bee758..999896c48 100644 --- a/internal/utils/net.go +++ b/internal/utils/net.go @@ -41,7 +41,7 @@ func AddressesToSubnets(resolvedAddresses []string) ([]string, error) { for _, addr := range resolvedAddresses { parsed := net.ParseIP(addr) if parsed == nil { - return nil, errors.New("failed to parse address") + return nil, fmt.Errorf("failed to parse address: %s", addr) } // figure out the IP range @@ -54,7 +54,7 @@ func AddressesToSubnets(resolvedAddresses []string) ([]string, error) { cidr := fmt.Sprintf("%s/%d", parsed.String(), ipRange) _, ipnet, err := net.ParseCIDR(cidr) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to parse cidr: %w", err) } subnets = append(subnets, ipnet.String()) From 9274fb2a095e35f9bdc673fb07ae3aac13f9bbde Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 16 Aug 2024 10:45:46 -0400 Subject: [PATCH 29/30] worker: use debug level logging for dns cache misses --- internal/worker/dialer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 479e0eb3b..1fe563cbf 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -81,7 +81,7 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s // If resolution fails, check the cache if cachedIP, ok := d.cache.Get(host); ok { - logger.Warn("Failed to resolve host, using cached IP", zap.Error(err)) + logger.Debug("Failed to resolve host, using cached IP", zap.Error(err)) conn, err := d.dialer.DialContext(ctx, "tcp", net.JoinHostPort(cachedIP, port)) if err == nil { return conn, nil @@ -91,7 +91,7 @@ func (d *FallbackDialer) Dial(ctx context.Context, hk types.PublicKey, address s } // Attempt to resolve using the bus - logger.Warn("Cache not available or cached IP stale, retrieving host resolved addresses from bus") + logger.Debug("Cache not available or cached IP stale, retrieving host resolved addresses from bus") hostInfo, err := d.bus.Host(ctx, hk) if err != nil { return nil, err From 6d42449ac41743a6fc8af9bfa566ebac7adbae89 Mon Sep 17 00:00:00 2001 From: Christopher Tarry Date: Fri, 16 Aug 2024 10:48:26 -0400 Subject: [PATCH 30/30] worker: change dialer argument order --- internal/worker/dialer.go | 2 +- worker/worker.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/worker/dialer.go b/internal/worker/dialer.go index 1fe563cbf..56e51ce42 100644 --- a/internal/worker/dialer.go +++ b/internal/worker/dialer.go @@ -55,7 +55,7 @@ type FallbackDialer struct { dialer net.Dialer } -func NewFallbackDialer(bus DialerBus, logger *zap.Logger, dialer net.Dialer) *FallbackDialer { +func NewFallbackDialer(bus DialerBus, dialer net.Dialer, logger *zap.Logger) *FallbackDialer { return &FallbackDialer{ cache: newHostCache(), diff --git a/worker/worker.go b/worker/worker.go index fe17f748d..b3c2583c7 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1290,7 +1290,7 @@ func New(cfg config.Worker, masterKey [32]byte, b Bus, l *zap.Logger) (*Worker, a := alerts.WithOrigin(b, fmt.Sprintf("worker.%s", cfg.ID)) shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) - dialer := iworker.NewFallbackDialer(b, l, net.Dialer{}) + dialer := iworker.NewFallbackDialer(b, net.Dialer{}, l) w := &Worker{ alerts: a, allowPrivateIPs: cfg.AllowPrivateIPs,