Skip to content

Commit

Permalink
worker: move migration alert
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Sep 16, 2024
1 parent 23646fb commit ccd9250
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 216 deletions.
7 changes: 0 additions & 7 deletions api/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ type (
Total uint64 `json:"total"`
}

// MigrateSlabResponse is the response type for the /slab/migrate endpoint.
MigrateSlabResponse struct {
// NumShardsMigrated is logged by the migrator as the number of shards
// migrated for the slab.
NumShardsMigrated int `json:"numShardsMigrated"`
// SurchargeApplied is logged by the migrator as "overpaid" and selects
// the critical variants of the migration alerts when set.
SurchargeApplied bool `json:"surchargeApplied,omitempty"`
// Error, when non-empty, is treated by the migrator as a failed
// migration even if the request itself returned no error.
Error string `json:"error,omitempty"`
}

// RHPFormResponse is the response type for the /rhp/form endpoint.
RHPFormResponse struct {
ContractID types.FileContractID `json:"contractID"`
Expand Down
71 changes: 5 additions & 66 deletions autopilot/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/object"
)

var (
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertMigrationID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted
alertLowBalanceID = alerts.RandomAlertID() // constant until restarted
alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted
alertPruningID = alerts.RandomAlertID() // constant until restarted
)

func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) {
Expand Down Expand Up @@ -74,74 +73,14 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert {
}

return alerts.Alert{
ID: alertMigrationID,
ID: alertOngoingMigrationsID,
Severity: alerts.SeverityInfo,
Message: fmt.Sprintf("Migrating %d slabs", n),
Timestamp: time.Now(),
Data: data,
}
}

// newCriticalMigrationSucceededAlert builds the info-level alert that is
// registered when a slab migration succeeded with the migration surcharge
// applied. The alert ID is derived from alertMigrationID and the slab key, so
// there is one such alert per slab.
func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert {
	const hint = "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads"

	alert := alerts.Alert{
		ID:       alerts.IDForSlab(alertMigrationID, slabKey),
		Severity: alerts.SeverityInfo,
		Message:  "Critical migration succeeded",
	}
	alert.Data = map[string]interface{}{
		"slabKey": slabKey.String(),
		"hint":    hint,
	}
	alert.Timestamp = time.Now()
	return alert
}

// newCriticalMigrationFailedAlert builds the critical-severity alert for a
// slab migration that failed. The payload carries the error text, the slab's
// health and key, and a hint pointing at the MigrationSurchargeMultiplier
// gouging setting. The per-bucket object IDs are attached only when objectIds
// is non-nil. The alert ID is derived from alertMigrationID and the slab key.
func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	const hint = "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding."

	payload := make(map[string]interface{})
	payload["error"] = err.Error()
	payload["health"] = health
	payload["slabKey"] = slabKey.String()
	payload["hint"] = hint
	if objectIds != nil {
		payload["objectIDs"] = objectIds
	}

	alert := alerts.Alert{
		ID:       alerts.IDForSlab(alertMigrationID, slabKey),
		Severity: alerts.SeverityCritical,
		Message:  "Critical migration failed",
		Data:     payload,
	}
	alert.Timestamp = time.Now()
	return alert
}

// newMigrationFailedAlert builds the alert for a failed slab migration. The
// payload carries the error text, the slab's health and key, and a hint; the
// per-bucket object IDs are attached only when objectIds is non-nil.
//
// The severity depends on the slab's health: below 0.25 it is critical, below
// 0.5 it is a warning, and otherwise it is an error.
//
// NOTE(review): a slab with health in [0.25, 0.5) yields a warning while a
// healthier slab yields an error — confirm this ordering is intentional.
func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert {
	const hint = "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously."

	payload := make(map[string]interface{})
	payload["error"] = err.Error()
	payload["health"] = health
	payload["slabKey"] = slabKey.String()
	payload["hint"] = hint
	if objectIds != nil {
		payload["objectIDs"] = objectIds
	}

	level := alerts.SeverityError
	switch {
	case health < 0.25:
		level = alerts.SeverityCritical
	case health < 0.5:
		level = alerts.SeverityWarning
	}

	alert := alerts.Alert{
		ID:       alerts.IDForSlab(alertMigrationID, slabKey),
		Severity: level,
		Message:  "Slab migration failed",
		Data:     payload,
	}
	alert.Timestamp = time.Now()
	return alert
}

