From 866bf8fbdc35979c44e1eadbdeecf9d4589dd53c Mon Sep 17 00:00:00 2001 From: Christopher Schinnerl Date: Fri, 21 Jun 2024 11:36:34 +0200 Subject: [PATCH] Migrate LoadSlabBuffer to raw SQL (#1323) --- stores/metadata.go | 11 ----- stores/metadata_test.go | 28 +++++++----- stores/slabbuffer.go | 92 +++++++++++++++++---------------------- stores/slabbuffer_test.go | 4 +- stores/sql.go | 2 +- stores/sql/database.go | 13 ++++++ stores/sql/main.go | 67 ++++++++++++++++++++++++++++ stores/sql/mysql/main.go | 4 ++ stores/sql/sqlite/main.go | 4 ++ 9 files changed, 146 insertions(+), 79 deletions(-) diff --git a/stores/metadata.go b/stores/metadata.go index db5d771e2..9a649cde8 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -187,14 +187,6 @@ type ( Shards []dbSector `gorm:"constraint:OnDelete:CASCADE"` // CASCADE to delete shards too } - dbBufferedSlab struct { - Model - - DBSlab dbSlab - - Filename string - } - dbSector struct { Model @@ -330,9 +322,6 @@ func (dbSector) TableName() string { return "sectors" } // TableName implements the gorm.Tabler interface. func (dbSlab) TableName() string { return "slabs" } -// TableName implements the gorm.Tabler interface. -func (dbBufferedSlab) TableName() string { return "buffered_slabs" } - // TableName implements the gorm.Tabler interface. func (dbSlice) TableName() string { return "slices" } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index ab7bd0d8c..dc991ec02 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -2722,7 +2722,13 @@ func TestPartialSlab(t *testing.T) { t.Fatal("wrong data") } - var buffer dbBufferedSlab + type bufferedSlab struct { + ID uint + DBSlab dbSlab `gorm:"foreignKey:DBBufferedSlabID"` + Filename string + } + + var buffer bufferedSlab sk, _ := slabs[0].Key.MarshalBinary() if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil { t.Fatal(err) @@ -2784,7 +2790,7 @@ func TestPartialSlab(t *testing.T) { } else if !bytes.Equal(data, slab2Data) { t.Fatal("wrong data") } - buffer = dbBufferedSlab{} + buffer = bufferedSlab{} sk, _ = slabs[0].Key.MarshalBinary() if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil { t.Fatal(err) @@ -2825,13 +2831,13 @@ func TestPartialSlab(t *testing.T) { } else if !bytes.Equal(slab3Data, append(data1, data2...)) { t.Fatal("wrong data") } - buffer = dbBufferedSlab{} + buffer = bufferedSlab{} sk, _ = slabs[0].Key.MarshalBinary() if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil { t.Fatal(err) } assertBuffer(buffer1Name, rhpv2.SectorSize, true, false) - buffer = dbBufferedSlab{} + buffer = bufferedSlab{} sk, _ = slabs[1].Key.MarshalBinary() if err := ss.db.Joins("DBSlab").Take(&buffer, "DBSlab.key = ?", secretKey(sk)).Error; err != nil { t.Fatal(err) @@ -2860,11 +2866,11 @@ func TestPartialSlab(t *testing.T) { assertBuffer(buffer1Name, rhpv2.SectorSize, true, true) assertBuffer(buffer2Name, 1, false, false) - var foo []dbBufferedSlab + var foo []bufferedSlab if err := ss.db.Find(&foo).Error; err != nil { t.Fatal(err) } - buffer = dbBufferedSlab{} + buffer = bufferedSlab{} if err := ss.db.Take(&buffer, "id = ?", packedSlabs[0].BufferID).Error; err != nil { t.Fatal(err) } @@ -2883,7 +2889,7 @@ func TestPartialSlab(t *testing.T) { t.Fatal(err) } - buffer = dbBufferedSlab{} + buffer = bufferedSlab{} if err := ss.db.Take(&buffer, "id = ?", packedSlabs[0].BufferID).Error; !errors.Is(err, gorm.ErrRecordNotFound) { t.Fatal("shouldn't be able to find buffer", err) } @@ -4131,10 +4137,8 @@ func TestSlabCleanup(t *testing.T) { } // create buffered slab - bs := dbBufferedSlab{ - Filename: "foo", - } - if err := ss.db.Create(&bs).Error; err != nil { + bsID := uint(1) + if err := ss.db.Exec("INSERT INTO buffered_slabs (filename) VALUES ('foo');").Error; err != nil { t.Fatal(err) } @@ -4223,7 +4227,7 @@ func TestSlabCleanup(t *testing.T) { // create another object that references a slab with buffer ek, _ = object.GenerateEncryptionKey().MarshalBinary() bufferedSlab := dbSlab{ - DBBufferedSlabID: bs.ID, + DBBufferedSlabID: bsID, DBContractSet: cs, Health: 1, Key: ek, diff --git a/stores/slabbuffer.go b/stores/slabbuffer.go index 5afca267e..77e2574a3 100644 --- a/stores/slabbuffer.go +++ b/stores/slabbuffer.go @@ -17,6 +17,7 @@ import ( "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" sql "go.sia.tech/renterd/stores/sql" + "go.uber.org/zap" "lukechampine.com/frand" ) @@ -40,9 +41,11 @@ type SlabBuffer struct { type bufferGroupID [6]byte type SlabBufferManager struct { + alerts alerts.Alerter bufferedSlabCompletionThreshold int64 + db sql.Database dir string - s *SQLStore + logger *zap.SugaredLogger mu sync.Mutex completeBuffers map[bufferGroupID][]*SlabBuffer @@ -50,84 +53,67 @@ type SlabBufferManager struct { buffersByKey map[string]*SlabBuffer } -func newSlabBufferManager(sqlStore *SQLStore, slabBufferCompletionThreshold int64, partialSlabDir string) (*SlabBufferManager, error) { +func newSlabBufferManager(ctx context.Context, a alerts.Alerter, db sql.Database, logger *zap.SugaredLogger, slabBufferCompletionThreshold int64, partialSlabDir string) (*SlabBufferManager, error) { if slabBufferCompletionThreshold < 0 || slabBufferCompletionThreshold > 1<<22 { return nil, fmt.Errorf("invalid slabBufferCompletionThreshold %v", slabBufferCompletionThreshold) } // load existing buffers - var buffers []dbBufferedSlab - err := sqlStore.db. - Joins("DBSlab"). - Find(&buffers). - Error + buffers, orphans, err := db.LoadSlabBuffers(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to load slab buffers: %w", err) } + mgr := &SlabBufferManager{ + alerts: a, bufferedSlabCompletionThreshold: slabBufferCompletionThreshold, + db: db, dir: partialSlabDir, - s: sqlStore, - completeBuffers: make(map[bufferGroupID][]*SlabBuffer), - incompleteBuffers: make(map[bufferGroupID][]*SlabBuffer), - buffersByKey: make(map[string]*SlabBuffer), + logger: logger, + + completeBuffers: make(map[bufferGroupID][]*SlabBuffer), + incompleteBuffers: make(map[bufferGroupID][]*SlabBuffer), + buffersByKey: make(map[string]*SlabBuffer), } - for _, buffer := range buffers { - if buffer.DBSlab.ID == 0 { - // Buffer doesn't have a slab. We can delete it. - sqlStore.logger.Warn(fmt.Sprintf("buffer %v has no associated slab, deleting it", buffer.Filename)) - if err := sqlStore.db.Delete(&buffer).Error; err != nil { - return nil, fmt.Errorf("failed to delete buffer %v: %v", buffer.ID, err) - } - if err := os.RemoveAll(filepath.Join(partialSlabDir, buffer.Filename)); err != nil { - return nil, fmt.Errorf("failed to remove buffer file %v: %v", buffer.Filename, err) - } - continue - } - // Get the encryption key. - var ec object.EncryptionKey - if err := ec.UnmarshalBinary(buffer.DBSlab.Key); err != nil { - return nil, err + + for _, orphan := range orphans { + // Buffer doesn't have a slab. We can delete it. + logger.Warn(fmt.Sprintf("buffer '%v' has no associated slab, deleting it", orphan)) + if err := os.RemoveAll(filepath.Join(partialSlabDir, orphan)); err != nil { + return nil, fmt.Errorf("failed to remove buffer file %v: %v", orphan, err) } + } + + for _, buffer := range buffers { // Open the file. file, err := os.OpenFile(filepath.Join(partialSlabDir, buffer.Filename), os.O_RDWR, 0600) if err != nil { - _ = sqlStore.alerts.RegisterAlert(sqlStore.shutdownCtx, alerts.Alert{ + _ = a.RegisterAlert(ctx, alerts.Alert{ ID: types.HashBytes([]byte(buffer.Filename)), Severity: alerts.SeverityCritical, Message: "failed to read buffer file on startup", Data: map[string]interface{}{ "filename": buffer.Filename, - "slabKey": ec, + "slabKey": buffer.Key, }, Timestamp: time.Now(), }) - sqlStore.logger.Errorf("failed to open buffer file %v for slab %v: %v", buffer.Filename, buffer.DBSlab.Key, err) + logger.Errorf("failed to open buffer file %v for slab %v: %v", buffer.Filename, buffer.Key, err) continue } - // Get the size of the buffer by looking at all slices using it - var size int64 - err = sqlStore.db.Model(&dbSlab{}). - Joins("INNER JOIN slices sli ON slabs.id = sli.db_slab_id"). - Select("COALESCE(MAX(offset+length), 0) as Size"). - Where("slabs.db_buffered_slab_id = ?", buffer.ID). - Scan(&size). - Error - if err != nil { - return nil, err - } + // Create the slab buffer. sb := &SlabBuffer{ - dbID: buffer.ID, + dbID: uint(buffer.ID), filename: buffer.Filename, - slabKey: ec, - maxSize: int64(bufferedSlabSize(buffer.DBSlab.MinShards)), + slabKey: buffer.Key, + maxSize: int64(bufferedSlabSize(buffer.MinShards)), file: file, - size: size, + size: buffer.Size, } // Add the buffer to the manager. - gid := bufferGID(buffer.DBSlab.MinShards, buffer.DBSlab.TotalShards, uint32(buffer.DBSlab.DBContractSetID)) - if size >= int64(sb.maxSize-slabBufferCompletionThreshold) { + gid := bufferGID(buffer.MinShards, buffer.TotalShards, uint32(buffer.ContractSetID)) + if sb.size >= int64(sb.maxSize-slabBufferCompletionThreshold) { mgr.completeBuffers[gid] = append(mgr.completeBuffers[gid], sb) } else { mgr.incompleteBuffers[gid] = append(mgr.incompleteBuffers[gid], sb) @@ -204,7 +190,7 @@ func (mgr *SlabBufferManager) AddPartialSlab(ctx context.Context, data []byte, m // If there is still data left, create a new buffer. if len(data) > 0 { var sb *SlabBuffer - err = mgr.s.bMain.Transaction(ctx, func(tx sql.DatabaseTx) error { + err = mgr.db.Transaction(ctx, func(tx sql.DatabaseTx) error { sb, err = createSlabBuffer(ctx, tx, contractSet, mgr.dir, minShards, totalShards) return err }) @@ -319,7 +305,7 @@ func (mgr *SlabBufferManager) SlabsForUpload(ctx context.Context, lockingDuratio data := make([]byte, buffer.size) _, err := buffer.file.ReadAt(data, 0) if err != nil { - mgr.s.alerts.RegisterAlert(ctx, alerts.Alert{ + mgr.alerts.RegisterAlert(ctx, alerts.Alert{ ID: types.HashBytes([]byte(buffer.filename)), Severity: alerts.SeverityCritical, Message: "failed to read data from buffer", @@ -329,7 +315,7 @@ func (mgr *SlabBufferManager) SlabsForUpload(ctx context.Context, lockingDuratio }, Timestamp: time.Now(), }) - mgr.s.logger.Error(ctx, fmt.Sprintf("failed to read buffer %v: %s", buffer.filename, err)) + mgr.logger.Error(ctx, fmt.Sprintf("failed to read buffer %v: %s", buffer.filename, err)) return nil, err } slabs = append(slabs, api.PackedSlab{ @@ -361,9 +347,9 @@ func (mgr *SlabBufferManager) RemoveBuffers(fileNames ...string) { // an error because the buffers are not meant to be used anymore // anyway. if err := buffers[i].file.Close(); err != nil { - mgr.s.logger.Errorf("failed to close buffer %v: %v", buffers[i].filename, err) + mgr.logger.Errorf("failed to close buffer %v: %v", buffers[i].filename, err) } else if err := os.RemoveAll(filepath.Join(mgr.dir, buffers[i].filename)); err != nil { - mgr.s.logger.Errorf("failed to remove buffer %v: %v", buffers[i].filename, err) + mgr.logger.Errorf("failed to remove buffer %v: %v", buffers[i].filename, err) } delete(mgr.buffersByKey, buffers[i].slabKey.String()) buffers[i] = buffers[len(buffers)-1] diff --git a/stores/slabbuffer_test.go b/stores/slabbuffer_test.go index bb0a7601d..4425fcff1 100644 --- a/stores/slabbuffer_test.go +++ b/stores/slabbuffer_test.go @@ -13,7 +13,7 @@ func TestRecordAppendToCompletedBuffer(t *testing.T) { defer ss.Close() completionThreshold := int64(1000) - mgr, err := newSlabBufferManager(ss.SQLStore, completionThreshold, t.TempDir()) + mgr, err := newSlabBufferManager(context.Background(), ss.alerts, ss.bMain, ss.logger, completionThreshold, t.TempDir()) if err != nil { t.Fatal(err) } @@ -66,7 +66,7 @@ func TestMarkBufferCompleteTwice(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() - mgr, err := newSlabBufferManager(ss.SQLStore, 0, t.TempDir()) + mgr, err := newSlabBufferManager(context.Background(), ss.alerts, ss.bMain, ss.logger, 0, t.TempDir()) if err != nil { t.Fatal(err) } diff --git a/stores/sql.go b/stores/sql.go index ea3716d25..7cad8f5af 100644 --- a/stores/sql.go +++ b/stores/sql.go @@ -279,7 +279,7 @@ func NewSQLStore(cfg Config) (*SQLStore, modules.ConsensusChangeID, error) { shutdownCtxCancel: shutdownCtxCancel, } - ss.slabBufferMgr, err = newSlabBufferManager(ss, cfg.SlabBufferCompletionThreshold, cfg.PartialSlabDir) + ss.slabBufferMgr, err = newSlabBufferManager(shutdownCtx, cfg.Alerts, dbMain, l.Named("slabbuffers"), cfg.SlabBufferCompletionThreshold, cfg.PartialSlabDir) if err != nil { return nil, modules.ConsensusChangeID{}, err } diff --git a/stores/sql/database.go b/stores/sql/database.go index b16573e9f..88a560487 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -17,6 +17,9 @@ type ( Database interface { io.Closer + // LoadSlabBuffers loads the slab buffers from the database. + LoadSlabBuffers(ctx context.Context) ([]LoadedSlabBuffer, []string, error) + // Migrate runs all missing migrations on the database. Migrate(ctx context.Context) error @@ -304,6 +307,16 @@ type ( WalletMetrics(ctx context.Context, start time.Time, n uint64, interval time.Duration, opts api.WalletMetricsQueryOpts) ([]api.WalletMetric, error) } + LoadedSlabBuffer struct { + ID int64 + ContractSetID int64 + Filename string + Key object.EncryptionKey + MinShards uint8 + Size int64 + TotalShards uint8 + } + UsedContract struct { ID int64 FCID FileContractID diff --git a/stores/sql/main.go b/stores/sql/main.go index 268743261..48600dc08 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -531,6 +531,73 @@ func InsertObject(ctx context.Context, tx sql.Tx, key string, dirID, bucketID, s return res.LastInsertId() } +func LoadSlabBuffers(ctx context.Context, db *sql.DB) (bufferedSlabs []LoadedSlabBuffer, orphanedBuffers []string, err error) { + err = db.Transaction(ctx, func(tx sql.Tx) error { + // collect all buffers + rows, err := db.Query(ctx, ` + SELECT bs.id, bs.filename, sla.db_contract_set_id, sla.key, sla.min_shards, sla.total_shards + FROM buffered_slabs bs + INNER JOIN slabs sla ON sla.db_buffered_slab_id = bs.id + `) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var bs LoadedSlabBuffer + var sk SecretKey + if err := rows.Scan(&bs.ID, &bs.Filename, &bs.ContractSetID, &sk, &bs.MinShards, &bs.TotalShards); err != nil { + return fmt.Errorf("failed to scan buffered slab: %w", err) + } else if err := bs.Key.UnmarshalBinary(sk[:]); err != nil { + return fmt.Errorf("failed to unmarshal secret key: %w", err) + } + bufferedSlabs = append(bufferedSlabs, bs) + } + + // fill in sizes + for i := range bufferedSlabs { + err = tx.QueryRow(ctx, ` + SELECT COALESCE(MAX(offset+length), 0) + FROM slabs sla + INNER JOIN slices sli ON sla.id = sli.db_slab_id + WHERE sla.db_buffered_slab_id = ? + `, bufferedSlabs[i].ID).Scan(&bufferedSlabs[i].Size) + if err != nil { + return fmt.Errorf("failed to fetch buffered slab size: %w", err) + } + } + + // find orphaned buffers and delete them + rows, err = tx.Query(ctx, ` + SELECT bs.id, bs.filename + FROM buffered_slabs bs + LEFT JOIN slabs ON slabs.db_buffered_slab_id = bs.id + WHERE slabs.id IS NULL + `) + if err != nil { + return fmt.Errorf("failed to fetch orphaned buffers: %w", err) + } + var toDelete []int64 + for rows.Next() { + var id int64 + var filename string + if err := rows.Scan(&id, &filename); err != nil { + return fmt.Errorf("failed to scan orphaned buffer: %w", err) + } + orphanedBuffers = append(orphanedBuffers, filename) + toDelete = append(toDelete, id) + } + for _, id := range toDelete { + if _, err := tx.Exec(ctx, "DELETE FROM buffered_slabs WHERE id = ?", id); err != nil { + return fmt.Errorf("failed to delete orphaned buffer: %w", err) + } + } + return nil + }) + return +} + func UpdateMetadata(ctx context.Context, tx sql.Tx, objID int64, md api.ObjectUserMetadata) error { if err := DeleteMetadata(ctx, tx, objID); err != nil { return err diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index e251dc611..0f8769b16 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -60,6 +60,10 @@ func (b *MainDatabase) DB() *sql.DB { return b.db } +func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBuffer, []string, error) { + return ssql.LoadSlabBuffers(ctx, b.db) +} + func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { mtx := b.wrapTxn(tx) return mtx.MakeDirsForPath(ctx, path) diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 74d9999b2..b3c3cb6b7 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -59,6 +59,10 @@ func (b *MainDatabase) DB() *sql.DB { return b.db } +func (b *MainDatabase) LoadSlabBuffers(ctx context.Context) ([]ssql.LoadedSlabBuffer, []string, error) { + return ssql.LoadSlabBuffers(ctx, b.db) +} + func (b *MainDatabase) MakeDirsForPath(ctx context.Context, tx sql.Tx, path string) (int64, error) { mtx := b.wrapTxn(tx) return mtx.MakeDirsForPath(ctx, path)