diff --git a/autopilot/contractor/alerts.go b/autopilot/contractor/alerts.go
index d242bc8f5..661d5393a 100644
--- a/autopilot/contractor/alerts.go
+++ b/autopilot/contractor/alerts.go
@@ -8,6 +8,13 @@ import (
 	"go.sia.tech/renterd/api"
 )
 
+const (
+	// alertLostSectorsThresholdPct defines the threshold at which we
+	// register the lost sectors alert. A value of 0.01 means that we register
+	// the alert if the host lost 1% (or more) of its stored data.
+	alertLostSectorsThresholdPct = 0.01
+)
+
 var (
 	alertChurnID       = alerts.RandomAlertID() // constant until restarted
 	alertLostSectorsID = alerts.RandomAlertID() // constant until restarted
@@ -47,3 +54,7 @@ func newLostSectorsAlert(hk types.PublicKey, lostSectors uint64) alerts.Alert {
 		Timestamp: time.Now(),
 	}
 }
+
+func registerLostSectorsAlert(dataLost, dataStored uint64) bool {
+	return dataLost > 0 && float64(dataLost) >= float64(dataStored)*alertLostSectorsThresholdPct
+}
diff --git a/autopilot/contractor/alerts_test.go b/autopilot/contractor/alerts_test.go
new file mode 100644
index 000000000..489e2c43b
--- /dev/null
+++ b/autopilot/contractor/alerts_test.go
@@ -0,0 +1,26 @@
+package contractor
+
+import (
+	"testing"
+
+	rhpv2 "go.sia.tech/core/rhp/v2"
+)
+
+func TestRegisterLostSectorsAlert(t *testing.T) {
+	for _, tc := range []struct {
+		dataLost   uint64
+		dataStored uint64
+		expected   bool
+	}{
+		{0, 0, false},
+		{0, rhpv2.SectorSize, false},
+		{rhpv2.SectorSize, 0, true},
+		{rhpv2.SectorSize, 99 * rhpv2.SectorSize, true},
+		{rhpv2.SectorSize, 100 * rhpv2.SectorSize, true},  // exactly 1%
+		{rhpv2.SectorSize, 101 * rhpv2.SectorSize, false}, // just short of 1%
+	} {
+		if result := registerLostSectorsAlert(tc.dataLost, tc.dataStored); result != tc.expected {
+			t.Fatalf("unexpected result for dataLost=%d, dataStored=%d: %v", tc.dataLost, tc.dataStored, result)
+		}
+	}
+}
diff --git a/autopilot/contractor/contractor.go b/autopilot/contractor/contractor.go
index a27e741f8..842cdb547 100644
--- a/autopilot/contractor/contractor.go
+++ b/autopilot/contractor/contractor.go
@@ -298,7 +298,7 @@ func (c *Contractor) performContractMaintenance(ctx *mCtx, w Worker) (bool, erro
 	// check if any used hosts have lost data to warn the user
 	var toDismiss []types.Hash256
 	for _, h := range hosts {
-		if h.Interactions.LostSectors > 0 {
+		if registerLostSectorsAlert(h.Interactions.LostSectors*rhpv2.SectorSize, h.StoredData) {
 			c.alerter.RegisterAlert(ctx, newLostSectorsAlert(h.PublicKey, h.Interactions.LostSectors))
 		} else {
 			toDismiss = append(toDismiss, alerts.IDForHost(alertLostSectorsID, h.PublicKey))
diff --git a/go.mod b/go.mod
index e10febd5a..87a484dcb 100644
--- a/go.mod
+++ b/go.mod
@@ -20,7 +20,7 @@ require (
 	go.sia.tech/jape v0.11.2-0.20240124024603-93559895d640
 	go.sia.tech/mux v1.2.0
 	go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca
-	go.sia.tech/web/renterd v0.51.0
+	go.sia.tech/web/renterd v0.51.1
 	go.uber.org/zap v1.27.0
 	golang.org/x/crypto v0.22.0
 	golang.org/x/sys v0.19.0
diff --git a/go.sum b/go.sum
index eb5afaf45..b93675f2d 100644
--- a/go.sum
+++ b/go.sum
@@ -1657,8 +1657,8 @@ go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQf
 go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y=
 go.sia.tech/web v0.0.0-20240403135501-82ff3a2a3e7c h1:os2ZFJojHi0ckCNbr8c2GnWGm0ftvHkQUJOfBRGGIfk=
 go.sia.tech/web v0.0.0-20240403135501-82ff3a2a3e7c/go.mod h1:nGEhGmI8zV/BcC3LOCC5JLVYpidNYJIvLGIqVRWQBCg=
-go.sia.tech/web/renterd v0.51.0 h1:hQfq6vOMll2lseQMaK9tUtc6RscO3zgLOzhCk9myHTk=
-go.sia.tech/web/renterd v0.51.0/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM=
+go.sia.tech/web/renterd v0.51.1 h1:R+EvTdmyCrN7P4tjATyjUAUKCLMu7CmdXYAqb3eON9w=
+go.sia.tech/web/renterd v0.51.1/go.mod h1:FgXrdmAnu591a3h96RB/15pMZ74xO9457g902uE06BM=
 go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
 go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
diff --git a/stores/hostdb_test.go b/stores/hostdb_test.go
index 6adf19968..ca3c07130 100644
--- a/stores/hostdb_test.go
+++ b/stores/hostdb_test.go
@@ -34,6 +34,7 @@ func (s *SQLStore) insertTestAnnouncement(hk types.PublicKey, a hostdb.Announcem
 // SQLite DB.
 func TestSQLHostDB(t *testing.T) {
 	ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
+	defer ss.Close()
 	if ss.ccid != modules.ConsensusChangeBeginning {
 		t.Fatal("wrong ccid", ss.ccid, modules.ConsensusChangeBeginning)
 	}
diff --git a/stores/metadata.go b/stores/metadata.go
index 87a63c7bc..4d8b6e097 100644
--- a/stores/metadata.go
+++ b/stores/metadata.go
@@ -1813,11 +1813,11 @@ func (s *SQLStore) UpdateObject(ctx context.Context, bucket, path, contractSet,
 	})
 }
 
-func (s *SQLStore) RemoveObject(ctx context.Context, bucket, key string) error {
+func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
 	var rowsAffected int64
 	var err error
 	err = s.retryTransaction(ctx, func(tx *gorm.DB) error {
-		rowsAffected, err = s.deleteObject(tx, bucket, key)
+		rowsAffected, err = s.deleteObject(tx, bucket, path)
 		if err != nil {
 			return fmt.Errorf("RemoveObject: failed to delete object: %w", err)
 		}
@@ -1827,7 +1827,7 @@ func (s *SQLStore) RemoveObject(ctx context.Context, bucket, path string) error {
 		return err
 	}
 	if rowsAffected == 0 {
-		return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, key)
+		return fmt.Errorf("%w: key: %s", api.ErrObjectNotFound, path)
 	}
 	return nil
 }
@@ -2748,6 +2748,10 @@ func (s *SQLStore) pruneSlabsLoop() {
 			})
 		} else {
 			s.alerts.DismissAlerts(s.shutdownCtx, pruneSlabsAlertID)
+
+			s.mu.Lock()
+			s.lastPrunedAt = time.Now()
+			s.mu.Unlock()
 		}
 		cancel()
 	}
diff --git a/stores/metadata_test.go b/stores/metadata_test.go
index abda57b95..aa12e3fb8 100644
--- a/stores/metadata_test.go
+++ b/stores/metadata_test.go
@@ -18,12 +18,52 @@ import (
 	rhpv2 "go.sia.tech/core/rhp/v2"
 	"go.sia.tech/core/types"
 	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/internal/test"
 	"go.sia.tech/renterd/object"
 	"gorm.io/gorm"
 	"gorm.io/gorm/schema"
 	"lukechampine.com/frand"
 )
 
+func (s *SQLStore) RemoveObjectBlocking(ctx context.Context, bucket, path string) error {
+	ts := time.Now()
+	if err := s.RemoveObject(ctx, bucket, path); err != nil {
+		return err
+	}
+	return s.waitForPruneLoop(ts)
+}
+
+func (s *SQLStore) RemoveObjectsBlocking(ctx context.Context, bucket, prefix string) error {
+	ts := time.Now()
+	if err := s.RemoveObjects(ctx, bucket, prefix); err != nil {
+		return err
+	}
+	return s.waitForPruneLoop(ts)
+}
+
+func (s *SQLStore) UpdateObjectBlocking(ctx context.Context, bucket, path, contractSet, eTag, mimeType string, metadata api.ObjectUserMetadata, o object.Object) error {
+	var ts time.Time
+	_, err := s.Object(ctx, bucket, path)
+	if err == nil {
+		ts = time.Now()
+	}
+	if err := s.UpdateObject(ctx, bucket, path, contractSet, eTag, mimeType, metadata, o); err != nil {
+		return err
+	}
+	return s.waitForPruneLoop(ts)
+}
+
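+// waitForPruneLoop blocks until the background prune loop has run at least
+// once after ts, retrying for up to ~10 seconds before giving up.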
+func (s *SQLStore) waitForPruneLoop(ts time.Time) error {
+	return test.Retry(100, 100*time.Millisecond, func() error {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		if s.lastPrunedAt.Before(ts) {
+			return errors.New("slabs have not been pruned yet")
+		}
+		return nil
+	})
+}
+
 func randomMultisigUC() types.UnlockConditions {
 	uc := types.UnlockConditions{
 		PublicKeys: make([]types.UnlockKey, 2),
@@ -194,7 +234,7 @@ func TestObjectMetadata(t *testing.T) {
 	}
 
 	// remove the object
-	if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
+	if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
 		t.Fatal(err)
 	}
 
@@ -1183,8 +1223,7 @@ func TestSQLMetadataStore(t *testing.T) {
 	fullObj, err = ss.addTestObject(objID, obj1)
 	if err != nil {
 		t.Fatal(err)
-	}
-	if !reflect.DeepEqual(*fullObj.Object, obj1) {
+	} else if !reflect.DeepEqual(*fullObj.Object, obj1) {
 		t.Fatal("object mismatch")
 	}
 
@@ -1219,18 +1258,18 @@ func TestSQLMetadataStore(t *testing.T) {
 		}
 		return nil
 	}
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		return countCheck(1, 1, 1, 1)
-	})
+	if err := countCheck(1, 1, 1, 1); err != nil {
+		t.Fatal(err)
+	}
 
 	// Delete the object. Due to the cascade this should delete everything
 	// but the sectors.
-	if err := ss.RemoveObject(ctx, api.DefaultBucketName, objID); err != nil {
+	if err := ss.RemoveObjectBlocking(ctx, api.DefaultBucketName, objID); err != nil {
+		t.Fatal(err)
+	}
+	if err := countCheck(0, 0, 0, 0); err != nil {
 		t.Fatal(err)
 	}
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		return countCheck(0, 0, 0, 0)
-	})
 }
 
 // TestObjectHealth verifies the object's health is returned correctly by all
@@ -2030,7 +2069,7 @@ func TestContractSectors(t *testing.T) {
 	}
 
 	// Delete the object.
-	if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
+	if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, t.Name()); err != nil {
 		t.Fatal(err)
 	}
 
@@ -2942,37 +2981,30 @@ func TestContractSizes(t *testing.T) {
 	}
 
 	// remove the first object
-	if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_1"); err != nil {
+	if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_1"); err != nil {
 		t.Fatal(err)
 	}
 
 	// assert there's one sector that can be pruned and assert it's from fcid 1
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		if n := prunableData(nil); n != rhpv2.SectorSize {
-			return fmt.Errorf("unexpected amount of prunable data %v", n)
-		}
-		if n := prunableData(&fcids[1]); n != 0 {
-			return fmt.Errorf("expected no prunable data %v", n)
-		}
-		return nil
-	})
+	if n := prunableData(nil); n != rhpv2.SectorSize {
+		t.Fatalf("unexpected amount of prunable data %v", n)
+	} else if n := prunableData(&fcids[1]); n != 0 {
+		t.Fatalf("expected no prunable data %v", n)
+	}
 
 	// remove the second object
-	if err := ss.RemoveObject(context.Background(), api.DefaultBucketName, "obj_2"); err != nil {
+	if err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, "obj_2"); err != nil {
 		t.Fatal(err)
 	}
 
 	// assert there's now two sectors that can be pruned
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		if n := prunableData(nil); n != rhpv2.SectorSize*2 {
-			return fmt.Errorf("unexpected amount of prunable data %v", n)
-		} else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize {
-			return fmt.Errorf("unexpected amount of prunable data %v", n)
-		} else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize {
-			return fmt.Errorf("unexpected amount of prunable data %v", n)
prunable data %v", n) - } - return nil - }) + if n := prunableData(nil); n != rhpv2.SectorSize*2 { + t.Fatalf("unexpected amount of prunable data %v", n) + } else if n := prunableData(&fcids[0]); n != rhpv2.SectorSize { + t.Fatalf("unexpected amount of prunable data %v", n) + } else if n := prunableData(&fcids[1]); n != rhpv2.SectorSize { + t.Fatalf("unexpected amount of prunable data %v", n) + } if size, err := ss.ContractSize(context.Background(), fcids[0]); err != nil { t.Fatal("unexpected err", err) @@ -3244,9 +3276,9 @@ func TestBucketObjects(t *testing.T) { } // Delete foo/baz in bucket 1 but first try bucket 2 since that should fail. - if err := ss.RemoveObject(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) { + if err := ss.RemoveObjectBlocking(context.Background(), b2, "/foo/baz"); !errors.Is(err, api.ErrObjectNotFound) { t.Fatal(err) - } else if err := ss.RemoveObject(context.Background(), b1, "/foo/baz"); err != nil { + } else if err := ss.RemoveObjectBlocking(context.Background(), b1, "/foo/baz"); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(context.Background(), b1, "/foo/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3263,7 +3295,7 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } else if len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) - } else if err := ss.RemoveObjects(context.Background(), b2, "/"); err != nil { + } else if err := ss.RemoveObjectsBlocking(context.Background(), b2, "/"); err != nil { t.Fatal(err) } else if entries, _, err := ss.ObjectEntries(context.Background(), b2, "/", "", "", "", "", 0, -1); err != nil { t.Fatal(err) @@ -3349,6 +3381,7 @@ func TestCopyObject(t *testing.T) { func TestMarkSlabUploadedAfterRenew(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() // create host. hks, err := ss.addTestHosts(1) @@ -3662,6 +3695,7 @@ func newTestShard(hk types.PublicKey, fcid types.FileContractID, root types.Hash func TestUpdateSlabSanityChecks(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() // create hosts and contracts. 
 	hks, err := ss.addTestHosts(5)
@@ -3726,8 +3760,7 @@ func TestUpdateSlabSanityChecks(t *testing.T) {
 
 func TestSlabHealthInvalidation(t *testing.T) {
 	// create db
-	cfg := defaultTestSQLStoreConfig
-	ss := newTestSQLStore(t, cfg)
+	ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
 	defer ss.Close()
 
 	// define a helper to assert the health validity of a given slab
@@ -4093,37 +4126,29 @@ func TestSlabCleanup(t *testing.T) {
 	}
 
 	// delete the object
-	err := ss.RemoveObject(context.Background(), api.DefaultBucketName, obj1.ObjectID)
+	err := ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj1.ObjectID)
 	if err != nil {
 		t.Fatal(err)
 	}
 
 	// check slice count
 	var slabCntr int64
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
-			return err
-		} else if slabCntr != 1 {
-			return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
-		}
-		return nil
-	})
+	if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
+		t.Fatal(err)
+	} else if slabCntr != 1 {
+		t.Fatalf("expected 1 slabs, got %v", slabCntr)
+	}
 
 	// delete second object
-	err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj2.ObjectID)
+	err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj2.ObjectID)
 	if err != nil {
 		t.Fatal(err)
+	} else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
+		t.Fatal(err)
+	} else if slabCntr != 0 {
+		t.Fatalf("expected 0 slabs, got %v", slabCntr)
 	}
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
-			return err
-		} else if slabCntr != 0 {
-			return fmt.Errorf("expected 0 slabs, got %v", slabCntr)
-		}
-		return nil
-	})
-
 	// create another object that references a slab with buffer
 	ek, _ = object.GenerateEncryptionKey().MarshalBinary()
 	bufferedSlab := dbSlab{
@@ -4158,19 +4183,14 @@ func TestSlabCleanup(t *testing.T) {
 	}
 
 	// delete third object
-	err = ss.RemoveObject(context.Background(), api.DefaultBucketName, obj3.ObjectID)
+	err = ss.RemoveObjectBlocking(context.Background(), api.DefaultBucketName, obj3.ObjectID)
 	if err != nil {
 		t.Fatal(err)
+	} else if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
+		t.Fatal(err)
+	} else if slabCntr != 1 {
+		t.Fatalf("expected 1 slabs, got %v", slabCntr)
 	}
-
-	ss.Retry(100, 100*time.Millisecond, func() error {
-		if err := ss.db.Model(&dbSlab{}).Count(&slabCntr).Error; err != nil {
-			return err
-		} else if slabCntr != 1 {
-			return fmt.Errorf("expected 1 slabs, got %v", slabCntr)
-		}
-		return nil
-	})
 }
 
 func TestUpsertSectors(t *testing.T) {
@@ -4573,8 +4593,6 @@ func TestTypeCurrency(t *testing.T) {
 // same transaction, deadlocks become more likely due to the gap locks MySQL
 // uses.
 func TestUpdateObjectParallel(t *testing.T) {
-	cfg := defaultTestSQLStoreConfig
-
 	dbURI, _, _, _ := DBConfigFromEnv()
 	if dbURI == "" {
 		// it's pretty much impossible to optimise for both sqlite and mysql at
@@ -4584,7 +4602,7 @@ func TestUpdateObjectParallel(t *testing.T) {
 		// can revisit this
 		t.SkipNow()
 	}
-	ss := newTestSQLStore(t, cfg)
+	ss := newTestSQLStore(t, defaultTestSQLStoreConfig)
 	ss.retryTransactionIntervals = []time.Duration{0} // don't retry
 	defer ss.Close()
 
diff --git a/stores/multipart_test.go b/stores/multipart_test.go
index 50272fcda..4bd202a51 100644
--- a/stores/multipart_test.go
+++ b/stores/multipart_test.go
@@ -85,6 +85,7 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) {
 	}
 
 	// Complete the upload. Check that the number of slices stays the same.
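+	// Note the time before completing the upload so we can wait for the
+	// background prune loop to run afterwards.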
+	ts := time.Now()
 	var nSlicesBefore int64
 	var nSlicesAfter int64
 	if err := ss.db.Model(&dbSlice{}).Count(&nSlicesBefore).Error; err != nil {
@@ -97,6 +98,8 @@ func TestMultipartUploadWithUploadPackingRegression(t *testing.T) {
 		t.Fatal(err)
 	} else if nSlicesBefore != nSlicesAfter {
 		t.Fatalf("expected number of slices to stay the same, but got %v before and %v after", nSlicesBefore, nSlicesAfter)
+	} else if err := ss.waitForPruneLoop(ts); err != nil {
+		t.Fatal(err)
 	}
 
 	// Fetch the object.
diff --git a/stores/sql.go b/stores/sql.go
index ce4a4f3ec..c46dc7fe3 100644
--- a/stores/sql.go
+++ b/stores/sql.go
@@ -106,6 +106,7 @@ type (
 		mu           sync.Mutex
 		allowListCnt uint64
 		blockListCnt uint64
+		lastPrunedAt time.Time
 		closed       bool
 
 		knownContracts map[types.FileContractID]struct{}
@@ -275,6 +276,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) {
 			ID:     types.BlockID(ci.BlockID),
 		},
 
+		lastPrunedAt:              time.Now(),
 		retryTransactionIntervals: cfg.RetryTransactionIntervals,
 
 		shutdownCtx: shutdownCtx,
diff --git a/stores/sql_test.go b/stores/sql_test.go
index 01ce8fe6b..18b7f5609 100644
--- a/stores/sql_test.go
+++ b/stores/sql_test.go
@@ -215,7 +215,7 @@ func newTestLogger() logger.Interface {
 }
 
 func (s *testSQLStore) addTestObject(path string, o object.Object) (api.Object, error) {
-	if err := s.UpdateObject(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil {
+	if err := s.UpdateObjectBlocking(context.Background(), api.DefaultBucketName, path, testContractSet, testETag, testMimeType, testMetadata, o); err != nil {
 		return api.Object{}, err
 	} else if obj, err := s.Object(context.Background(), api.DefaultBucketName, path); err != nil {
 		return api.Object{}, err