func newRefreshHealthFailedAlert(err error) alerts.Alert {
return alerts.Alert{
ID: alertHealthRefreshID,
Expand Down
102 changes: 20 additions & 82 deletions autopilot/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package autopilot

import (
"context"
"errors"
"fmt"
"math"
"sort"
"sync"
"time"

"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
"go.sia.tech/renterd/object"
Expand Down Expand Up @@ -49,20 +47,15 @@ type (
}
)

func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) {
func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) {
start := time.Now()
slab, err := j.b.Slab(ctx, j.EncryptionKey)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err)
return 0, fmt.Errorf("failed to fetch slab; %w", err)
}

res, err := w.MigrateSlab(ctx, slab, j.set)
if err != nil {
return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err)
} else if res.Error != "" {
return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error))
}

return res, nil
err = w.MigrateSlab(ctx, slab, j.set)
return time.Since(start), err
}

func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator {
Expand Down Expand Up @@ -157,44 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) {

// process jobs
for j := range jobs {
start := time.Now()
res, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds()))
if err != nil {
m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, err)
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if !utils.IsErr(err, api.ErrSlabNotFound) {
// fetch all object IDs for the slab we failed to migrate
var objectIds map[string][]string
if res, err := m.objectIDsForSlabKey(ctx, j.EncryptionKey); err != nil {
m.logger.Errorf("failed to fetch object ids for slab key; %w", err)
} else {
objectIds = res
}

// register the alert
if res.SurchargeApplied {
m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
} else {
m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err))
}
}
} else {
m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, res.NumShardsMigrated)
m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.EncryptionKey))
if res.SurchargeApplied {
// this alert confirms to the user that their
// gouging settings are working; it will be
// dismissed automatically the next time this
// slab is successfully migrated
m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.EncryptionKey))
duration, err := j.execute(ctx, w)
m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds()))
if utils.IsErr(err, api.ErrConsensusNotSynced) {
// interrupt migrations if consensus is not synced
select {
case m.signalConsensusNotSynced <- struct{}{}:
default:
}
return
} else if err != nil {
m.logger.Errorw("migration failed",
zap.Float64("health", j.Health),
zap.Stringer("slab", j.EncryptionKey),
zap.String("worker", id))
}
}
}(w)
Expand Down Expand Up @@ -263,8 +232,8 @@ func (m *migrator) performMigrations(p *workerPool) {
})
}

// unregister the migration alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID)
// unregister the ongoing migrations alert when we're done
defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID)

OUTER:
for {
Expand Down Expand Up @@ -312,34 +281,3 @@ OUTER:
return
}
}

// objectIDsForSlabKey returns, keyed by bucket name, the keys of all objects
// the bus lists for the slab with the given encryption key. Buckets without
// matching objects are omitted from the result.
//
// An error is returned only when the buckets cannot be listed. A failure to
// list the objects of an individual bucket is logged and that bucket is
// skipped, so the result may be incomplete.
func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.EncryptionKey) (map[string][]string, error) {
	// fetch all buckets
	//
	// NOTE: at the time of writing the bus does not support fetching objects
	// by slab key across all buckets at once, therefore we have to list all
	// buckets and loop over them, revisit on the next major release
	buckets, err := m.ap.bus.ListBuckets(ctx)
	if err != nil {
		return nil, fmt.Errorf("%w; failed to list buckets", err)
	}

	// fetch all objects per bucket
	idsPerBucket := make(map[string][]string)
	for _, bucket := range buckets {
		res, err := m.ap.bus.ListObjects(ctx, "", api.ListObjectOptions{Bucket: bucket.Name, SlabEncryptionKey: key})
		if err != nil {
			// best-effort: %w is only understood by fmt.Errorf, so the error
			// is formatted with %v here, and only the bucket's name is logged
			m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %v", bucket.Name, err)
			continue
		}
		if len(res.Objects) == 0 {
			continue
		}

		// obj rather than object, to avoid shadowing the object package
		ids := make([]string, len(res.Objects))
		for i, obj := range res.Objects {
			ids[i] = obj.Key
		}
		idsPerBucket[bucket.Name] = ids
	}

	return idsPerBucket, nil
}
2 changes: 1 addition & 1 deletion autopilot/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Worker interface {
Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error)
Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error)
ID(ctx context.Context) (string, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error)
MigrateSlab(ctx context.Context, s object.Slab, set string) error

RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error)
RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error)
Expand Down
46 changes: 46 additions & 0 deletions worker/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/object"
"lukechampine.com/frand"
)

var (
alertMigrationID = alerts.RandomAlertID() // constant until restarted
)

// randomAlertID returns the output of frand.Entropy256 as a types.Hash256,
// for use as an alert identifier.
func randomAlertID() types.Hash256 {
	id := types.Hash256(frand.Entropy256())
	return id
}
Expand All @@ -30,6 +36,46 @@ func newDownloadFailedAlert(bucket, key string, offset, length, contracts int64,
}
}

// newMigrationFailedAlert builds the alert that is registered when migrating
// the slab with the given key failed.
//
// The payload always contains the error text, the slab's health and key, and
// a hint. The affected objects are attached when objects is non-empty. When a
// HostErrorSet is found anywhere in err's tree, the individual host errors
// are attached under "hosts", keyed by the host's public key.
//
// The severity depends on the slab's health: below 0.25 it is critical, below
// 0.5 it is a warning, and otherwise it is an error.
//
// NOTE(review): a slab with health in [0.25, 0.5) yields a warning while a
// healthier slab yields an error — confirm this ordering is intentional.
func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objects []api.ObjectMetadata, err error) alerts.Alert {
	data := map[string]any{
		"error":   err.Error(),
		"health":  health,
		"slabKey": slabKey.String(),
		"hint":    "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.",
	}

	if len(objects) > 0 {
		data["objects"] = objects
	}

	// errors.As walks the whole error tree, so the host errors are also found
	// when err wraps multiple errors (errors.Join or several %w verbs), which
	// a manual errors.Unwrap loop cannot follow
	var set HostErrorSet
	if errors.As(err, &set) {
		hostErrors := make(map[string]string, len(set))
		for hk, hostErr := range set {
			hostErrors[hk.String()] = hostErr.Error()
		}
		data["hosts"] = hostErrors
	}

	severity := alerts.SeverityError
	switch {
	case health < 0.25:
		severity = alerts.SeverityCritical
	case health < 0.5:
		severity = alerts.SeverityWarning
	}

	return alerts.Alert{
		ID:        alerts.IDForSlab(alertMigrationID, slabKey),
		Severity:  severity,
		Message:   "Slab migration failed",
		Data:      data,
		Timestamp: time.Now(),
	}
}

func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert {
data := map[string]any{
"bucket": bucket,
Expand Down
4 changes: 3 additions & 1 deletion worker/alerts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"go.sia.tech/renterd/alerts"
)

// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with a HostErrorSet error registers an alert with all the individual errors of any host in the payload.
// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with
// a HostErrorSet error registers an alert with all the individual errors of any
// host in the payload.
func TestUploadFailedAlertErrorSet(t *testing.T) {
hostErrSet := HostErrorSet{
types.PublicKey{1, 1, 1}: errors.New("test"),
Expand Down
5 changes: 2 additions & 3 deletions worker/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,10 @@ func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error
}

// MigrateSlab migrates the specified slab.
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) {
func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) error {
values := make(url.Values)
values.Set("contractset", set)
err = c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, &res)
return
return c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, nil)
}

// State returns the current state of the worker.
Expand Down
Loading

0 comments on commit ccd9250

Please sign in to comment.