Skip to content

Commit 503ffa7

Browse files
authored
Merge pull request #308 from SiaFoundation/nate/refactor-sector-prune
Refactor sector prune
2 parents 6f0a7b6 + 5cf05b7 commit 503ffa7

File tree

7 files changed

+252
-191
lines changed

7 files changed

+252
-191
lines changed

persist/sqlite/contracts.go

Lines changed: 88 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -137,26 +137,43 @@ func (u *updateContractsTxn) ContractRelevant(id types.FileContractID) (bool, er
137137
return err == nil, err
138138
}
139139

140-
func (s *Store) batchExpireContractSectors(height uint64) (removed []contractSectorRef, pruned int, err error) {
140+
func deleteExpiredContractSectors(tx txn, height uint64) (sectorIDs []int64, err error) {
141+
const query = `DELETE FROM contract_sector_roots
142+
WHERE id IN (SELECT csr.id FROM contract_sector_roots csr
143+
INNER JOIN contracts c ON (csr.contract_id=c.id)
144+
-- past proof window or not confirmed and past the rebroadcast height
145+
WHERE c.window_end < $1 OR c.contract_status=$2 LIMIT $3)
146+
RETURNING sector_id;`
147+
rows, err := tx.Query(query, height, contracts.ContractStatusRejected, sqlSectorBatchSize)
148+
if err != nil {
149+
return nil, err
150+
}
151+
defer rows.Close()
152+
for rows.Next() {
153+
var id int64
154+
if err := rows.Scan(&id); err != nil {
155+
return nil, err
156+
}
157+
sectorIDs = append(sectorIDs, id)
158+
}
159+
return sectorIDs, nil
160+
}
161+
162+
func (s *Store) batchExpireContractSectors(height uint64) (expired int, removed []types.Hash256, err error) {
141163
err = s.transaction(func(tx txn) (err error) {
142-
removed, err = expiredContractSectors(tx, height, sqlSectorBatchSize)
164+
sectorIDs, err := deleteExpiredContractSectors(tx, height)
143165
if err != nil {
144-
return fmt.Errorf("failed to select sectors: %w", err)
166+
return fmt.Errorf("failed to delete contract sectors: %w", err)
145167
}
168+
expired = len(sectorIDs)
146169

147-
refs := make([]contractSectorRootRef, 0, len(removed))
148-
for _, sector := range removed {
149-
refs = append(refs, contractSectorRootRef{
150-
dbID: sector.ID,
151-
sectorID: sector.SectorID,
152-
})
170+
// decrement the contract metrics
171+
if err := incrementNumericStat(tx, metricContractSectors, -len(sectorIDs), time.Now()); err != nil {
172+
return fmt.Errorf("failed to decrement contract sectors: %w", err)
153173
}
154174

155-
pruned, err = deleteContractSectors(tx, refs)
156-
if err != nil {
157-
return fmt.Errorf("failed to prune sectors: %w", err)
158-
}
159-
return nil
175+
removed, err = pruneSectors(tx, sectorIDs)
176+
return err
160177
})
161178
return
162179
}
@@ -298,9 +315,8 @@ func (s *Store) ReviseContract(revision contracts.SignedRevision, roots []types.
298315
return fmt.Errorf("failed to trim sectors: %w", err)
299316
}
300317
sectors -= change.A
301-
removed := roots[len(roots)-int(change.A):]
302-
for _, root := range removed {
303-
if !trimmed[root] {
318+
for i, root := range roots[len(roots)-int(change.A):] {
319+
if trimmed[i] != root {
304320
return fmt.Errorf("inconsistent sector trim: expected %s to be trimmed", root)
305321
}
306322
}
@@ -519,28 +535,16 @@ func (s *Store) UpdateContractState(ccID modules.ConsensusChangeID, height uint6
519535
// ExpireContractSectors expires all sectors that are no longer covered by an
520536
// active contract.
521537
func (s *Store) ExpireContractSectors(height uint64) error {
522-
var totalRemoved int
523-
contractExpired := make(map[types.FileContractID]int)
524-
defer func() {
525-
for contractID, removed := range contractExpired {
526-
s.log.Debug("expired contract sectors", zap.Stringer("contractID", contractID), zap.Uint64("height", height), zap.Int("expired", removed))
527-
}
528-
if totalRemoved > 0 {
529-
s.log.Debug("removed contract sectors", zap.Uint64("height", height), zap.Int("removed", totalRemoved))
530-
}
531-
}()
538+
log := s.log.Named("ExpireContractSectors").With(zap.Uint64("height", height))
532539
// delete in batches to avoid holding a lock on the database for too long
533540
for i := 0; ; i++ {
534541
expired, removed, err := s.batchExpireContractSectors(height)
535542
if err != nil {
536543
return fmt.Errorf("failed to prune sectors: %w", err)
537-
} else if len(expired) == 0 {
544+
} else if expired == 0 {
538545
return nil
539546
}
540-
for _, ref := range expired {
541-
contractExpired[ref.ContractID]++
542-
}
543-
totalRemoved += removed
547+
log.Debug("removed sectors", zap.Int("expired", expired), zap.Stringers("removed", removed), zap.Int("batch", i))
544548
jitterSleep(time.Millisecond) // allow other transactions to run
545549
}
546550
}
@@ -561,6 +565,7 @@ func getContract(tx txn, contractID int64) (contracts.Contract, error) {
561565
return contract, err
562566
}
563567

568+
// appendSector appends a new sector root to a contract.
564569
func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) error {
565570
var sectorID int64
566571
err := tx.QueryRow(`INSERT INTO contract_sector_roots (contract_id, sector_id, root_index) SELECT $1, id, $2 FROM stored_sectors WHERE sector_root=$3 RETURNING sector_id`, contractID, index, sqlHash256(root)).Scan(&sectorID)
@@ -572,6 +577,7 @@ func appendSector(tx txn, contractID int64, root types.Hash256, index uint64) er
572577
return nil
573578
}
574579

580+
// updateSector updates a contract sector root in place and returns the old sector root
575581
func updateSector(tx txn, contractID int64, root types.Hash256, index uint64) (types.Hash256, error) {
576582
row := tx.QueryRow(`SELECT csr.id, csr.sector_id, ss.sector_root
577583
FROM contract_sector_roots csr
@@ -582,26 +588,28 @@ WHERE contract_id=$1 AND root_index=$2`, contractID, index)
582588
return types.Hash256{}, fmt.Errorf("failed to get old sector id: %w", err)
583589
}
584590

585-
// update the sector ID
586591
var newSectorID int64
587-
err = tx.QueryRow(`WITH sector AS (
588-
SELECT id FROM stored_sectors WHERE sector_root=$1
589-
)
590-
UPDATE contract_sector_roots
591-
SET sector_id=sector.id
592-
FROM sector
593-
WHERE contract_sector_roots.id=$2
594-
RETURNING sector_id;`, sqlHash256(root), ref.dbID).Scan(&newSectorID)
592+
err = tx.QueryRow(`SELECT id FROM stored_sectors WHERE sector_root=$1`, sqlHash256(root)).Scan(&newSectorID)
593+
if err != nil {
594+
return types.Hash256{}, fmt.Errorf("failed to get new sector id: %w", err)
595+
}
596+
597+
// update the sector ID
598+
err = tx.QueryRow(`UPDATE contract_sector_roots
599+
SET sector_id=$1
600+
WHERE id=$2
601+
RETURNING sector_id;`, newSectorID, ref.dbID).Scan(&newSectorID)
595602
if err != nil {
596603
return types.Hash256{}, fmt.Errorf("failed to update sector ID: %w", err)
597604
}
598605
// prune the old sector ID
599-
if _, err := pruneSectorRef(tx, ref.sectorID); err != nil {
606+
if _, err := pruneSectors(tx, []int64{ref.sectorID}); err != nil {
600607
return types.Hash256{}, fmt.Errorf("failed to prune old sector: %w", err)
601608
}
602609
return ref.root, nil
603610
}
604611

612+
// swapSectors swaps two sector roots in a contract and returns the sector roots
605613
func swapSectors(tx txn, contractID int64, i, j uint64) (map[types.Hash256]bool, error) {
606614
if i == j {
607615
return nil, nil
@@ -656,11 +664,14 @@ ORDER BY root_index ASC;`, contractID, i, j)
656664
}, nil
657665
}
658666

659-
// lastContractSectors returns the last n sector IDs for a contract.
660-
func lastContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
661-
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
667+
// lastNContractSectors returns the last n sector IDs for a contract.
668+
func lastNContractSectors(tx txn, contractID int64, n uint64) (roots []contractSectorRootRef, err error) {
669+
const query = `SELECT csr.id, csr.sector_id, ss.sector_root FROM contract_sector_roots csr
662670
INNER JOIN stored_sectors ss ON (csr.sector_id=ss.id)
663-
WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
671+
WHERE csr.contract_id=$1
672+
ORDER BY root_index DESC
673+
LIMIT $2;`
674+
664675
rows, err := tx.Query(query, contractID, n)
665676
if err != nil {
666677
return nil, err
@@ -677,68 +688,48 @@ WHERE contract_id=$1 ORDER BY root_index DESC LIMIT $2;`
677688
return
678689
}
679690

680-
// deleteContractSectors deletes sector roots from a contract. Sectors that are
681-
// still referenced will not be removed. Returns the number of sectors deleted.
682-
func deleteContractSectors(tx txn, refs []contractSectorRootRef) (int, error) {
683-
var rootIDs []int64
684-
for _, ref := range refs {
685-
rootIDs = append(rootIDs, ref.dbID)
691+
// deleteContractSectorRoots deletes the contract sector roots with the given IDs.
692+
func deleteContractSectorRoots(tx txn, ids []int64) error {
693+
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(ids)) + `);`
694+
res, err := tx.Exec(query, queryArgs(ids)...)
695+
if err != nil {
696+
return fmt.Errorf("failed to delete contract sector roots: %w", err)
697+
} else if n, err := res.RowsAffected(); err != nil {
698+
return fmt.Errorf("failed to get rows affected: %w", err)
699+
} else if n != int64(len(ids)) {
700+
return fmt.Errorf("expected %v rows affected, got %v", len(ids), n)
686701
}
702+
return nil
703+
}
687704

688-
// delete the sector roots
689-
query := `DELETE FROM contract_sector_roots WHERE id IN (` + queryPlaceHolders(len(rootIDs)) + `) RETURNING id;`
690-
rows, err := tx.Query(query, queryArgs(rootIDs)...)
705+
// trimSectors deletes the last n sector roots for a contract and returns the
706+
// deleted sector roots in order.
707+
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) ([]types.Hash256, error) {
708+
refs, err := lastNContractSectors(tx, contractID, n)
691709
if err != nil {
692-
return 0, fmt.Errorf("failed to delete sectors: %w", err)
693-
}
694-
deleted := make(map[int64]bool)
695-
for rows.Next() {
696-
var id int64
697-
if err := rows.Scan(&id); err != nil {
698-
return 0, fmt.Errorf("failed to scan deleted sector: %w", err)
699-
}
700-
deleted[id] = true
701-
}
702-
if len(deleted) != len(rootIDs) {
703-
return 0, errors.New("failed to delete all sectors")
704-
}
705-
for _, rootID := range rootIDs {
706-
if !deleted[rootID] {
707-
return 0, errors.New("failed to delete all sectors")
708-
}
710+
return nil, fmt.Errorf("failed to get sector roots: %w", err)
709711
}
710712

711-
// decrement the contract metrics
712-
if err := incrementNumericStat(tx, metricContractSectors, -len(refs), time.Now()); err != nil {
713-
return 0, fmt.Errorf("failed to decrement contract sectors: %w", err)
713+
var contractSectorRootIDs []int64
714+
roots := make([]types.Hash256, len(refs))
715+
var sectorIDs []int64
716+
for i, ref := range refs {
717+
contractSectorRootIDs = append(contractSectorRootIDs, ref.dbID)
718+
roots[len(roots)-i-1] = ref.root // reverse the order to match the contract sector roots
719+
sectorIDs = append(sectorIDs, ref.sectorID)
714720
}
715721

716-
// attempt to prune the deleted sectors
717-
var pruned int
718-
for _, ref := range refs {
719-
deleted, err := pruneSectorRef(tx, ref.sectorID)
720-
if err != nil {
721-
return 0, fmt.Errorf("failed to prune sector ref: %w", err)
722-
} else if deleted {
723-
pruned++
724-
}
722+
if err := deleteContractSectorRoots(tx, contractSectorRootIDs); err != nil {
723+
return nil, fmt.Errorf("failed to delete contract sector roots: %w", err)
724+
} else if err := incrementNumericStat(tx, metricContractSectors, -len(contractSectorRootIDs), time.Now()); err != nil {
725+
return nil, fmt.Errorf("failed to decrement contract sectors: %w", err)
725726
}
726-
return pruned, nil
727-
}
728727

729-
// trimSectors deletes the last n sector roots for a contract.
730-
func trimSectors(tx txn, contractID int64, n uint64, log *zap.Logger) (map[types.Hash256]bool, error) {
731-
refs, err := lastContractSectors(tx, contractID, n)
728+
removed, err := pruneSectors(tx, sectorIDs)
732729
if err != nil {
733-
return nil, fmt.Errorf("failed to get sector IDs: %w", err)
734-
} else if _, err = deleteContractSectors(tx, refs); err != nil {
735-
return nil, fmt.Errorf("failed to delete sectors: %w", err)
736-
}
737-
738-
roots := make(map[types.Hash256]bool)
739-
for _, ref := range refs {
740-
roots[ref.root] = true
730+
return nil, fmt.Errorf("failed to prune sectors: %w", err)
741731
}
732+
log.Debug("trimmed sectors", zap.Stringers("trimmed", roots), zap.Stringers("removed", removed))
742733
return roots, nil
743734
}
744735

persist/sqlite/contracts_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ func TestReviseContract(t *testing.T) {
242242
},
243243
}
244244
for _, test := range tests {
245-
t.Run(test.name, func(t *testing.T) {
245+
func() {
246+
t.Log("revising contract:", test.name)
246247
oldRoots := append([]types.Hash256(nil), roots...)
247248
// update the expected roots
248249
for i, change := range test.changes {
@@ -295,7 +296,7 @@ func TestReviseContract(t *testing.T) {
295296
} else if err := checkConsistency(roots, test.sectors); err != nil {
296297
t.Fatal(err)
297298
}
298-
})
299+
}()
299300
}
300301
}
301302

persist/sqlite/migrations.go

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,51 @@ import (
1010
"go.uber.org/zap"
1111
)
1212

13+
// migrateVersion26 recalculates the contract and physical sectors metrics
14+
func migrateVersion26(tx txn, log *zap.Logger) error {
15+
// recalculate the contract sectors metric
16+
var contractSectorCount int64
17+
if err := tx.QueryRow(`SELECT COUNT(*) FROM contract_sector_roots`).Scan(&contractSectorCount); err != nil {
18+
return fmt.Errorf("failed to query contract sector count: %w", err)
19+
} else if err := setNumericStat(tx, metricContractSectors, uint64(contractSectorCount), time.Now()); err != nil {
20+
return fmt.Errorf("failed to set contract sectors metric: %w", err)
21+
}
22+
23+
// recalculate the physical sectors metric
24+
var physicalSectorsCount int64
25+
volumePhysicalSectorCount := make(map[int64]int64)
26+
rows, err := tx.Query(`SELECT volume_id, COUNT(*) FROM volume_sectors WHERE sector_id IS NOT NULL GROUP BY volume_id`)
27+
if err != nil && err != sql.ErrNoRows {
28+
return fmt.Errorf("failed to query volume sector count: %w", err)
29+
}
30+
defer rows.Close()
31+
32+
for rows.Next() {
33+
var volumeID, count int64
34+
if err := rows.Scan(&volumeID, &count); err != nil {
35+
return fmt.Errorf("failed to scan volume sector count: %w", err)
36+
}
37+
volumePhysicalSectorCount[volumeID] = count
38+
physicalSectorsCount += count
39+
}
40+
41+
// update the physical sectors metric
42+
if err := setNumericStat(tx, metricPhysicalSectors, uint64(physicalSectorsCount), time.Now()); err != nil {
43+
return fmt.Errorf("failed to set contract sectors metric: %w", err)
44+
}
45+
46+
// update the volume stats
47+
for volumeID, count := range volumePhysicalSectorCount {
48+
err := tx.QueryRow(`UPDATE storage_volumes SET used_sectors = $1 WHERE id = $2 RETURNING id`, count, volumeID).Scan(&volumeID)
49+
if err != nil {
50+
return fmt.Errorf("failed to update volume stats: %w", err)
51+
}
52+
}
53+
return nil
54+
}
55+
56+
// migrateVersion25 is a no-op migration to trigger foreign key checks
1357
func migrateVersion25(tx txn, log *zap.Logger) error {
14-
// no-op migration to trigger foreign key checks
1558
return nil
1659
}
1760

0 commit comments

Comments
 (0)