diff --git a/.golangci.yml b/.golangci.yml index ad04bb78e..9aad5bd19 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -102,6 +102,9 @@ linters: - typecheck - whitespace - tagliatelle + - unused + - unparam + - deadcode issues: # Maximum issues count per one linter. Set to 0 to disable. Default is 50. diff --git a/alerts/alerts_test.go b/alerts/alerts_test.go index 2cc20c57b..ff927ccdc 100644 --- a/alerts/alerts_test.go +++ b/alerts/alerts_test.go @@ -22,27 +22,29 @@ type testWebhookStore struct { listed int } -func (s *testWebhookStore) DeleteWebhook(wb webhooks.Webhook) error { +func (s *testWebhookStore) DeleteWebhook(_ context.Context, wb webhooks.Webhook) error { s.mu.Lock() defer s.mu.Unlock() s.deleted++ return nil } -func (s *testWebhookStore) AddWebhook(wb webhooks.Webhook) error { +func (s *testWebhookStore) AddWebhook(_ context.Context, wb webhooks.Webhook) error { s.mu.Lock() defer s.mu.Unlock() s.added++ return nil } -func (s *testWebhookStore) Webhooks() ([]webhooks.Webhook, error) { +func (s *testWebhookStore) Webhooks(_ context.Context) ([]webhooks.Webhook, error) { s.mu.Lock() defer s.mu.Unlock() s.listed++ return nil, nil } +var _ webhooks.WebhookStore = (*testWebhookStore)(nil) + func TestWebhooks(t *testing.T) { store := &testWebhookStore{} mgr, err := webhooks.NewManager(zap.NewNop().Sugar(), store) @@ -75,7 +77,7 @@ func TestWebhooks(t *testing.T) { if hookID := wh.String(); hookID != fmt.Sprintf("%v.%v.%v", wh.URL, wh.Module, "") { t.Fatalf("wrong result for wh.String(): %v != %v", wh.String(), hookID) } - err = mgr.Register(wh) + err = mgr.Register(context.Background(), wh) if err != nil { t.Fatal(err) } @@ -110,7 +112,7 @@ func TestWebhooks(t *testing.T) { } // unregister hook - if err := mgr.Delete(webhooks.Webhook{ + if err := mgr.Delete(context.Background(), webhooks.Webhook{ Event: hooks[0].Event, Module: hooks[0].Module, URL: hooks[0].URL, diff --git a/autopilot/accounts.go b/autopilot/accounts.go index 974222dbe..690c2b35d 100644 --- a/autopilot/accounts.go +++ b/autopilot/accounts.go @@ -141,7 +141,7 @@ func (a *accounts) refillWorkerAccounts(ctx context.Context, w Worker) { go func(contract api.ContractMetadata) { rCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - accountID, refilled, rerr := refillWorkerAccount(rCtx, a.a, w, workerID, contract) + accountID, refilled, rerr := refillWorkerAccount(rCtx, a.a, w, contract) if rerr != nil { if rerr.Is(errMaxDriftExceeded) { // register the alert if error is errMaxDriftExceeded @@ -184,7 +184,7 @@ func (err *refillError) Is(target error) bool { return errors.Is(err.err, target) } -func refillWorkerAccount(ctx context.Context, a AccountStore, w Worker, workerID string, contract api.ContractMetadata) (accountID rhpv3.Account, refilled bool, rerr *refillError) { +func refillWorkerAccount(ctx context.Context, a AccountStore, w Worker, contract api.ContractMetadata) (accountID rhpv3.Account, refilled bool, rerr *refillError) { wrapErr := func(err error, keysAndValues ...interface{}) *refillError { if err == nil { return nil diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index eb08c9456..c89049286 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -322,11 +322,11 @@ func (ap *Autopilot) Run() error { } // migration - ap.m.tryPerformMigrations(ap.shutdownCtx, ap.workers) + ap.m.tryPerformMigrations(ap.workers) // pruning if ap.state.cfg.Contracts.Prune { - ap.c.tryPerformPruning(ap.shutdownCtx, ap.workers) + ap.c.tryPerformPruning(ap.workers) } else { ap.logger.Debug("pruning disabled") } diff --git a/autopilot/contract_spending.go b/autopilot/contract_spending.go index ba144e173..cbd10f86c 100644 --- a/autopilot/contract_spending.go +++ b/autopilot/contract_spending.go @@ -20,7 +20,7 @@ func (c *contractor) contractSpending(ctx context.Context, contract api.Contract return total, nil } -func (c *contractor) currentPeriodSpending(contracts []api.Contract, currentPeriod uint64) (types.Currency, error) { +func (c *contractor) currentPeriodSpending(contracts []api.Contract, currentPeriod uint64) types.Currency { totalCosts := make(map[types.FileContractID]types.Currency) for _, c := range contracts { totalCosts[c.ID] = c.TotalCost @@ -41,22 +41,19 @@ func (c *contractor) currentPeriodSpending(contracts []api.Contract, currentPeri for _, contract := range filtered { totalAllocated = totalAllocated.Add(contract.TotalCost) } - return totalAllocated, nil + return totalAllocated } -func (c *contractor) remainingFunds(contracts []api.Contract) (types.Currency, error) { +func (c *contractor) remainingFunds(contracts []api.Contract) types.Currency { state := c.ap.State() // find out how much we spent in the current period - spent, err := c.currentPeriodSpending(contracts, state.period) - if err != nil { - return types.ZeroCurrency, err - } + spent := c.currentPeriodSpending(contracts, state.period) // figure out remaining funds var remaining types.Currency if state.cfg.Contracts.Allowance.Cmp(spent) > 0 { remaining = state.cfg.Contracts.Allowance.Sub(spent) } - return remaining, nil + return remaining } diff --git a/autopilot/contractor.go b/autopilot/contractor.go index 8dfc702f3..83e12a206 100644 --- a/autopilot/contractor.go +++ b/autopilot/contractor.go @@ -276,7 +276,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) ( // min score to pass checks var minScore float64 if len(hosts) > 0 { - minScore = c.calculateMinScore(ctx, candidates, state.cfg.Contracts.Amount) + minScore = c.calculateMinScore(candidates, state.cfg.Contracts.Amount) } else { c.logger.Warn("could not calculate min score, no hosts found") } @@ -324,10 +324,7 @@ func (c *contractor) performContractMaintenance(ctx context.Context, w Worker) ( } // calculate remaining funds - remaining, err := c.remainingFunds(contracts) - if err != nil { - return false, err - } + remaining := c.remainingFunds(contracts) // calculate 'limit' amount of contracts we want to renew var limit int @@ -1140,7 +1137,7 @@ func (c *contractor) initialContractFunding(settings rhpv2.HostSettings, txnFee, return funding } -func (c *contractor) refreshFundingEstimate(ctx context.Context, cfg api.AutopilotConfig, ci contractInfo, fee types.Currency) (types.Currency, error) { +func (c *contractor) refreshFundingEstimate(cfg api.AutopilotConfig, ci contractInfo, fee types.Currency) types.Currency { // refresh with 1.2x the funds refreshAmount := ci.contract.TotalCost.Mul64(6).Div64(5) @@ -1159,7 +1156,7 @@ func (c *contractor) refreshFundingEstimate(ctx context.Context, cfg api.Autopil "fcid", ci.contract.ID, "refreshAmount", refreshAmount, "refreshAmountCapped", refreshAmountCapped) - return refreshAmountCapped, nil + return refreshAmountCapped } func (c *contractor) renewFundingEstimate(ctx context.Context, ci contractInfo, fee types.Currency, renewing bool) (types.Currency, error) { @@ -1249,7 +1246,7 @@ func (c *contractor) renewFundingEstimate(ctx context.Context, ci contractInfo, return cappedEstimatedCost, nil } -func (c *contractor) calculateMinScore(ctx context.Context, candidates []scoredHost, numContracts uint64) float64 { +func (c *contractor) calculateMinScore(candidates []scoredHost, numContracts uint64) float64 { // return early if there's no hosts if len(candidates) == 0 { c.logger.Warn("min host score is set to the smallest non-zero float because there are no candidate hosts") @@ -1475,11 +1472,7 @@ func (c *contractor) refreshContract(ctx context.Context, w Worker, ci contractI // calculate the renter funds var renterFunds types.Currency if isOutOfFunds(state.cfg, ci.priceTable, ci.contract) { - renterFunds, err = c.refreshFundingEstimate(ctx, state.cfg, ci, state.fee) - if err != nil { - c.logger.Errorw(fmt.Sprintf("could not get refresh funding estimate, err: %v", err), "hk", hk, "fcid", fcid) - return api.ContractMetadata{}, true, err - } + renterFunds = c.refreshFundingEstimate(state.cfg, ci, state.fee) } else { renterFunds = rev.ValidRenterPayout() // don't increase funds } @@ -1599,7 +1592,7 @@ func (c *contractor) formContract(ctx context.Context, w Worker, host hostdb.Hos return formedContract, true, nil } -func (c *contractor) tryPerformPruning(ctx context.Context, wp *workerPool) { +func (c *contractor) tryPerformPruning(wp *workerPool) { c.mu.Lock() if c.pruning || c.ap.isStopped() { c.mu.Unlock() diff --git a/autopilot/contractor_test.go b/autopilot/contractor_test.go index a0f63425b..575605612 100644 --- a/autopilot/contractor_test.go +++ b/autopilot/contractor_test.go @@ -1,7 +1,6 @@ package autopilot import ( - "context" "math" "testing" @@ -19,19 +18,19 @@ func TestCalculateMinScore(t *testing.T) { } // Test with 100 hosts which makes for a random set size of 250 - minScore := c.calculateMinScore(context.Background(), candidates, 100) + minScore := c.calculateMinScore(candidates, 100) if minScore != 0.002 { t.Fatalf("expected minScore to be 0.002 but was %v", minScore) } // Test with 0 hosts - minScore = c.calculateMinScore(context.Background(), []scoredHost{}, 100) + minScore = c.calculateMinScore([]scoredHost{}, 100) if minScore != math.SmallestNonzeroFloat64 { t.Fatalf("expected minScore to be math.SmallestNonzeroFLoat64 but was %v", minScore) } // Test with 300 hosts which is 50 more than we have - minScore = c.calculateMinScore(context.Background(), candidates, 300) + minScore = c.calculateMinScore(candidates, 300) if minScore != math.SmallestNonzeroFloat64 { t.Fatalf("expected minScore to be math.SmallestNonzeroFLoat64 but was %v", minScore) } diff --git a/autopilot/hostscore.go b/autopilot/hostscore.go index b15857d19..e8d9ca9b9 100644 --- a/autopilot/hostscore.go +++ b/autopilot/hostscore.go @@ -36,7 +36,7 @@ func hostScore(cfg api.AutopilotConfig, h hostdb.Host, storedData uint64, expect Collateral: collateralScore(cfg, h.PriceTable.HostPriceTable, uint64(allocationPerHost)), Interactions: interactionScore(h), Prices: priceAdjustmentScore(hostPeriodCost, cfg), - StorageRemaining: storageRemainingScore(cfg, h.Settings, storedData, expectedRedundancy, allocationPerHost), + StorageRemaining: storageRemainingScore(h.Settings, storedData, allocationPerHost), Uptime: uptimeScore(h), Version: versionScore(h.Settings), } @@ -74,7 +74,7 @@ func priceAdjustmentScore(hostCostPerPeriod types.Currency, cfg api.AutopilotCon panic("unreachable") } -func storageRemainingScore(cfg api.AutopilotConfig, h rhpv2.HostSettings, storedData uint64, expectedRedundancy, allocationPerHost float64) float64 { +func storageRemainingScore(h rhpv2.HostSettings, storedData uint64, allocationPerHost float64) float64 { // hostExpectedStorage is the amount of storage that we expect to be able to // store on this host overall, which should include the stored data that is // already on the host. @@ -291,7 +291,7 @@ func uploadCostForScore(cfg api.AutopilotConfig, h hostdb.Host, bytes uint64) ty return uploadSectorCostRHPv3.Mul64(numSectors) } -func downloadCostForScore(cfg api.AutopilotConfig, h hostdb.Host, bytes uint64) types.Currency { +func downloadCostForScore(h hostdb.Host, bytes uint64) types.Currency { rsc := h.PriceTable.BaseCost().Add(h.PriceTable.ReadSectorCost(rhpv2.SectorSize)) downloadSectorCostRHPv3, _ := rsc.Total() numSectors := bytesToSectors(bytes) @@ -314,7 +314,7 @@ func hostPeriodCostForScore(h hostdb.Host, cfg api.AutopilotConfig, expectedRedu hostCollateral := rhpv2.ContractFormationCollateral(cfg.Contracts.Period, storagePerHost, h.Settings) hostContractPrice := contractPriceForScore(h) hostUploadCost := uploadCostForScore(cfg, h, uploadPerHost) - hostDownloadCost := downloadCostForScore(cfg, h, downloadPerHost) + hostDownloadCost := downloadCostForScore(h, downloadPerHost) hostStorageCost := storageCostForScore(cfg, h, storagePerHost) siafundFee := hostCollateral. Add(hostContractPrice). diff --git a/autopilot/migrator.go b/autopilot/migrator.go index c55b9c734..89ab16a28 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -98,7 +98,7 @@ func (m *migrator) slabMigrationEstimate(remaining int) time.Duration { return time.Duration(totalNumMS) * time.Millisecond } -func (m *migrator) tryPerformMigrations(ctx context.Context, wp *workerPool) { +func (m *migrator) tryPerformMigrations(wp *workerPool) { m.mu.Lock() if m.migrating || m.ap.isStopped() { m.mu.Unlock() diff --git a/autopilot/scanner.go b/autopilot/scanner.go index 230400619..bb21e5022 100644 --- a/autopilot/scanner.go +++ b/autopilot/scanner.go @@ -163,9 +163,9 @@ func (s *scanner) isInterrupted() bool { } } -func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bool) bool { +func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bool) { if s.ap.isStopped() { - return false + return } scanType := "host scan" @@ -185,7 +185,7 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo s.interruptScanChan = make(chan struct{}) } else if s.scanning || !s.isScanRequired() { s.mu.Unlock() - return false + return } s.scanningLastStart = time.Now() s.scanning = true @@ -229,7 +229,7 @@ func (s *scanner) tryPerformHostScan(ctx context.Context, w scanWorker, force bo s.logger.Debugf("%s finished after %v", st, time.Since(s.scanningLastStart)) s.mu.Unlock() }(scanType) - return true + return } func (s *scanner) tryUpdateTimeout() { diff --git a/autopilot/scanner_test.go b/autopilot/scanner_test.go index d5833d1fb..6214ec4a1 100644 --- a/autopilot/scanner_test.go +++ b/autopilot/scanner_test.go @@ -87,7 +87,7 @@ func TestScanner(t *testing.T) { // init new scanner b := &mockBus{hosts: hosts} w := &mockWorker{blockChan: make(chan struct{})} - s := newTestScanner(b, w) + s := newTestScanner(b) // assert it started a host scan s.tryPerformHostScan(context.Background(), w, false) @@ -139,7 +139,7 @@ func (s *scanner) isScanning() bool { return s.scanning } -func newTestScanner(b *mockBus, w *mockWorker) *scanner { +func newTestScanner(b *mockBus) *scanner { ap := &Autopilot{} ap.shutdownCtx, ap.shutdownCtxCancel = context.WithCancel(context.Background()) return &scanner{ diff --git a/bus/bus.go b/bus/bus.go index 05770eb96..d8b3fdfc5 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -189,7 +189,7 @@ type ( EphemeralAccountStore interface { Accounts(context.Context) ([]api.Account, error) SaveAccounts(context.Context, []api.Account) error - SetUncleanShutdown() error + SetUncleanShutdown(context.Context) error } MetricsStore interface { @@ -2022,7 +2022,7 @@ func (b *bus) webhookHandlerDelete(jc jape.Context) { if jc.Decode(&wh) != nil { return } - err := b.hooks.Delete(wh) + err := b.hooks.Delete(jc.Request.Context(), wh) if errors.Is(err, webhooks.ErrWebhookNotFound) { jc.Error(fmt.Errorf("webhook for URL %v and event %v.%v not found", wh.URL, wh.Module, wh.Event), http.StatusNotFound) return @@ -2044,7 +2044,7 @@ func (b *bus) webhookHandlerPost(jc jape.Context) { if jc.Decode(&req) != nil { return } - err := b.hooks.Register(webhooks.Webhook{ + err := b.hooks.Register(jc.Request.Context(), webhooks.Webhook{ Event: req.Event, Module: req.Module, URL: req.URL, @@ -2412,7 +2412,7 @@ func New(s Syncer, am *alerts.Manager, hm *webhooks.Manager, cm ChainManager, tp // mark the shutdown as unclean, this will be overwritten when/if the // accounts are saved on shutdown - if err := eas.SetUncleanShutdown(); err != nil { + if err := eas.SetUncleanShutdown(ctx); err != nil { return nil, fmt.Errorf("failed to mark account shutdown as unclean: %w", err) } return b, nil diff --git a/bus/client/objects.go b/bus/client/objects.go index 23011a9ba..6a17691e2 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -112,7 +112,7 @@ func (c *Client) SearchObjects(ctx context.Context, bucket string, opts api.Sear } func (c *Client) renameObjects(ctx context.Context, bucket, from, to, mode string, force bool) (err error) { - err = c.c.POST("/objects/rename", api.ObjectsRenameRequest{ + err = c.c.WithContext(ctx).POST("/objects/rename", api.ObjectsRenameRequest{ Bucket: bucket, Force: force, From: from, diff --git a/bus/contractlocking_test.go b/bus/contractlocking_test.go index a00198cc9..120ca9ca2 100644 --- a/bus/contractlocking_test.go +++ b/bus/contractlocking_test.go @@ -154,7 +154,7 @@ func TestContractKeepalive(t *testing.T) { func TestContractRelease(t *testing.T) { locks := newContractLocks() - verify := func(fcid types.FileContractID, lockID uint64, lockedUntil time.Time, delta time.Duration) { + verify := func(fcid types.FileContractID, lockID uint64) { t.Helper() lock := locks.lockForContractID(fcid, false) if lock.heldByID != lockID { @@ -168,7 +168,7 @@ func TestContractRelease(t *testing.T) { if err != nil { t.Fatal(err) } - verify(fcid, lockID, time.Now().Add(time.Minute), 3*time.Second) + verify(fcid, lockID) // Acquire it again but release the contract within a second. var wg sync.WaitGroup @@ -185,14 +185,14 @@ func TestContractRelease(t *testing.T) { if err != nil { t.Fatal(err) } - verify(fcid, lockID, time.Now().Add(time.Minute), 3*time.Second) + verify(fcid, lockID) // Release one more time. Should decrease the references to 0 and reset // fields. if err := locks.Release(fcid, lockID); err != nil { t.Error(err) } - verify(fcid, 0, time.Time{}, 0) + verify(fcid, 0) // Try to release lock again. Is a no-op. if err := locks.Release(fcid, lockID); err != nil { diff --git a/cmd/renterd/config.go b/cmd/renterd/config.go index 47668ff94..f9008a4d5 100644 --- a/cmd/renterd/config.go +++ b/cmd/renterd/config.go @@ -41,6 +41,7 @@ func readInput(context string) string { } // wrapANSI wraps the output in ANSI escape codes if enabled. +// nolint: unparam func wrapANSI(prefix, output, suffix string) string { if enableANSI { return prefix + output + suffix diff --git a/internal/node/node.go b/internal/node/node.go index e94cfbb4d..d105cbfb2 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -137,11 +137,14 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht cancelSubscribe := make(chan struct{}) go func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + subscribeErr := cs.ConsensusSetSubscribe(sqlStore, ccid, cancelSubscribe) if errors.Is(subscribeErr, modules.ErrInvalidConsensusChangeID) { l.Warn("Invalid consensus change ID detected - resyncing consensus") // Reset the consensus state within the database and rescan. - if err := sqlStore.ResetConsensusSubscription(); err != nil { + if err := sqlStore.ResetConsensusSubscription(ctx); err != nil { l.Fatal(fmt.Sprintf("Failed to reset consensus subscription of SQLStore: %v", err)) return } @@ -177,11 +180,8 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht } shutdownFn := func(ctx context.Context) error { + close(cancelSubscribe) return errors.Join( - func() error { - close(cancelSubscribe) - return nil - }(), g.Close(), cs.Close(), tp.Close(), diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index dd3cd6e31..2346f7019 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1098,7 +1098,7 @@ func TestParallelUpload(t *testing.T) { w := cluster.Worker tt := cluster.tt - upload := func() error { + upload := func() { t.Helper() // prepare some data - make sure it's more than one sector data := make([]byte, rhpv2.SectorSize) @@ -1107,7 +1107,6 @@ func TestParallelUpload(t *testing.T) { // upload the data path := fmt.Sprintf("/dir/data_%v", hex.EncodeToString(data[:16])) tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(data), api.DefaultBucketName, path, api.UploadObjectOptions{})) - return nil } // Upload in parallel @@ -1116,10 +1115,7 @@ func TestParallelUpload(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if err := upload(); err != nil { - t.Error(err) - return - } + upload() }() } wg.Wait() diff --git a/stores/accounts.go b/stores/accounts.go index d519df9dd..69f4aeff8 100644 --- a/stores/accounts.go +++ b/stores/accounts.go @@ -55,7 +55,7 @@ func (a dbAccount) convert() api.Account { // Accounts returns all accounts from the db. func (s *SQLStore) Accounts(ctx context.Context) ([]api.Account, error) { var dbAccounts []dbAccount - if err := s.db.Find(&dbAccounts).Error; err != nil { + if err := s.db.WithContext(ctx).Find(&dbAccounts).Error; err != nil { return nil, err } accounts := make([]api.Account, len(dbAccounts)) @@ -69,8 +69,10 @@ func (s *SQLStore) Accounts(ctx context.Context) ([]api.Account, error) { // also sets the 'requires_sync' flag. That way, the autopilot will know to sync // all accounts after an unclean shutdown and the bus will know not to apply // drift. -func (s *SQLStore) SetUncleanShutdown() error { - return s.db.Model(&dbAccount{}). +func (s *SQLStore) SetUncleanShutdown(ctx context.Context) error { + return s.db. + WithContext(ctx). + Model(&dbAccount{}). Where("TRUE"). Updates(map[string]interface{}{ "clean_shutdown": false, @@ -95,7 +97,7 @@ func (s *SQLStore) SaveAccounts(ctx context.Context, accounts []api.Account) err RequiresSync: acc.RequiresSync, } } - return s.db.Clauses(clause.OnConflict{ + return s.db.WithContext(ctx).Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "account_id"}}, UpdateAll: true, }).Create(&dbAccounts).Error diff --git a/stores/autopilot.go b/stores/autopilot.go index 6dc88a692..5a5c5ed2d 100644 --- a/stores/autopilot.go +++ b/stores/autopilot.go @@ -34,6 +34,7 @@ func (c dbAutopilot) convert() api.Autopilot { func (s *SQLStore) Autopilots(ctx context.Context) ([]api.Autopilot, error) { var entities []dbAutopilot err := s.db. + WithContext(ctx). Model(&dbAutopilot{}). Find(&entities). Error @@ -51,6 +52,7 @@ func (s *SQLStore) Autopilots(ctx context.Context) ([]api.Autopilot, error) { func (s *SQLStore) Autopilot(ctx context.Context, id string) (api.Autopilot, error) { var entity dbAutopilot err := s.db. + WithContext(ctx). Model(&dbAutopilot{}). Where("identifier = ?", id). First(&entity). @@ -73,10 +75,12 @@ func (s *SQLStore) UpdateAutopilot(ctx context.Context, ap api.Autopilot) error } // upsert - return s.db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "identifier"}}, - UpdateAll: true, - }).Create(&dbAutopilot{ + return s.db. + WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "identifier"}}, + UpdateAll: true, + }).Create(&dbAutopilot{ Identifier: ap.ID, Config: ap.Config, CurrentPeriod: ap.CurrentPeriod, diff --git a/stores/hostdb.go b/stores/hostdb.go index 37aa18ee8..fd23abf4a 100644 --- a/stores/hostdb.go +++ b/stores/hostdb.go @@ -87,12 +87,6 @@ type ( Hosts []dbHost `gorm:"many2many:host_allowlist_entry_hosts;constraint:OnDelete:CASCADE"` } - // dbHostAllowlistEntryHost is a join table between dbAllowlistEntry and dbHost. - dbHostAllowlistEntryHost struct { - DBAllowlistEntryID uint `gorm:"primaryKey"` - DBHostID uint `gorm:"primaryKey;index"` - } - // dbBlocklistEntry defines a table that stores the host blocklist. dbBlocklistEntry struct { Model @@ -100,12 +94,6 @@ type ( Hosts []dbHost `gorm:"many2many:host_blocklist_entry_hosts;constraint:OnDelete:CASCADE"` } - // dbHostBlocklistEntryHost is a join table between dbBlocklistEntry and dbHost. - dbHostBlocklistEntryHost struct { - DBBlocklistEntryID uint `gorm:"primaryKey"` - DBHostID uint `gorm:"primaryKey;index"` - } - dbConsensusInfo struct { Model CCID []byte @@ -278,15 +266,9 @@ func (dbHost) TableName() string { return "hosts" } // TableName implements the gorm.Tabler interface. func (dbAllowlistEntry) TableName() string { return "host_allowlist_entries" } -// TableName implements the gorm.Tabler interface. -func (dbHostAllowlistEntryHost) TableName() string { return "host_allowlist_entry_hosts" } - // TableName implements the gorm.Tabler interface. func (dbBlocklistEntry) TableName() string { return "host_blocklist_entries" } -// TableName implements the gorm.Tabler interface. -func (dbHostBlocklistEntryHost) TableName() string { return "host_blocklist_entry_hosts" } - // convert converts a host into a hostdb.Host. func (h dbHost) convert() hostdb.Host { var lastScan time.Time @@ -427,6 +409,7 @@ func (ss *SQLStore) Host(ctx context.Context, hostKey types.PublicKey) (hostdb.H var h dbHost tx := ss.db. + WithContext(ctx). Where(&dbHost{PublicKey: publicKey(hostKey)}). Preload("Allowlist"). Preload("Blocklist"). @@ -456,6 +439,7 @@ func (ss *SQLStore) HostsForScanning(ctx context.Context, maxLastScan time.Time, var hostAddresses []hostdb.HostAddress err := ss.db. + WithContext(ctx). Model(&dbHost{}). Where("last_scan < ?", maxLastScan.UnixNano()). Offset(offset). @@ -546,6 +530,7 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui // fetch all hosts outside of the transaction var hosts []dbHost if err := ss.db. + WithContext(ctx). Model(&dbHost{}). Where("recent_downtime >= ? AND recent_scan_failures >= ?", maxDowntime, minRecentFailures). Find(&hosts). @@ -561,7 +546,7 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui // remove every host one by one var errs []error for _, h := range hosts { - if err := ss.retryTransaction(func(tx *gorm.DB) error { + if err := ss.retryTransaction(ctx, func(tx *gorm.DB) error { // fetch host contracts hcs, err := contractsForHost(tx, h) if err != nil { @@ -575,7 +560,7 @@ func (ss *SQLStore) RemoveOfflineHosts(ctx context.Context, minRecentFailures ui } // archive host contracts - if err := archiveContracts(ctx, tx, hcs, toArchive); err != nil { + if err := archiveContracts(tx, hcs, toArchive); err != nil { return err } @@ -609,7 +594,7 @@ func (ss *SQLStore) UpdateHostAllowlistEntries(ctx context.Context, add, remove // clear allowlist if clear { - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Where("TRUE").Delete(&dbAllowlistEntry{}).Error }) } @@ -624,7 +609,7 @@ func (ss *SQLStore) UpdateHostAllowlistEntries(ctx context.Context, add, remove toDelete[i] = publicKey(entry) } - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { if len(toInsert) > 0 { if err := tx.Create(&toInsert).Error; err != nil { return err @@ -648,7 +633,7 @@ func (ss *SQLStore) UpdateHostBlocklistEntries(ctx context.Context, add, remove // clear blocklist if clear { - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Where("TRUE").Delete(&dbBlocklistEntry{}).Error }) } @@ -658,7 +643,7 @@ func (ss *SQLStore) UpdateHostBlocklistEntries(ctx context.Context, add, remove toInsert = append(toInsert, dbBlocklistEntry{Entry: entry}) } - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { if len(toInsert) > 0 { if err := tx.Create(&toInsert).Error; err != nil { return err @@ -676,6 +661,7 @@ func (ss *SQLStore) UpdateHostBlocklistEntries(ctx context.Context, add, remove func (ss *SQLStore) HostAllowlist(ctx context.Context) (allowlist []types.PublicKey, err error) { var pubkeys []publicKey err = ss.db. + WithContext(ctx). Model(&dbAllowlistEntry{}). Pluck("entry", &pubkeys). Error @@ -688,6 +674,7 @@ func (ss *SQLStore) HostAllowlist(ctx context.Context) (allowlist []types.Public func (ss *SQLStore) HostBlocklist(ctx context.Context) (blocklist []string, err error) { err = ss.db. + WithContext(ctx). Model(&dbBlocklistEntry{}). Pluck("entry", &blocklist). Error @@ -719,7 +706,7 @@ func (ss *SQLStore) RecordHostScans(ctx context.Context, scans []hostdb.HostScan end = len(hks) } var batchHosts []dbHost - if err := ss.db.Where("public_key IN (?)", hks[i:end]). + if err := ss.db.WithContext(ctx).Where("public_key IN (?)", hks[i:end]). Find(&batchHosts).Error; err != nil { return err } @@ -732,7 +719,7 @@ func (ss *SQLStore) RecordHostScans(ctx context.Context, scans []hostdb.HostScan // Write the interactions and update to the hosts atomically within a single // transaction. - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { // Handle scans for _, scan := range scans { host, exists := hostMap[publicKey(scan.HostKey)] @@ -841,7 +828,7 @@ func (ss *SQLStore) RecordPriceTables(ctx context.Context, priceTableUpdate []ho end = len(hks) } var batchHosts []dbHost - if err := ss.db.Where("public_key IN (?)", hks[i:end]). + if err := ss.db.WithContext(ctx).Where("public_key IN (?)", hks[i:end]). Find(&batchHosts).Error; err != nil { return err } @@ -854,7 +841,7 @@ func (ss *SQLStore) RecordPriceTables(ctx context.Context, priceTableUpdate []ho // Write the interactions and update to the hosts atomically within a single // transaction. - return ss.retryTransaction(func(tx *gorm.DB) error { + return ss.retryTransaction(ctx, func(tx *gorm.DB) error { // Handle price table updates for _, ptu := range priceTableUpdate { host, exists := hostMap[publicKey(ptu.HostKey)] @@ -1086,7 +1073,7 @@ func updateBlocklist(tx *gorm.DB, hk types.PublicKey, allowlist []dbAllowlistEnt } func (s *SQLStore) ResetLostSectors(ctx context.Context, hk types.PublicKey) error { - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Model(&dbHost{}). Where("public_key", publicKey(hk)). Update("lost_sectors", 0). diff --git a/stores/metadata.go b/stores/metadata.go index 0733ad567..c543695bd 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -32,8 +32,9 @@ const ( // 10/30 erasure coding and takes <1s to execute on an SSD in SQLite. refreshHealthBatchSize = 10000 + // sectorInsertionBatchSize is the number of sectors per batch when we + // upsert sectors. sectorInsertionBatchSize = 500 - sectorQueryBatchSize = 100 refreshHealthMinHealthValidity = 12 * time.Hour refreshHealthMaxHealthValidity = 72 * time.Hour @@ -480,6 +481,7 @@ func (raw rawObject) toSlabSlice() (slice object.SlabSlice, _ error) { func (s *SQLStore) Bucket(ctx context.Context, bucket string) (api.Bucket, error) { var b dbBucket err := s.db. + WithContext(ctx). Model(&dbBucket{}). Where("name = ?", bucket). Take(&b). @@ -498,7 +500,7 @@ func (s *SQLStore) Bucket(ctx context.Context, bucket string) (api.Bucket, error func (s *SQLStore) CreateBucket(ctx context.Context, bucket string, policy api.BucketPolicy) error { // Create bucket. - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { res := tx.Clauses(clause.OnConflict{ DoNothing: true, }). @@ -520,7 +522,7 @@ func (s *SQLStore) UpdateBucketPolicy(ctx context.Context, bucket string, policy if err != nil { return err } - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx. Model(&dbBucket{}). Where("name", bucket). @@ -534,7 +536,7 @@ func (s *SQLStore) UpdateBucketPolicy(ctx context.Context, bucket string, policy func (s *SQLStore) DeleteBucket(ctx context.Context, bucket string) error { // Delete bucket. - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { var b dbBucket if err := tx.Take(&b, "name = ?", bucket).Error; errors.Is(err, gorm.ErrRecordNotFound) { return api.ErrBucketNotFound @@ -561,6 +563,7 @@ func (s *SQLStore) DeleteBucket(ctx context.Context, bucket string) error { func (s *SQLStore) ListBuckets(ctx context.Context) ([]api.Bucket, error) { var buckets []dbBucket err := s.db. + WithContext(ctx). Model(&dbBucket{}). Find(&buckets). Error @@ -583,10 +586,12 @@ func (s *SQLStore) ListBuckets(ctx context.Context) ([]api.Bucket, error) { // reduce locking and make sure all results are consistent, everything is done // within a single transaction. func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) { + db := s.db.WithContext(ctx) + // fetch bucket id if a bucket was specified var bucketID uint if opts.Bucket != "" { - err := s.db.Model(&dbBucket{}).Select("id").Where("name = ?", opts.Bucket).Take(&bucketID).Error + err := db.Model(&dbBucket{}).Select("id").Where("name = ?", opts.Bucket).Take(&bucketID).Error if err != nil { return api.ObjectsStatsResponse{}, err } @@ -598,7 +603,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) MinHealth float64 TotalObjectsSize uint64 } - objInfoQuery := s.db. + objInfoQuery := db. Model(&dbObject{}). Select("COUNT(*) AS NumObjects, COALESCE(MIN(health), 1) as MinHealth, SUM(size) AS TotalObjectsSize") if opts.Bucket != "" { @@ -611,7 +616,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) // number of unfinished objects var unfinishedObjects uint64 - unfinishedObjectsQuery := s.db. + unfinishedObjectsQuery := db. Model(&dbMultipartUpload{}). Select("COUNT(*)") if opts.Bucket != "" { @@ -624,7 +629,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) // size of unfinished objects var totalUnfinishedObjectsSize uint64 - totalUnfinishedObjectsSizeQuery := s.db. + totalUnfinishedObjectsSizeQuery := db. Model(&dbMultipartPart{}). Joins("INNER JOIN multipart_uploads mu ON multipart_parts.db_multipart_upload_id = mu.id"). Select("COALESCE(SUM(size), 0)") @@ -637,7 +642,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) } var totalSectors int64 - totalSectorsQuery := s.db. + totalSectorsQuery := db. Table("slabs sla"). Select("COALESCE(SUM(total_shards), 0)"). Where("db_buffered_slab_id IS NULL") @@ -657,7 +662,7 @@ func (s *SQLStore) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) } var totalUploaded int64 - err = s.db. + err = db. Model(&dbContract{}). Select("COALESCE(SUM(size), 0)"). Scan(&totalUploaded). @@ -707,7 +712,7 @@ func (s *SQLStore) AddContract(ctx context.Context, c rhpv2.ContractRevision, co return api.ContractMetadata{}, err } var added dbContract - if err = s.retryTransaction(func(tx *gorm.DB) error { + if err = s.retryTransaction(ctx, func(tx *gorm.DB) error { added, err = addContract(tx, c, contractPrice, totalCost, startHeight, types.FileContractID{}, cs) return err }); err != nil { @@ -719,12 +724,14 @@ func (s *SQLStore) AddContract(ctx context.Context, c rhpv2.ContractRevision, co } func (s *SQLStore) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api.ContractMetadata, error) { + db := s.db.WithContext(ctx) + // helper to check whether a contract set exists hasContractSet := func() error { if opts.ContractSet == "" { return nil } - err := s.db.Where("name", opts.ContractSet).Take(&dbContractSet{}).Error + err := db.Where("name", opts.ContractSet).Take(&dbContractSet{}).Error if errors.Is(err, gorm.ErrRecordNotFound) { return api.ErrContractSetNotFound } @@ -737,13 +744,13 @@ func (s *SQLStore) Contracts(ctx context.Context, opts api.ContractsOpts) ([]api Host dbHost `gorm:"embedded"` Name string } - tx := s.db + tx := db if opts.ContractSet == "" { // no filter, use all contracts tx = tx.Table("contracts") } else { // filter contracts by contract set first - tx = tx.Table("(?) contracts", s.db.Model(&dbContract{}). + tx = tx.Table("(?) contracts", db.Model(&dbContract{}). Select("contracts.*"). Joins("INNER JOIN hosts h ON h.id = contracts.host_id"). Joins("INNER JOIN contract_set_contracts csc ON csc.db_contract_id = contracts.id"). @@ -806,7 +813,7 @@ func (s *SQLStore) AddRenewedContract(ctx context.Context, c rhpv2.ContractRevis return api.ContractMetadata{}, err } var renewed dbContract - if err := s.retryTransaction(func(tx *gorm.DB) error { + if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { // Fetch contract we renew from. oldContract, err := contract(tx, fileContractID(renewedFrom)) if err != nil { @@ -846,7 +853,7 @@ func (s *SQLStore) AddRenewedContract(ctx context.Context, c rhpv2.ContractRevis func (s *SQLStore) AncestorContracts(ctx context.Context, id types.FileContractID, startHeight uint64) ([]api.ArchivedContract, error) { var ancestors []dbArchivedContract - err := s.db.Raw("WITH RECURSIVE ancestors AS (SELECT * FROM archived_contracts WHERE renewed_to = ? UNION ALL SELECT archived_contracts.* FROM ancestors, archived_contracts WHERE archived_contracts.renewed_to = ancestors.fcid) SELECT * FROM ancestors WHERE start_height >= ?", fileContractID(id), startHeight). + err := s.db.WithContext(ctx).Raw("WITH RECURSIVE ancestors AS (SELECT * FROM archived_contracts WHERE renewed_to = ? UNION ALL SELECT archived_contracts.* FROM ancestors, archived_contracts WHERE archived_contracts.renewed_to = ancestors.fcid) SELECT * FROM ancestors WHERE start_height >= ?", fileContractID(id), startHeight). Scan(&ancestors). Error if err != nil { @@ -877,8 +884,8 @@ func (s *SQLStore) ArchiveContracts(ctx context.Context, toArchive map[types.Fil } // archive them - if err := s.retryTransaction(func(tx *gorm.DB) error { - return archiveContracts(ctx, tx, cs, toArchive) + if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { + return archiveContracts(tx, cs, toArchive) }); err != nil { return err } @@ -890,6 +897,7 @@ func (s *SQLStore) ArchiveAllContracts(ctx context.Context, reason string) error // fetch contract ids var fcids []fileContractID if err := s.db. + WithContext(ctx). Model(&dbContract{}). Pluck("fcid", &fcids). Error; err != nil { @@ -920,6 +928,7 @@ func (s *SQLStore) ContractRoots(ctx context.Context, id types.FileContractID) ( var dbRoots []hash256 if err = s.db. + WithContext(ctx). Raw(` SELECT sec.root FROM contracts c @@ -938,7 +947,7 @@ WHERE c.fcid = ? func (s *SQLStore) ContractSets(ctx context.Context) ([]string, error) { var sets []string - err := s.db.Raw("SELECT name FROM contract_sets"). + err := s.db.WithContext(ctx).Raw("SELECT name FROM contract_sets"). Scan(&sets). Error return sets, err @@ -953,7 +962,7 @@ func (s *SQLStore) ContractSizes(ctx context.Context) (map[types.FileContractID] var nullContracts []size var dataContracts []size - if err := s.retryTransaction(func(tx *gorm.DB) error { + if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { // first, we fetch all contracts without sectors and consider their // entire size as prunable if err := tx. @@ -1003,6 +1012,7 @@ func (s *SQLStore) ContractSize(ctx context.Context, id types.FileContractID) (a } if err := s.db. + WithContext(ctx). Raw(` SELECT contract_size as size, CASE WHEN contract_size > sector_size THEN contract_size - sector_size ELSE 0 END as prunable FROM ( SELECT MAX(c.size) as contract_size, COUNT(cs.db_sector_id) * ? as sector_size FROM contracts c LEFT JOIN contract_sectors cs ON cs.db_contract_id = c.id WHERE c.fcid = ? @@ -1029,7 +1039,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds var diff []fileContractID var nContractsAfter int - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { // fetch contract set var cs dbContractSet err := tx. @@ -1097,6 +1107,7 @@ func (s *SQLStore) SetContractSet(ctx context.Context, name string, contractIds func (s *SQLStore) RemoveContractSet(ctx context.Context, name string) error { return s.db. + WithContext(ctx). Where(dbContractSet{Name: name}). Delete(&dbContractSet{}). Error @@ -1106,6 +1117,7 @@ func (s *SQLStore) RenewedContract(ctx context.Context, renewedFrom types.FileCo var contract dbContract err = s.db. + WithContext(ctx). Where(&dbContract{ContractCommon: ContractCommon{RenewedFrom: fileContractID(renewedFrom)}}). Joins("Host"). Take(&contract). @@ -1126,6 +1138,7 @@ func (s *SQLStore) SearchObjects(ctx context.Context, bucket, substring string, var objects []api.ObjectMetadata err := s.db. + WithContext(ctx). Select("o.object_id as Name, o.size as Size, o.health as Health, o.mime_type as MimeType, o.etag as ETag, o.created_at as ModTime"). Model(&dbObject{}). Table("objects o"). @@ -1259,6 +1272,7 @@ FROM ( case api.ObjectSortByHealth: var markerHealth float64 if err = s.db. + WithContext(ctx). Raw(fmt.Sprintf(`SELECT Health FROM (%s WHERE oname >= ? ORDER BY oname LIMIT 1) as n`, objectsQuery), append(objectsQueryParams, marker)...). Scan(&markerHealth). Error; err != nil { @@ -1275,6 +1289,7 @@ FROM ( case api.ObjectSortBySize: var markerSize float64 if err = s.db. + WithContext(ctx). Raw(fmt.Sprintf(`SELECT Size FROM (%s WHERE oname >= ? ORDER BY oname LIMIT 1) as n`, objectsQuery), append(objectsQueryParams, marker)...). Scan(&markerSize). Error; err != nil { @@ -1315,6 +1330,7 @@ FROM ( parameters := append(append(objectsQueryParams, markerParams...), limit, offset) if err = s.db. + WithContext(ctx). Raw(query, parameters...). Scan(&rows). Error; err != nil { @@ -1335,8 +1351,8 @@ FROM ( } func (s *SQLStore) Object(ctx context.Context, bucket, path string) (obj api.Object, err error) { - err = s.db.Transaction(func(tx *gorm.DB) error { - obj, err = s.object(ctx, tx, bucket, path) + err = s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { + obj, err = s.object(tx, bucket, path) return err }) return @@ -1367,7 +1383,7 @@ func (s *SQLStore) RecordContractSpending(ctx context.Context, records []api.Con } metrics := make([]api.ContractMetric, 0, len(squashedRecords)) for fcid, newSpending := range squashedRecords { - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { var contract dbContract err := tx.Model(&dbContract{}). Where("fcid = ?", fileContractID(fcid)). @@ -1470,7 +1486,7 @@ func fetchUsedContracts(tx *gorm.DB, usedContracts map[types.PublicKey]map[types } func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew string, force bool) error { - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { if force { // delete potentially existing object at destination if _, err := s.deleteObject(tx, bucket, keyNew); err != nil { @@ -1492,7 +1508,7 @@ func (s *SQLStore) RenameObject(ctx context.Context, bucket, keyOld, keyNew stri } func (s *SQLStore) RenameObjects(ctx context.Context, bucket, prefixOld, prefixNew string, force bool) error { - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { if force { // delete potentially existing objects at destination inner := tx.Raw("SELECT ? FROM objects WHERE object_id LIKE ? AND SUBSTR(object_id, 1, ?) = ? AND ?", @@ -1538,7 +1554,7 @@ func (s *SQLStore) AddPartialSlab(ctx context.Context, data []byte, minShards, t } func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath, dstPath, mimeType string, metadata api.ObjectUserMetadata) (om api.ObjectMetadata, err error) { - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { if srcBucket != dstBucket || srcPath != dstPath { _, err = s.deleteObject(tx, dstBucket, dstPath) if err != nil { @@ -1625,7 +1641,7 @@ func (s *SQLStore) CopyObject(ctx context.Context, srcBucket, dstBucket, srcPath func (s *SQLStore) DeleteHostSector(ctx context.Context, hk types.PublicKey, root types.Hash256) (int, error) { var deletedSectors int - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { // Fetch contract_sectors to delete. var sectors []dbContractSector err := tx.Raw(` @@ -1710,7 +1726,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, usedContracts := o.Contracts() // UpdateObject is ACID. - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { // Try to delete. We want to get rid of the object and its slices if it // exists. // @@ -1783,7 +1799,7 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet, func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error { var rowsAffected int64 var err error - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { rowsAffected, err = s.deleteObject(tx, bucket, key) if err != nil { return fmt.Errorf("RemoveObject: failed to delete object: %w", err) @@ -1802,7 +1818,7 @@ func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error { func (s *SQLStore) RemoveObjects(ctx context.Context, bucket, prefix string) error { var rowsAffected int64 var err error - rowsAffected, err = s.deleteObjects(bucket, prefix) + rowsAffected, err = s.deleteObjects(ctx, bucket, prefix) if err != nil { return err } @@ -1852,7 +1868,7 @@ func (ss *SQLStore) UpdateSlab(ctx context.Context, s object.Slab, contractSet s usedContracts := s.Contracts() // Update slab. - return ss.retryTransaction(func(tx *gorm.DB) (err error) { + return ss.retryTransaction(ctx, func(tx *gorm.DB) (err error) { // update slab if err := tx.Model(&dbSlab{}). Where("key", key). @@ -1986,7 +2002,7 @@ LIMIT ? for { var rowsAffected int64 - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { var res *gorm.DB if isSQLite(s.db) { res = tx.Exec("UPDATE slabs SET health = inner.health, health_valid_until = (?) FROM (?) AS inner WHERE slabs.id=inner.id", sqlRandomTimestamp(s.db, now, refreshHealthMinHealthValidity, refreshHealthMaxHealthValidity), healthQuery) @@ -2036,7 +2052,7 @@ func (s *SQLStore) UnhealthySlabs(ctx context.Context, healthCutoff float64, set Health float64 } - if err := s.retryTransaction(func(tx *gorm.DB) error { + if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Select("slabs.key, slabs.health"). Joins("INNER JOIN contract_sets cs ON slabs.db_contract_set_id = cs.id"). Model(&dbSlab{}). @@ -2229,19 +2245,19 @@ func (s *SQLStore) createSlices(tx *gorm.DB, objID, multiPartID *uint, contractS } // object retrieves an object from the store. -func (s *SQLStore) object(ctx context.Context, tx *gorm.DB, bucket, path string) (api.Object, error) { +func (s *SQLStore) object(tx *gorm.DB, bucket, path string) (api.Object, error) { // fetch raw object data - raw, err := s.objectRaw(ctx, tx, bucket, path) + raw, err := s.objectRaw(tx, bucket, path) if errors.Is(err, gorm.ErrRecordNotFound) || len(raw) == 0 { return api.Object{}, api.ErrObjectNotFound } // hydrate raw object data - return s.objectHydrate(ctx, tx, bucket, path, raw) + return s.objectHydrate(tx, bucket, path, raw) } // objectHydrate hydrates a raw object and returns an api.Object. -func (s *SQLStore) objectHydrate(ctx context.Context, tx *gorm.DB, bucket, path string, obj rawObject) (api.Object, error) { +func (s *SQLStore) objectHydrate(tx *gorm.DB, bucket, path string, obj rawObject) (api.Object, error) { // parse object key var key object.EncryptionKey if err := key.UnmarshalBinary(obj[0].ObjectKey); err != nil { @@ -2296,7 +2312,7 @@ func (s *SQLStore) objectHydrate(ctx context.Context, tx *gorm.DB, bucket, path } // fetch object metadata - metadata, err := s.objectMetadata(ctx, tx, bucket, path) + metadata, err := s.objectMetadata(tx, bucket, path) if err != nil { return api.Object{}, err } @@ -2322,7 +2338,7 @@ func (s *SQLStore) objectHydrate(ctx context.Context, tx *gorm.DB, bucket, path // ObjectMetadata returns an object's metadata func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (api.Object, error) { var resp api.Object - err := s.db.Transaction(func(tx *gorm.DB) error { + err := s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var obj dbObject err := tx.Model(&dbObject{}). Joins("INNER JOIN buckets b ON objects.db_bucket_id = b.id"). @@ -2335,7 +2351,7 @@ func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (api } else if err != nil { return err } - oum, err := s.objectMetadata(ctx, tx, bucket, path) + oum, err := s.objectMetadata(tx, bucket, path) if err != nil { return err } @@ -2355,7 +2371,7 @@ func (s *SQLStore) ObjectMetadata(ctx context.Context, bucket, path string) (api return resp, err } -func (s *SQLStore) objectMetadata(ctx context.Context, tx *gorm.DB, bucket, path string) (api.ObjectUserMetadata, error) { +func (s *SQLStore) objectMetadata(tx *gorm.DB, bucket, path string) (api.ObjectUserMetadata, error) { var rows []dbObjectUserMetadata err := tx. Model(&dbObjectUserMetadata{}). @@ -2386,12 +2402,12 @@ func newObjectMetadata(name, etag, mimeType string, health float64, modTime time } } -func (s *SQLStore) objectRaw(ctx context.Context, txn *gorm.DB, bucket string, path string) (rows rawObject, err error) { +func (s *SQLStore) objectRaw(txn *gorm.DB, bucket string, path string) (rows rawObject, err error) { // NOTE: we LEFT JOIN here because empty objects are valid and need to be // included in the result set, when we convert the rawObject before // returning it we'll check for SlabID and/or SectorID being 0 and act // accordingly - err = s.db. + err = txn. Select("o.id as ObjectID, o.health as ObjectHealth, sli.object_index as ObjectIndex, o.key as ObjectKey, o.object_id as ObjectName, o.size as ObjectSize, o.mime_type as ObjectMimeType, o.created_at as ObjectModTime, o.etag as ObjectETag, sli.object_index, sli.offset as SliceOffset, sli.length as SliceLength, sla.id as SlabID, sla.health as SlabHealth, sla.key as SlabKey, sla.min_shards as SlabMinShards, bs.id IS NOT NULL AS SlabBuffered, sec.slab_index as SectorIndex, sec.root as SectorRoot, sec.latest_host as LatestHost, c.fcid as FCID, h.public_key as HostKey"). Model(&dbObject{}). Table("objects o"). @@ -2411,40 +2427,9 @@ func (s *SQLStore) objectRaw(ctx context.Context, txn *gorm.DB, bucket string, p return } -func (s *SQLStore) objectHealth(ctx context.Context, tx *gorm.DB, objectID uint) (health float64, err error) { - if err = tx. - Select("objects.health"). - Model(&dbObject{}). - Table("objects"). - Where("id", objectID). - Scan(&health). - Error; errors.Is(err, gorm.ErrRecordNotFound) { - err = api.ErrObjectNotFound - } - return -} - // contract retrieves a contract from the store. func (s *SQLStore) contract(ctx context.Context, id fileContractID) (dbContract, error) { - return contract(s.db, id) -} - -// contracts retrieves all contracts in the given set. -func (s *SQLStore) contracts(ctx context.Context, set string) ([]dbContract, error) { - var cs dbContractSet - err := s.db. - Where(&dbContractSet{Name: set}). - Preload("Contracts.Host"). - Take(&cs). - Error - - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, fmt.Errorf("%w '%s'", api.ErrContractSetNotFound, set) - } else if err != nil { - return nil, err - } - - return cs.Contracts, nil + return contract(s.db.WithContext(ctx), id) } // PackedSlabsForUpload returns up to 'limit' packed slabs that are ready for @@ -2452,7 +2437,7 @@ func (s *SQLStore) contracts(ctx context.Context, set string) ([]dbContract, err // again. func (s *SQLStore) PackedSlabsForUpload(ctx context.Context, lockingDuration time.Duration, minShards, totalShards uint8, set string, limit int) ([]api.PackedSlab, error) { var contractSetID uint - if err := s.db.Raw("SELECT id FROM contract_sets WHERE name = ?", set). + if err := s.db.WithContext(ctx).Raw("SELECT id FROM contract_sets WHERE name = ?", set). Scan(&contractSetID).Error; err != nil { return nil, err } @@ -2466,7 +2451,7 @@ func (s *SQLStore) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey return nil, err } - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Raw(` SELECT DISTINCT obj.object_id as Name, obj.size as Size, obj.mime_type as MimeType, sla.health as Health FROM slabs sla @@ -2502,7 +2487,7 @@ func (s *SQLStore) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.Uplo } } var fileName string - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { for _, slab := range slabs { var err error fileName, err = s.markPackedSlabUploaded(tx, slab) @@ -2678,14 +2663,14 @@ func addContract(tx *gorm.DB, c rhpv2.ContractRevision, contractPrice, totalCost // archival reason // // NOTE: this function archives the contracts without setting a renewed ID -func archiveContracts(ctx context.Context, tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error { +func archiveContracts(tx *gorm.DB, contracts []dbContract, toArchive map[types.FileContractID]string) error { var toInvalidate []fileContractID for _, contract := range contracts { toInvalidate = append(toInvalidate, contract.FCID) } // Invalidate the health on the slabs before deleting the contracts to avoid // breaking the relations beforehand. - if err := invalidateSlabHealthByFCID(ctx, tx, toInvalidate); err != nil { + if err := invalidateSlabHealthByFCID(tx, toInvalidate); err != nil { return fmt.Errorf("invalidating slab health failed: %w", err) } for _, contract := range contracts { @@ -2762,12 +2747,12 @@ func (s *SQLStore) deleteObject(tx *gorm.DB, bucket string, path string) (int64, // deletion goes from largest to smallest. That's because the batch size is // dynamically increased and the smaller objects get the faster we can delete // them meaning it makes sense to increase the batch size over time. -func (s *SQLStore) deleteObjects(bucket string, path string) (numDeleted int64, _ error) { +func (s *SQLStore) deleteObjects(ctx context.Context, bucket string, path string) (numDeleted int64, _ error) { batchSizeIdx := 0 for { var duration time.Duration var rowsAffected int64 - if err := s.retryTransaction(func(tx *gorm.DB) error { + if err := s.retryTransaction(ctx, func(tx *gorm.DB) error { start := time.Now() res := tx.Exec(` DELETE FROM objects @@ -2809,7 +2794,7 @@ func (s *SQLStore) deleteObjects(bucket string, path string) (numDeleted int64, return numDeleted, nil } -func invalidateSlabHealthByFCID(ctx context.Context, tx *gorm.DB, fcids []fileContractID) error { +func invalidateSlabHealthByFCID(tx *gorm.DB, fcids []fileContractID) error { if len(fcids) == 0 { return nil } @@ -2833,22 +2818,18 @@ func invalidateSlabHealthByFCID(ctx context.Context, tx *gorm.DB, fcids []fileCo } else if resp.RowsAffected < refreshHealthBatchSize { break // done } - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(time.Second): - } + time.Sleep(time.Second) } return nil } func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []fileContractID) error { - return s.retryTransaction(func(tx *gorm.DB) error { - return invalidateSlabHealthByFCID(ctx, tx, fcids) + return s.retryTransaction(ctx, func(tx *gorm.DB) error { + return invalidateSlabHealthByFCID(tx, fcids) }) } +// nolint:unparam func sqlConcat(db *gorm.DB, a, b string) string { if isSQLite(db) { return fmt.Sprintf("%s || %s", a, b) @@ -2863,6 +2844,7 @@ func sqlRandomTimestamp(db *gorm.DB, now time.Time, min, max time.Duration) clau return gorm.Expr("FLOOR(? + RAND() * (? - ?))", now.Add(min).Unix(), int(max.Seconds()), int(min.Seconds())) } +// nolint:unparam func sqlWhereBucket(objTable string, bucket string) clause.Expr { return gorm.Expr(fmt.Sprintf("%s.db_bucket_id = (SELECT id FROM buckets WHERE buckets.name = ?)", objTable), bucket) } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 55bf93573..c6ac1cd52 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -24,10 +24,10 @@ import ( "lukechampine.com/frand" ) -func generateMultisigUC(m, n uint64, salt string) types.UnlockConditions { +func randomMultisigUC() types.UnlockConditions { uc := types.UnlockConditions{ - PublicKeys: make([]types.UnlockKey, n), - SignaturesRequired: uint64(m), + PublicKeys: make([]types.UnlockKey, 2), + SignaturesRequired: 1, } for i := range uc.PublicKeys { uc.PublicKeys[i].Algorithm = types.SpecifierEd25519 @@ -225,7 +225,7 @@ func TestSQLContractStore(t *testing.T) { } // Create random unlock conditions for the host. - uc := generateMultisigUC(1, 2, "salt") + uc := randomMultisigUC() uc.PublicKeys[1].Key = hk[:] uc.Timelock = 192837 @@ -520,11 +520,11 @@ func TestRenewedContract(t *testing.T) { } // Create random unlock conditions for the hosts. - uc := generateMultisigUC(1, 2, "salt") + uc := randomMultisigUC() uc.PublicKeys[1].Key = hk[:] uc.Timelock = 192837 - uc2 := generateMultisigUC(1, 2, "salt") + uc2 := randomMultisigUC() uc2.PublicKeys[1].Key = hk2[:] uc2.Timelock = 192837 @@ -874,7 +874,7 @@ func TestArchiveContracts(t *testing.T) { } func testContractRevision(fcid types.FileContractID, hk types.PublicKey) rhpv2.ContractRevision { - uc := generateMultisigUC(1, 2, "salt") + uc := randomMultisigUC() uc.PublicKeys[1].Key = hk[:] uc.Timelock = 192837 return rhpv2.ContractRevision{ @@ -1868,7 +1868,7 @@ func TestUnhealthySlabsNoContracts(t *testing.T) { // delete the sector - we manually invalidate the slabs for the contract // before deletion. - err = invalidateSlabHealthByFCID(context.Background(), ss.db, []fileContractID{fileContractID(fcid1)}) + err = invalidateSlabHealthByFCID(ss.db, []fileContractID{fileContractID(fcid1)}) if err != nil { t.Fatal(err) } @@ -3286,7 +3286,7 @@ func TestBucketObjects(t *testing.T) { // See if we can fetch the object by slab. var ec object.EncryptionKey - if obj, err := ss.objectRaw(context.Background(), ss.db, b1, "/bar"); err != nil { + if obj, err := ss.objectRaw(ss.db, b1, "/bar"); err != nil { t.Fatal(err) } else if err := ec.UnmarshalBinary(obj[0].SlabKey); err != nil { t.Fatal(err) @@ -3391,7 +3391,7 @@ func TestMarkSlabUploadedAfterRenew(t *testing.T) { // renew the contract. fcidRenewed := types.FileContractID{2, 2, 2, 2, 2} - uc := generateMultisigUC(1, 2, "salt") + uc := randomMultisigUC() rev := rhpv2.ContractRevision{ Revision: types.FileContractRevision{ ParentID: fcidRenewed, diff --git a/stores/metrics.go b/stores/metrics.go index 333ed8a42..c8369f630 100644 --- a/stores/metrics.go +++ b/stores/metrics.go @@ -450,11 +450,11 @@ func (s *SQLStore) contractMetrics(ctx context.Context, start time.Time, n uint6 if opts.ContractID == (types.FileContractID{}) && opts.HostKey == (types.PublicKey{}) { // if neither contract nor host filters were set, we return the // aggregate spending for each period - metrics, err = s.findAggregatedContractPeriods(start, n, interval) + metrics, err = s.findAggregatedContractPeriods(ctx, start, n, interval) } else { // otherwise we return the first metric for each period like we usually // do - err = s.findPeriods(dbContractMetric{}.TableName(), &metrics, start, n, interval, whereExpr) + err = s.findPeriods(ctx, dbContractMetric{}.TableName(), &metrics, start, n, interval, whereExpr) } if err != nil { return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) @@ -478,7 +478,7 @@ func (s *SQLStore) contractPruneMetrics(ctx context.Context, start time.Time, n } var metrics []dbContractPruneMetric - err := s.findPeriods(dbContractPruneMetric{}.TableName(), &metrics, start, n, interval, whereExpr) + err := s.findPeriods(ctx, dbContractPruneMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract metrics: %w", err) } @@ -498,7 +498,7 @@ func (s *SQLStore) contractSetChurnMetrics(ctx context.Context, start time.Time, whereExpr = gorm.Expr("? AND reason = ?", whereExpr, opts.Reason) } var metrics []dbContractSetChurnMetric - err := s.findPeriods(dbContractSetChurnMetric{}.TableName(), &metrics, start, n, interval, whereExpr) + err := s.findPeriods(ctx, dbContractSetChurnMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract set churn metrics: %w", err) } @@ -515,7 +515,7 @@ func (s *SQLStore) contractSetMetrics(ctx context.Context, start time.Time, n ui } var metrics []dbContractSetMetric - err := s.findPeriods(dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr) + err := s.findPeriods(ctx, dbContractSetMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch contract set metrics: %w", err) } @@ -536,7 +536,7 @@ func normaliseTimestamp(start time.Time, interval time.Duration, t unixTimeMS) u return unixTimeMS(time.UnixMilli(normalizedMS)) } -func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, interval time.Duration) ([]dbContractMetric, error) { +func (s *SQLStore) findAggregatedContractPeriods(ctx context.Context, start time.Time, n uint64, interval time.Duration) ([]dbContractMetric, error) { if n > api.MetricMaxIntervals { return nil, api.ErrMaxIntervalsExceeded } @@ -548,7 +548,7 @@ func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, inte } var metricsWithPeriod []metricWithPeriod - err := s.dbMetrics.Transaction(func(tx *gorm.DB) error { + err := s.dbMetrics.WithContext(ctx).Transaction(func(tx *gorm.DB) error { var fcids []fileContractID if err := tx.Raw("SELECT DISTINCT fcid FROM contracts WHERE contracts.timestamp >= ? AND contracts.timestamp < ?", unixTimeMS(start), unixTimeMS(end)). Scan(&fcids).Error; err != nil { @@ -599,12 +599,12 @@ func (s *SQLStore) findAggregatedContractPeriods(start time.Time, n uint64, inte // split into intervals and the row with the lowest timestamp for each interval // is returned. The result is then joined with the original table to retrieve // only the metrics we want. -func (s *SQLStore) findPeriods(table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error { +func (s *SQLStore) findPeriods(ctx context.Context, table string, dst interface{}, start time.Time, n uint64, interval time.Duration, whereExpr clause.Expr) error { if n > api.MetricMaxIntervals { return api.ErrMaxIntervalsExceeded } end := start.Add(time.Duration(n) * interval) - return s.dbMetrics.Raw(fmt.Sprintf(` + return s.dbMetrics.WithContext(ctx).Raw(fmt.Sprintf(` WITH RECURSIVE periods AS ( SELECT ? AS period_start UNION ALL @@ -637,7 +637,7 @@ func (s *SQLStore) findPeriods(table string, dst interface{}, start time.Time, n } func (s *SQLStore) walletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) (metrics []dbWalletMetric, err error) { - err = s.findPeriods(dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE")) + err = s.findPeriods(ctx, dbWalletMetric{}.TableName(), &metrics, start, n, interval, gorm.Expr("TRUE")) if err != nil { return nil, fmt.Errorf("failed to fetch wallet metrics: %w", err) } @@ -660,7 +660,7 @@ func (s *SQLStore) performanceMetrics(ctx context.Context, start time.Time, n ui } var metrics []dbPerformanceMetric - err := s.findPeriods(dbPerformanceMetric{}.TableName(), &metrics, start, n, interval, whereExpr) + err := s.findPeriods(ctx, dbPerformanceMetric{}.TableName(), &metrics, start, n, interval, whereExpr) if err != nil { return nil, fmt.Errorf("failed to fetch performance metrics: %w", err) } diff --git a/stores/migrations.go b/stores/migrations.go index b0304090e..6ccc75964 100644 --- a/stores/migrations.go +++ b/stores/migrations.go @@ -68,7 +68,7 @@ func performMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) // Set init function. - m.InitSchema(initSchema(db, dbIdentifier, logger)) + m.InitSchema(initSchema(dbIdentifier, logger)) // Perform migrations. if err := m.Migrate(); err != nil { diff --git a/stores/migrations_metrics.go b/stores/migrations_metrics.go index fc3164bee..25895c4f2 100644 --- a/stores/migrations_metrics.go +++ b/stores/migrations_metrics.go @@ -27,7 +27,7 @@ func performMetricsMigrations(db *gorm.DB, logger *zap.SugaredLogger) error { m := gormigrate.New(db, gormigrate.DefaultOptions, migrations) // Set init function. - m.InitSchema(initSchema(db, dbIdentifier, logger)) + m.InitSchema(initSchema(dbIdentifier, logger)) // Perform migrations. if err := m.Migrate(); err != nil { diff --git a/stores/migrations_utils.go b/stores/migrations_utils.go index 46d7f3dc4..0692b367f 100644 --- a/stores/migrations_utils.go +++ b/stores/migrations_utils.go @@ -10,7 +10,7 @@ import ( // initSchema is executed only on a clean database. Otherwise the individual // migrations are executed. -func initSchema(db *gorm.DB, name string, logger *zap.SugaredLogger) gormigrate.InitSchemaFunc { +func initSchema(name string, logger *zap.SugaredLogger) gormigrate.InitSchemaFunc { return func(tx *gorm.DB) error { logger.Infof("initializing '%s' schema", name) diff --git a/stores/multipart.go b/stores/multipart.go index 76c30c734..3da5f7992 100644 --- a/stores/multipart.go +++ b/stores/multipart.go @@ -56,7 +56,7 @@ func (s *SQLStore) CreateMultipartUpload(ctx context.Context, bucket, path strin return api.MultipartCreateResponse{}, err } var uploadID string - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { // Get bucket id. var bucketID uint err := tx.Table("(SELECT id from buckets WHERE buckets.name = ?) bucket_id", bucket). @@ -108,7 +108,7 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS } } } - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { // Fetch contract set. var cs dbContractSet if err := tx.Take(&cs, "name = ?", contractSet).Error; err != nil { @@ -160,7 +160,7 @@ func (s *SQLStore) AddMultipartPart(ctx context.Context, bucket, path, contractS } func (s *SQLStore) MultipartUpload(ctx context.Context, uploadID string) (resp api.MultipartUpload, err error) { - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { var dbUpload dbMultipartUpload err := tx. Model(&dbMultipartUpload{}). @@ -201,7 +201,7 @@ func (s *SQLStore) MultipartUploads(ctx context.Context, bucket, prefix, keyMark prefixExpr = gorm.Expr("SUBSTR(object_id, 1, ?) = ?", utf8.RuneCountInString(prefix), prefix) } - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { var dbUploads []dbMultipartUpload err := tx. Model(&dbMultipartUpload{}). @@ -243,7 +243,7 @@ func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object stri limit++ } - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { var dbParts []dbMultipartPart err := tx. Model(&dbMultipartPart{}). @@ -277,7 +277,7 @@ func (s *SQLStore) MultipartUploadParts(ctx context.Context, bucket, object stri } func (s *SQLStore) AbortMultipartUpload(ctx context.Context, bucket, path string, uploadID string) error { - return s.retryTransaction(func(tx *gorm.DB) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { // delete multipart upload optimistically res := tx. Where("upload_id", uploadID). @@ -326,7 +326,7 @@ func (s *SQLStore) CompleteMultipartUpload(ctx context.Context, bucket, path str } } var eTag string - err = s.retryTransaction(func(tx *gorm.DB) error { + err = s.retryTransaction(ctx, func(tx *gorm.DB) error { // Delete potentially existing object. _, err := s.deleteObject(tx, bucket, path) if err != nil { diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index e1c7290ea..2d16c8e33 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -204,7 +204,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m // If there is still data left, create a new buffer. if len(data) > 0 { var sb *SlabBuffer - err = mgr.s.retryTransaction(func(tx *gorm.DB) error { + err = mgr.s.retryTransaction(ctx, func(tx *gorm.DB) error { sb, err = createSlabBuffer(tx, contractSet, mgr.dir, minShards, totalShards) return err }) diff --git a/stores/sql.go b/stores/sql.go index 5d9d9cea8..f62dba97f 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -2,7 +2,6 @@ package stores import ( "context" - "database/sql" "embed" "errors" "fmt" @@ -446,7 +445,7 @@ func (ss *SQLStore) applyUpdates(force bool) error { ss.logger.Error(fmt.Sprintf("failed to fetch blocklist, err: %v", err)) } - err := ss.retryTransaction(func(tx *gorm.DB) (err error) { + err := ss.retryTransaction(context.Background(), func(tx *gorm.DB) (err error) { if len(ss.unappliedAnnouncements) > 0 { if err = insertAnnouncements(tx, ss.unappliedAnnouncements); err != nil { return fmt.Errorf("%w; failed to insert %d announcements", err, len(ss.unappliedAnnouncements)) @@ -514,9 +513,10 @@ func (ss *SQLStore) applyUpdates(force bool) error { return nil } -func (s *SQLStore) retryTransaction(fc func(tx *gorm.DB) error, opts ...*sql.TxOptions) error { +func (s *SQLStore) retryTransaction(ctx context.Context, fc func(tx *gorm.DB) error) error { abortRetry := func(err error) bool { if err == nil || + errors.Is(err, context.Canceled) || errors.Is(err, gorm.ErrRecordNotFound) || errors.Is(err, errInvalidNumberOfShards) || errors.Is(err, errShardRootChanged) || @@ -539,7 +539,7 @@ func (s *SQLStore) retryTransaction(fc func(tx *gorm.DB) error, opts ...*sql.TxO } var err error for i := 0; i < len(s.retryTransactionIntervals); i++ { - err = s.db.Transaction(fc, opts...) + err = s.db.WithContext(ctx).Transaction(fc) if abortRetry(err) { return err } @@ -566,10 +566,10 @@ func initConsensusInfo(db *gorm.DB) (dbConsensusInfo, modules.ConsensusChangeID, return ci, ccid, nil } -func (s *SQLStore) ResetConsensusSubscription() error { +func (s *SQLStore) ResetConsensusSubscription(ctx context.Context) error { // empty tables and reinit consensus_infos var ci dbConsensusInfo - err := s.retryTransaction(func(tx *gorm.DB) error { + err := s.retryTransaction(ctx, func(tx *gorm.DB) error { if err := s.db.Exec("DELETE FROM consensus_infos").Error; err != nil { return err } else if err := s.db.Exec("DELETE FROM siacoin_elements").Error; err != nil { diff --git a/stores/sql_test.go b/stores/sql_test.go index 17c296075..842f3c9df 100644 --- a/stores/sql_test.go +++ b/stores/sql_test.go @@ -292,7 +292,7 @@ func TestConsensusReset(t *testing.T) { }) // Reset the consensus. - if err := ss.ResetConsensusSubscription(); err != nil { + if err := ss.ResetConsensusSubscription(context.Background()); err != nil { t.Fatal(err) } diff --git a/stores/webhooks.go b/stores/webhooks.go index f3fc26057..4db325698 100644 --- a/stores/webhooks.go +++ b/stores/webhooks.go @@ -1,6 +1,8 @@ package stores import ( + "context" + "go.sia.tech/renterd/webhooks" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -20,8 +22,8 @@ func (dbWebhook) TableName() string { return "webhooks" } -func (s *SQLStore) DeleteWebhook(wb webhooks.Webhook) error { - return s.retryTransaction(func(tx *gorm.DB) error { +func (s *SQLStore) DeleteWebhook(ctx context.Context, wb webhooks.Webhook) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { res := tx.Exec("DELETE FROM webhooks WHERE module = ? AND event = ? AND url = ?", wb.Module, wb.Event, wb.URL) if res.Error != nil { @@ -33,8 +35,8 @@ func (s *SQLStore) DeleteWebhook(wb webhooks.Webhook) error { }) } -func (s *SQLStore) AddWebhook(wb webhooks.Webhook) error { - return s.retryTransaction(func(tx *gorm.DB) error { +func (s *SQLStore) AddWebhook(ctx context.Context, wb webhooks.Webhook) error { + return s.retryTransaction(ctx, func(tx *gorm.DB) error { return tx.Clauses(clause.OnConflict{ DoNothing: true, }).Create(&dbWebhook{ @@ -45,9 +47,9 @@ func (s *SQLStore) AddWebhook(wb webhooks.Webhook) error { }) } -func (s *SQLStore) Webhooks() ([]webhooks.Webhook, error) { +func (s *SQLStore) Webhooks(ctx context.Context) ([]webhooks.Webhook, error) { var dbWebhooks []dbWebhook - if err := s.db.Find(&dbWebhooks).Error; err != nil { + if err := s.db.WithContext(ctx).Find(&dbWebhooks).Error; err != nil { return nil, err } var whs []webhooks.Webhook diff --git a/stores/webhooks_test.go b/stores/webhooks_test.go index ad1973125..b306eef2c 100644 --- a/stores/webhooks_test.go +++ b/stores/webhooks_test.go @@ -1,6 +1,7 @@ package stores import ( + "context" "testing" "github.com/google/go-cmp/cmp" @@ -23,10 +24,10 @@ func TestWebhooks(t *testing.T) { } // Add hook. - if err := ss.AddWebhook(wh1); err != nil { + if err := ss.AddWebhook(context.Background(), wh1); err != nil { t.Fatal(err) } - whs, err := ss.Webhooks() + whs, err := ss.Webhooks(context.Background()) if err != nil { t.Fatal(err) } else if len(whs) != 1 { @@ -36,10 +37,10 @@ func TestWebhooks(t *testing.T) { } // Add it again. Should be a no-op. - if err := ss.AddWebhook(wh1); err != nil { + if err := ss.AddWebhook(context.Background(), wh1); err != nil { t.Fatal(err) } - whs, err = ss.Webhooks() + whs, err = ss.Webhooks(context.Background()) if err != nil { t.Fatal(err) } else if len(whs) != 1 { @@ -49,10 +50,10 @@ func TestWebhooks(t *testing.T) { } // Add another. - if err := ss.AddWebhook(wh2); err != nil { + if err := ss.AddWebhook(context.Background(), wh2); err != nil { t.Fatal(err) } - whs, err = ss.Webhooks() + whs, err = ss.Webhooks(context.Background()) if err != nil { t.Fatal(err) } else if len(whs) != 2 { @@ -64,10 +65,10 @@ func TestWebhooks(t *testing.T) { } // Remove one. - if err := ss.DeleteWebhook(wh1); err != nil { + if err := ss.DeleteWebhook(context.Background(), wh1); err != nil { t.Fatal(err) } - whs, err = ss.Webhooks() + whs, err = ss.Webhooks(context.Background()) if err != nil { t.Fatal(err) } else if len(whs) != 1 { diff --git a/webhooks/webhooks.go b/webhooks/webhooks.go index e3d388de8..665f8a2c4 100644 --- a/webhooks/webhooks.go +++ b/webhooks/webhooks.go @@ -19,9 +19,9 @@ var ErrWebhookNotFound = errors.New("Webhook not found") type ( WebhookStore interface { - DeleteWebhook(wh Webhook) error - AddWebhook(wh Webhook) error - Webhooks() ([]Webhook, error) + DeleteWebhook(ctx context.Context, wh Webhook) error + AddWebhook(ctx context.Context, wh Webhook) error + Webhooks(ctx context.Context) ([]Webhook, error) } Broadcaster interface { @@ -122,10 +122,10 @@ func (m *Manager) Close() error { return nil } -func (m *Manager) Delete(wh Webhook) error { +func (m *Manager) Delete(ctx context.Context, wh Webhook) error { m.mu.Lock() defer m.mu.Unlock() - if err := m.store.DeleteWebhook(wh); errors.Is(err, gorm.ErrRecordNotFound) { + if err := m.store.DeleteWebhook(ctx, wh); errors.Is(err, gorm.ErrRecordNotFound) { return ErrWebhookNotFound } else if err != nil { return err @@ -157,7 +157,7 @@ func (m *Manager) Info() ([]Webhook, []WebhookQueueInfo) { return hooks, queueInfos } -func (m *Manager) Register(wh Webhook) error { +func (m *Manager) Register(ctx context.Context, wh Webhook) error { ctx, cancel := context.WithTimeout(m.shutdownCtx, webhookTimeout) defer cancel() @@ -170,7 +170,7 @@ func (m *Manager) Register(wh Webhook) error { } // Add Webhook. - if err := m.store.AddWebhook(wh); err != nil { + if err := m.store.AddWebhook(ctx, wh); err != nil { return err } m.mu.Lock() @@ -214,11 +214,6 @@ func (w Webhook) String() string { } func NewManager(logger *zap.SugaredLogger, store WebhookStore) (*Manager, error) { - hooks, err := store.Webhooks() - if err != nil { - return nil, err - } - shutdownCtx, shutdownCtxCancel := context.WithCancel(context.Background()) m := &Manager{ logger: logger.Named("webhooks"), @@ -230,7 +225,10 @@ func NewManager(logger *zap.SugaredLogger, store WebhookStore) (*Manager, error) queues: make(map[string]*eventQueue), webhooks: make(map[string]Webhook), } - + hooks, err := store.Webhooks(shutdownCtx) + if err != nil { + return nil, err + } for _, hook := range hooks { m.webhooks[hook.String()] = hook } diff --git a/worker/download.go b/worker/download.go index 6f070acbd..83d4bec3e 100644 --- a/worker/download.go +++ b/worker/download.go @@ -495,11 +495,11 @@ func (mgr *downloadManager) refreshDownloaders(contracts []api.ContractMetadata) host := mgr.hm.Host(c.HostKey, c.ID, c.SiamuxAddr) downloader := newDownloader(mgr.shutdownCtx, host) mgr.downloaders[c.HostKey] = downloader - go downloader.processQueue(mgr.hm) + go downloader.processQueue() } } -func (mgr *downloadManager) newSlabDownload(ctx context.Context, slice object.SlabSlice, migration bool) *slabDownload { +func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bool) *slabDownload { // calculate the offset and length offset, length := slice.SectorRegion() @@ -529,7 +529,7 @@ func (mgr *downloadManager) newSlabDownload(ctx context.Context, slice object.Sl func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, bool, error) { // prepare new download - slab := mgr.newSlabDownload(ctx, slice, migration) + slab := mgr.newSlabDownload(slice, migration) // execute download return slab.download(ctx) diff --git a/worker/downloader.go b/worker/downloader.go index 24be245fc..46dac61e3 100644 --- a/worker/downloader.go +++ b/worker/downloader.go @@ -245,7 +245,7 @@ func (d *downloader) processBatch(batch []*sectorDownloadReq) chan struct{} { return doneChan } -func (d *downloader) processQueue(hp HostManager) { +func (d *downloader) processQueue() { outer: for { // wait for work diff --git a/worker/rhpv2.go b/worker/rhpv2.go index 02cdce4ff..9f05904a4 100644 --- a/worker/rhpv2.go +++ b/worker/rhpv2.go @@ -277,7 +277,7 @@ func (w *worker) FetchSignedRevision(ctx context.Context, hostIP string, hostKey func (w *worker) PruneContract(ctx context.Context, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64) (deleted, remaining uint64, err error) { err = w.withContractLock(ctx, fcid, lockingPriorityPruning, func() error { return w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error { - return w.withRevisionV2(ctx, defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { + return w.withRevisionV2(defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { // perform gouging checks gc, err := GougingCheckerFromContext(ctx, false) if err != nil { @@ -510,7 +510,7 @@ func (w *worker) deleteContractRoots(t *rhpv2.Transport, rev *rhpv2.ContractRevi func (w *worker) FetchContractRoots(ctx context.Context, hostIP string, hostKey types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64) (roots []types.Hash256, err error) { err = w.withTransportV2(ctx, hostKey, hostIP, func(t *rhpv2.Transport) error { - return w.withRevisionV2(ctx, defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { + return w.withRevisionV2(defaultLockTimeout, t, hostKey, fcid, lastKnownRevisionNumber, func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) (err error) { gc, err := GougingCheckerFromContext(ctx, false) if err != nil { return err @@ -641,7 +641,7 @@ func (w *worker) withTransportV2(ctx context.Context, hostKey types.PublicKey, h return fn(t) } -func (w *worker) withRevisionV2(ctx context.Context, lockTimeout time.Duration, t *rhpv2.Transport, hk types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, fn func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) error) error { +func (w *worker) withRevisionV2(lockTimeout time.Duration, t *rhpv2.Transport, hk types.PublicKey, fcid types.FileContractID, lastKnownRevisionNumber uint64, fn func(t *rhpv2.Transport, rev rhpv2.ContractRevision, settings rhpv2.HostSettings) error) error { renterKey := w.deriveRenterKey(hk) // execute lock RPC diff --git a/worker/rhpv3.go b/worker/rhpv3.go index 203d2c3da..8db6dc9d5 100644 --- a/worker/rhpv3.go +++ b/worker/rhpv3.go @@ -214,7 +214,7 @@ type transportPoolV3 struct { pool map[string]*transportV3 } -func newTransportPoolV3(w *worker) *transportPoolV3 { +func newTransportPoolV3() *transportPoolV3 { return &transportPoolV3{ pool: make(map[string]*transportV3), } @@ -402,7 +402,7 @@ func (w *worker) initTransportPool() { if w.transportPoolV3 != nil { panic("transport pool already initialized") // developer error } - w.transportPoolV3 = newTransportPoolV3(w) + w.transportPoolV3 = newTransportPoolV3() } // ForHost returns an account to use for a given host. If the account diff --git a/worker/upload.go b/worker/upload.go index ab84e2b37..d146b920e 100644 --- a/worker/upload.go +++ b/worker/upload.go @@ -404,7 +404,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []a } // create the upload - upload, err := mgr.newUpload(ctx, up.rs.TotalShards, contracts, up.bh, lockPriority) + upload, err := mgr.newUpload(up.rs.TotalShards, contracts, up.bh, lockPriority) if err != nil { return false, "", err } @@ -565,7 +565,7 @@ func (mgr *uploadManager) UploadPackedSlab(ctx context.Context, rs api.Redundanc shards := encryptPartialSlab(ps.Data, ps.Key, uint8(rs.MinShards), uint8(rs.TotalShards)) // create the upload - upload, err := mgr.newUpload(ctx, len(shards), contracts, bh, lockPriority) + upload, err := mgr.newUpload(len(shards), contracts, bh, lockPriority) if err != nil { return err } @@ -610,7 +610,7 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, s *object.Slab, shar defer cancel() // create the upload - upload, err := mgr.newUpload(ctx, len(shards), contracts, bh, lockPriority) + upload, err := mgr.newUpload(len(shards), contracts, bh, lockPriority) if err != nil { return err } @@ -682,7 +682,7 @@ func (mgr *uploadManager) candidates(allowed map[types.PublicKey]struct{}) (cand return } -func (mgr *uploadManager) newUpload(ctx context.Context, totalShards int, contracts []api.ContractMetadata, bh uint64, lockPriority int) (*upload, error) { +func (mgr *uploadManager) newUpload(totalShards int, contracts []api.ContractMetadata, bh uint64, lockPriority int) (*upload, error) { mgr.mu.Lock() defer mgr.mu.Unlock() diff --git a/worker/worker.go b/worker/worker.go index 99323b501..0868c347c 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -43,7 +43,6 @@ const ( lockingPriorityActiveContractRevision = 100 lockingPriorityRenew = 80 - lockingPriorityPriceTable = 60 lockingPriorityFunding = 40 lockingPrioritySyncing = 30 lockingPriorityPruning = 20 @@ -1546,7 +1545,7 @@ func discardTxnOnErr(ctx context.Context, bus Bus, l *zap.SugaredLogger, txn typ ctx, cancel := context.WithTimeout(ctx, 10*time.Second) if dErr := bus.WalletDiscard(ctx, txn); dErr != nil { - l.Errorf("%w: failed to discard txn: %v", *err, dErr) + l.Errorf("%w: %v, failed to discard txn: %v", *err, errContext, dErr) } cancel() }