Skip to content

Commit

Permalink
Add subnets to the host and populate it when scanning a host (#1344)
Browse files Browse the repository at this point in the history
This PR adds a `subnets` column to the `hosts` table. This is an attempt
to get rid of the redundant IP churn we saw on `arequipa`. This is under
the assumption that the churn is related to the way we deal with
intermittent dns resolve issues, which up until now we've treated as
redundant IPs because the `ipFilter` returns a simple yes/no bool.

Fixes #1342

---------

Co-authored-by: Christopher Schinnerl <chris@sia.tech>
  • Loading branch information
peterjan and ChrisSchinnerl authored Jun 28, 2024
1 parent 4e0adda commit 6b69cd5
Show file tree
Hide file tree
Showing 22 changed files with 282 additions and 459 deletions.
6 changes: 4 additions & 2 deletions api/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ type (
Blocked bool `json:"blocked"`
Checks map[string]HostCheck `json:"checks"`
StoredData uint64 `json:"storedData"`
Subnets []string `json:"subnets"`
}

HostAddress struct {
Expand All @@ -181,10 +182,11 @@ type (

HostScan struct {
HostKey types.PublicKey `json:"hostKey"`
PriceTable rhpv3.HostPriceTable
Settings rhpv2.HostSettings
Subnets []string
Success bool
Timestamp time.Time
Settings rhpv2.HostSettings
PriceTable rhpv3.HostPriceTable
}

HostPriceTable struct {
Expand Down
4 changes: 4 additions & 0 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (
// be scanned since it is on a private network.
ErrHostOnPrivateNetwork = errors.New("host is on a private network")

// ErrHostTooManyAddresses is returned by the worker API when a host has
// more than two addresses, or two addresses of the same type.
ErrHostTooManyAddresses = errors.New("host has more than two addresses, or two of the same type")

// ErrMultiRangeNotSupported is returned by the worker API when a request
// tries to download multiple ranges at once.
ErrMultiRangeNotSupported = errors.New("multipart ranges are not supported")
Expand Down
2 changes: 0 additions & 2 deletions autopilot/contractor/contract_spending.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@ func (c *Contractor) currentPeriodSpending(contracts []api.Contract, currentPeri

// filter contracts in the current period
var filtered []api.ContractMetadata
c.mu.Lock()
for _, contract := range contracts {
if contract.WindowStart <= currentPeriod {
filtered = append(filtered, contract.ContractMetadata)
}
}
c.mu.Unlock()

// calculate the money allocated
var totalAllocated types.Currency
Expand Down
96 changes: 54 additions & 42 deletions autopilot/contractor/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math"
"sort"
"strings"
"sync"
"time"

"github.com/montanaflynn/stats"
Expand Down Expand Up @@ -110,20 +109,17 @@ type Worker interface {

type (
Contractor struct {
alerter alerts.Alerter
bus Bus
churn *accumulatedChurn
resolver *ipResolver
logger *zap.SugaredLogger
alerter alerts.Alerter
bus Bus
churn *accumulatedChurn
logger *zap.SugaredLogger

revisionBroadcastInterval time.Duration
revisionLastBroadcast map[types.FileContractID]time.Time
revisionSubmissionBuffer uint64

firstRefreshFailure map[types.FileContractID]time.Time

mu sync.Mutex

shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
}
Expand All @@ -134,9 +130,8 @@ type (
}

contractInfo struct {
host api.Host
contract api.Contract
settings rhpv2.HostSettings
priceTable rhpv3.HostPriceTable
usable bool
recoverable bool
}
Expand Down Expand Up @@ -184,8 +179,6 @@ func New(bus Bus, alerter alerts.Alerter, logger *zap.SugaredLogger, revisionSub

firstRefreshFailure: make(map[types.FileContractID]time.Time),

resolver: newIPResolver(ctx, resolverLookupTimeout, logger.Named("resolver")),

shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}
Expand Down Expand Up @@ -246,13 +239,15 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro
if err != nil && !strings.Contains(err.Error(), api.ErrContractSetNotFound.Error()) {
return false, err
}
hasContractInSet := make(map[types.PublicKey]types.FileContractID)
isInCurrentSet := make(map[types.FileContractID]struct{})
for _, c := range currentSet {
hasContractInSet[c.HostKey] = c.ID
isInCurrentSet[c.ID] = struct{}{}
}
c.logger.Infof("contract set '%s' holds %d contracts", ctx.ContractSet(), len(currentSet))

// fetch all contracts from the worker.
// fetch all contracts from the worker
start := time.Now()
resp, err := w.Contracts(ctx, timeoutHostRevision)
if err != nil {
Expand Down Expand Up @@ -295,6 +290,19 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro
return false, err
}

// resolve host IPs on the fly for hosts that have a contract in the set but
// no subnet information; this was added to minimize churn immediately after
// adding 'subnets' to the host table
for _, h := range hosts {
if fcid, ok := hasContractInSet[h.PublicKey]; ok && len(h.Subnets) == 0 {
h.Subnets, _, err = utils.ResolveHostIP(ctx, h.NetAddress)
if err != nil {
c.logger.Warnw("failed to resolve host IP for a host with a contract in the set", "hk", h.PublicKey, "fcid", fcid, "err", err)
continue
}
}
}

// check if any used hosts have lost data to warn the user
var toDismiss []types.Hash256
for _, h := range hosts {
Expand Down Expand Up @@ -416,6 +424,7 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro
// check if we need to form contracts and add them to the contract set
var formed []api.ContractMetadata
if uint64(len(updatedSet)) < threshold && !ctx.state.SkipContractFormations {
// form contracts
formed, err = c.runContractFormations(ctx, w, candidates, usedHosts, unusableHosts, ctx.WantedContracts()-uint64(len(updatedSet)), &remaining)
if err != nil {
c.logger.Errorf("failed to form contracts, err: %v", err) // continue
Expand Down Expand Up @@ -460,7 +469,7 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro
return c.computeContractSetChanged(mCtx, currentSet, updatedSet, formed, refreshed, renewed, toStopUsing, contractData), nil
}

func (c *Contractor) computeContractSetChanged(ctx *mCtx, oldSet, newSet []api.ContractMetadata, formed []api.ContractMetadata, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) bool {
func (c *Contractor) computeContractSetChanged(ctx *mCtx, oldSet, newSet, formed []api.ContractMetadata, refreshed, renewed []renewal, toStopUsing map[types.FileContractID]string, contractData map[types.FileContractID]uint64) bool {
name := ctx.ContractSet()

// build set lookups
Expand Down Expand Up @@ -700,7 +709,7 @@ LOOP:
if contract.Revision == nil {
if !inSet || remainingKeepLeeway == 0 {
toStopUsing[fcid] = errContractNoRevision.Error()
} else if !ctx.AllowRedundantIPs() && ipFilter.IsRedundantIP(contract.HostIP, contract.HostKey) {
} else if ctx.ShouldFilterRedundantIPs() && ipFilter.HasRedundantIP(host) {
toStopUsing[fcid] = fmt.Sprintf("%v; %v", api.ErrUsabilityHostRedundantIP, errContractNoRevision)
hostChecks[contract.HostKey].Usability.RedundantIP = true
} else {
Expand All @@ -711,7 +720,7 @@ LOOP:
}

// decide whether the contract is still good
ci := contractInfo{contract: contract, priceTable: host.PriceTable.HostPriceTable, settings: host.Settings}
ci := contractInfo{contract: contract, host: host}
usable, recoverable, refresh, renew, reasons := c.isUsableContract(ctx.AutopilotConfig(), ctx.state.RS, ci, inSet, bh, ipFilter)
ci.usable = usable
ci.recoverable = recoverable
Expand Down Expand Up @@ -768,9 +777,6 @@ func (c *Contractor) runContractFormations(ctx *mCtx, w Worker, candidates score
default:
}

// convenience variables
shouldFilter := !ctx.AllowRedundantIPs()

c.logger.Infow(
"run contract formations",
"usedHosts", len(usedHosts),
Expand All @@ -786,15 +792,25 @@ func (c *Contractor) runContractFormations(ctx *mCtx, w Worker, candidates score
)
}()

// build a new host filter
filter := c.newIPFilter()
for _, h := range candidates {
if _, used := usedHosts[h.host.PublicKey]; used {
_ = filter.HasRedundantIP(h.host)
}
}

// select candidates
wanted := int(addLeeway(missing, leewayPctCandidateHosts))
selected := candidates.randSelectByScore(wanted)

// print a warning if not enough candidate hosts were found
c.logger.Infof("looking for %d candidate hosts", wanted)
if len(selected) < wanted {
msg := "no candidate hosts found"
if len(selected) > 0 {
var msg string
if len(selected) == 0 {
msg = "no candidate hosts found"
} else {
msg = fmt.Sprintf("only found %d candidate host(s) out of the %d we wanted", len(selected), wanted)
}
if len(candidates) >= wanted {
Expand All @@ -814,16 +830,6 @@ func (c *Contractor) runContractFormations(ctx *mCtx, w Worker, candidates score
// prepare a gouging checker
gc := ctx.GougingChecker(cs)

// prepare an IP filter that contains all used hosts
ipFilter := c.newIPFilter()
if shouldFilter {
for _, h := range candidates {
if _, used := usedHosts[h.host.PublicKey]; used {
_ = ipFilter.IsRedundantIP(h.host.NetAddress, h.host.PublicKey)
}
}
}

// calculate min/max contract funds
minInitialContractFunds, maxInitialContractFunds := initialContractFundingMinMax(ctx.AutopilotConfig())

Expand Down Expand Up @@ -863,12 +869,16 @@ LOOP:
}

// check if we already have a contract with a host on that subnet
if shouldFilter && ipFilter.IsRedundantIP(host.NetAddress, host.PublicKey) {
if ctx.ShouldFilterRedundantIPs() && filter.HasRedundantIP(host) {
continue
}

// form the contract
formedContract, proceed, err := c.formContract(ctx, w, host, minInitialContractFunds, maxInitialContractFunds, budget)
if err == nil {
if err != nil {
// remove the host from the filter
filter.Remove(host)
} else {
// add contract to contract set
formed = append(formed, formedContract)
missing--
Expand Down Expand Up @@ -1081,7 +1091,7 @@ func (c *Contractor) refreshFundingEstimate(cfg api.AutopilotConfig, ci contract
// check for a sane minimum that is equal to the initial contract funding
// but without an upper cap.
minInitialContractFunds, _ := initialContractFundingMinMax(cfg)
minimum := c.initialContractFunding(ci.settings, txnFeeEstimate, minInitialContractFunds, types.ZeroCurrency)
minimum := c.initialContractFunding(ci.host.Settings, txnFeeEstimate, minInitialContractFunds, types.ZeroCurrency)
refreshAmountCapped := refreshAmount
if refreshAmountCapped.Cmp(minimum) < 0 {
refreshAmountCapped = minimum
Expand Down Expand Up @@ -1223,11 +1233,12 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, ci contractInfo, budget
if ci.contract.Revision == nil {
return api.ContractMetadata{}, true, errors.New("can't renew contract without a revision")
}
log := c.logger.With("to_renew", ci.contract.ID, "hk", ci.contract.HostKey, "hostVersion", ci.settings.Version, "hostRelease", ci.settings.Release)
log := c.logger.With("to_renew", ci.contract.ID, "hk", ci.contract.HostKey, "hostVersion", ci.host.Settings.Version, "hostRelease", ci.host.Settings.Release)

// convenience variables
contract := ci.contract
settings := ci.settings
settings := ci.host.Settings
pt := ci.host.PriceTable.HostPriceTable
fcid := contract.ID
rev := contract.Revision
hk := contract.HostKey
Expand Down Expand Up @@ -1257,7 +1268,7 @@ func (c *Contractor) renewContract(ctx *mCtx, w Worker, ci contractInfo, budget
}

// calculate the expected new storage
expectedNewStorage := renterFundsToExpectedStorage(renterFunds, endHeight-cs.BlockHeight, ci.priceTable)
expectedNewStorage := renterFundsToExpectedStorage(renterFunds, endHeight-cs.BlockHeight, pt)

// renew the contract
resp, err := w.RHPRenew(ctx, fcid, endHeight, hk, contract.SiamuxAddr, settings.Address, ctx.state.Address, renterFunds, types.ZeroCurrency, *budget, expectedNewStorage, settings.WindowSize)
Expand Down Expand Up @@ -1300,11 +1311,12 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, ci contractInfo, budge
if ci.contract.Revision == nil {
return api.ContractMetadata{}, true, errors.New("can't refresh contract without a revision")
}
log := c.logger.With("to_renew", ci.contract.ID, "hk", ci.contract.HostKey, "hostVersion", ci.settings.Version, "hostRelease", ci.settings.Release)
log := c.logger.With("to_renew", ci.contract.ID, "hk", ci.contract.HostKey, "hostVersion", ci.host.Settings.Version, "hostRelease", ci.host.Settings.Release)

// convenience variables
contract := ci.contract
settings := ci.settings
settings := ci.host.Settings
pt := ci.host.PriceTable.HostPriceTable
fcid := contract.ID
rev := contract.Revision
hk := contract.HostKey
Expand All @@ -1317,7 +1329,7 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, ci contractInfo, budge

// calculate the renter funds
var renterFunds types.Currency
if isOutOfFunds(ctx.AutopilotConfig(), ci.priceTable, ci.contract) {
if isOutOfFunds(ctx.AutopilotConfig(), pt, ci.contract) {
renterFunds = c.refreshFundingEstimate(ctx.AutopilotConfig(), ci, ctx.state.Fee)
} else {
renterFunds = rev.ValidRenterPayout() // don't increase funds
Expand All @@ -1329,11 +1341,11 @@ func (c *Contractor) refreshContract(ctx *mCtx, w Worker, ci contractInfo, budge
return api.ContractMetadata{}, false, fmt.Errorf("insufficient budget: %s < %s", budget.String(), renterFunds.String())
}

expectedNewStorage := renterFundsToExpectedStorage(renterFunds, contract.EndHeight()-cs.BlockHeight, ci.priceTable)
expectedNewStorage := renterFundsToExpectedStorage(renterFunds, contract.EndHeight()-cs.BlockHeight, pt)
unallocatedCollateral := contract.RemainingCollateral()

// a refresh should always result in a contract that has enough collateral
minNewCollateral := minRemainingCollateral(ctx.AutopilotConfig(), ctx.state.RS, renterFunds, settings, ci.priceTable).Mul64(2)
minNewCollateral := minRemainingCollateral(ctx.AutopilotConfig(), ctx.state.RS, renterFunds, settings, pt).Mul64(2)

// maxFundAmount is the remaining funds of the contract to refresh plus the
// budget since the previous contract was in the same period
Expand Down
4 changes: 2 additions & 2 deletions autopilot/contractor/hostfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (u *unusableHostsBreakdown) keysAndValues() []interface{} {
// - refresh -> should be refreshed
// - renew -> should be renewed
func (c *Contractor) isUsableContract(cfg api.AutopilotConfig, rs api.RedundancySettings, ci contractInfo, inSet bool, bh uint64, f *ipFilter) (usable, recoverable, refresh, renew bool, reasons []string) {
contract, s, pt := ci.contract, ci.settings, ci.priceTable
contract, s, pt := ci.contract, ci.host.Settings, ci.host.PriceTable.HostPriceTable

usable = true
if bh > contract.EndHeight() {
Expand Down Expand Up @@ -144,7 +144,7 @@ func (c *Contractor) isUsableContract(cfg api.AutopilotConfig, rs api.Redundancy

// IP check should be last since it modifies the filter
shouldFilter := !cfg.Hosts.AllowRedundantIPs && (usable || recoverable)
if shouldFilter && f.IsRedundantIP(contract.HostIP, contract.HostKey) {
if shouldFilter && f.HasRedundantIP(ci.host) {
reasons = append(reasons, api.ErrUsabilityHostRedundantIP.Error())
usable = false
recoverable = false // do not use in the contract set, but keep it around for downloads
Expand Down
Loading

0 comments on commit 6b69cd5

Please sign in to comment.