Skip to content

Commit

Permalink
fix: garbage collection with more sensible limits grouped by plan/repo (
Browse files Browse the repository at this point in the history
  • Loading branch information
garethgeorge authored Nov 14, 2024
1 parent 78c01a1 commit 492beb2
Showing 1 changed file with 89 additions and 23 deletions.
112 changes: 89 additions & 23 deletions internal/orchestrator/tasks/taskcollectgarbage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tasks
import (
"context"
"fmt"
"reflect"
"time"

v1 "github.com/garethgeorge/backrest/gen/go/v1"
Expand All @@ -11,21 +12,45 @@ import (
"go.uber.org/zap"
)

type gcSettingsForType struct {
maxAge time.Duration
keepMin int
keepMax int
}

type groupByKey struct {
Repo string
Plan string
Type reflect.Type
}

const (
gcStartupDelay = 60 * time.Second
gcStartupDelay = 1 * time.Second
gcInterval = 24 * time.Hour
)

// gcAgeForOperation returns the age at which an operation is eligible for garbage collection.
func gcAgeForOperation(op *v1.Operation) time.Duration {
switch op.Op.(type) {
// stats, check, and prune operations are kept for a year
case *v1.Operation_OperationStats, *v1.Operation_OperationCheck, *v1.Operation_OperationPrune:
return 365 * 24 * time.Hour
// all other operations are kept for 30 days
default:
return 30 * 24 * time.Hour
}
var gcSettings = map[reflect.Type]gcSettingsForType{
reflect.TypeOf(&v1.Operation_OperationStats{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 100,
},
reflect.TypeOf(&v1.Operation_OperationCheck{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 12,
},
reflect.TypeOf(&v1.Operation_OperationPrune{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 12,
},
}

var defaultGcSettings = gcSettingsForType{
maxAge: 30 * 24 * time.Hour,
keepMin: 1,
keepMax: 100,
}

type CollectGarbageTask struct {
Expand Down Expand Up @@ -85,38 +110,79 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error {
return fmt.Errorf("identifying forgotten snapshots: %w", err)
}

// keep track of IDs that are still valid and of the IDs that are being forgotten
validIDs := make(map[int64]struct{})
forgetIDs := []int64{}
curTime := curTimeMillis()
if err := log.Query(oplog.SelectAll, func(op *v1.Operation) error {

var deletedByMaxAge, deletedByMaxCount, deletedByForgottenSnapshot int
deletedByType := make(map[string]int)
stats := make(map[groupByKey]gcSettingsForType)

if err := log.Query(oplog.Query{Reversed: true}, func(op *v1.Operation) error {
validIDs[op.Id] = struct{}{}

forgot, ok := snapshotForgottenForFlow[op.FlowId]
if !ok {
// no snapshot associated with this flow; check if it's old enough to be gc'd
maxAgeForOperation := gcAgeForOperation(op)
if curTime-op.UnixTimeStartMs > maxAgeForOperation.Milliseconds() {
if ok {
if forgot {
// snapshot is forgotten; this operation is eligible for gc
forgetIDs = append(forgetIDs, op.Id)
deletedByForgottenSnapshot++
deletedByType[reflect.TypeOf(op.Op).String()]++
}
} else if forgot {
// snapshot is forgotten; this operation is eligible for gc
return nil
}

key := groupByKey{
Repo: op.RepoId,
Plan: op.PlanId,
Type: reflect.TypeOf(op.Op),
}

st, ok := stats[key]
if !ok {
gcSettings, ok := gcSettings[reflect.TypeOf(op.Op)]
if !ok {
st = defaultGcSettings
} else {
st = gcSettings
}
}

st.keepMax-- // decrement the max retention, when this < 0 operation must be gc'd
st.keepMin-- // decrement the min retention, when this < 0 we can start gc'ing
stats[key] = st // update the stats

if st.keepMin >= 0 {
// can't delete if within min retention period
return nil
}
if st.keepMax < 0 {
// max retention reached; this operation must be gc'd.
forgetIDs = append(forgetIDs, op.Id)
deletedByMaxCount++
deletedByType[key.Type.String()]++
} else if curTime-op.UnixTimeStartMs > st.maxAge.Milliseconds() {
// operation is old enough to be gc'd
forgetIDs = append(forgetIDs, op.Id)
deletedByMaxAge++
deletedByType[key.Type.String()]++
}

return nil
}); err != nil {
return fmt.Errorf("identifying gc eligible operations: %w", err)
}

if err := log.Delete(forgetIDs...); err != nil {
return fmt.Errorf("removing gc eligible operations: %w", err)
} else if len(forgetIDs) > 0 {
for _, id := range forgetIDs {
delete(validIDs, id)
}
}
for _, id := range forgetIDs { // update validIDs with respect to the just deleted operations
delete(validIDs, id)
}

zap.L().Info("collecting garbage operations",
zap.Any("operations_removed", len(forgetIDs)))
zap.Int("operations_removed", len(forgetIDs)), zap.Int("removed_by_age", deletedByMaxAge), zap.Int("removed_by_limit", deletedByMaxCount), zap.Int("removed_by_snapshot_forgotten", deletedByForgottenSnapshot), zap.Any("removed_by_type", deletedByType))

// cleaning up logstore
toDelete := []string{}
Expand Down

0 comments on commit 492beb2

Please sign in to comment.