diff --git a/api/worker.go b/api/worker.go index ee72ca73e..40264f895 100644 --- a/api/worker.go +++ b/api/worker.go @@ -70,13 +70,6 @@ type ( Total uint64 `json:"total"` } - // MigrateSlabResponse is the response type for the /slab/migrate endpoint. - MigrateSlabResponse struct { - NumShardsMigrated int `json:"numShardsMigrated"` - SurchargeApplied bool `json:"surchargeApplied,omitempty"` - Error string `json:"error,omitempty"` - } - // RHPFormResponse is the response type for the /rhp/form endpoint. RHPFormResponse struct { ContractID types.FileContractID `json:"contractID"` diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 47a926ad5..24b7a8daf 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -7,14 +7,13 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" - "go.sia.tech/renterd/object" ) var ( - alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted - alertLowBalanceID = alerts.RandomAlertID() // constant until restarted - alertMigrationID = alerts.RandomAlertID() // constant until restarted - alertPruningID = alerts.RandomAlertID() // constant until restarted + alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted + alertLowBalanceID = alerts.RandomAlertID() // constant until restarted + alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted + alertPruningID = alerts.RandomAlertID() // constant until restarted ) func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) { @@ -74,7 +73,7 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert { } return alerts.Alert{ - ID: alertMigrationID, + ID: alertOngoingMigrationsID, Severity: alerts.SeverityInfo, Message: fmt.Sprintf("Migrating %d slabs", n), Timestamp: time.Now(), @@ -82,66 +81,6 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert { } } -func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert { - return alerts.Alert{ - 
ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: alerts.SeverityInfo, - Message: "Critical migration succeeded", - Data: map[string]interface{}{ - "slabKey": slabKey.String(), - "hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads", - }, - Timestamp: time.Now(), - } -} - -func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert { - data := map[string]interface{}{ - "error": err.Error(), - "health": health, - "slabKey": slabKey.String(), - "hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", - } - if objectIds != nil { - data["objectIDs"] = objectIds - } - - return alerts.Alert{ - ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: alerts.SeverityCritical, - Message: "Critical migration failed", - Data: data, - Timestamp: time.Now(), - } -} - -func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert { - data := map[string]interface{}{ - "error": err.Error(), - "health": health, - "slabKey": slabKey.String(), - "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", - } - if objectIds != nil { - data["objectIDs"] = objectIds - } - - severity := alerts.SeverityError - if health < 0.25 { - severity = alerts.SeverityCritical - } else if health < 0.5 { - severity = alerts.SeverityWarning - } - - return alerts.Alert{ - ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: severity, - Message: "Slab migration failed", - Data: data, - Timestamp: time.Now(), - } -} - func newRefreshHealthFailedAlert(err error) alerts.Alert { return alerts.Alert{ ID: 
alertHealthRefreshID, diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 0909d0137..01e0c9f20 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -2,14 +2,12 @@ package autopilot import ( "context" - "errors" "fmt" "math" "sort" "sync" "time" - "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" @@ -49,20 +47,15 @@ type ( } ) -func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) { +func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) { + start := time.Now() slab, err := j.b.Slab(ctx, j.EncryptionKey) if err != nil { - return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err) + return 0, fmt.Errorf("failed to fetch slab; %w", err) } - res, err := w.MigrateSlab(ctx, slab, j.set) - if err != nil { - return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err) - } else if res.Error != "" { - return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error)) - } - - return res, nil + err = w.MigrateSlab(ctx, slab, j.set) + return time.Since(start), err } func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator { @@ -157,44 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) { // process jobs for j := range jobs { - start := time.Now() - res, err := j.execute(ctx, w) - m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds())) - if err != nil { - m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, err) - if utils.IsErr(err, api.ErrConsensusNotSynced) { - // interrupt migrations if consensus is not synced - select { - case m.signalConsensusNotSynced <- struct{}{}: - default: - } - return - } else if !utils.IsErr(err, api.ErrSlabNotFound) { - // fetch all object IDs for the slab we failed 
to migrate - var objectIds map[string][]string - if res, err := m.objectIDsForSlabKey(ctx, j.EncryptionKey); err != nil { - m.logger.Errorf("failed to fetch object ids for slab key; %w", err) - } else { - objectIds = res - } - - // register the alert - if res.SurchargeApplied { - m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err)) - } else { - m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err)) - } - } - } else { - m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, res.NumShardsMigrated) - m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.EncryptionKey)) - if res.SurchargeApplied { - // this alert confirms the user his gouging - // settings are working, it will be dismissed - // automatically the next time this slab is - // successfully migrated - m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.EncryptionKey)) + duration, err := j.execute(ctx, w) + m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds())) + if utils.IsErr(err, api.ErrConsensusNotSynced) { + // interrupt migrations if consensus is not synced + select { + case m.signalConsensusNotSynced <- struct{}{}: + default: } + return + } else if err != nil { + m.logger.Errorw("migration failed", + "err", err, + "health", j.Health, + "slab", j.EncryptionKey, + "worker", id) + } } }(w) @@ -263,8 +232,8 @@ func (m *migrator) performMigrations(p *workerPool) { }) } - // unregister the migration alert when we're done - defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID) + // unregister the ongoing migrations alert when we're done + defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID) OUTER: for { @@ -312,34 +281,3 @@ OUTER: return } } - -func (m *migrator) objectIDsForSlabKey(ctx context.Context, key 
object.EncryptionKey) (map[string][]string, error) { - // fetch all buckets - // - // NOTE:at the time of writing the bus does not support fetching objects by - // slab key across all buckets at once, therefor we have to list all buckets - // and loop over them, revisit on the next major release - buckets, err := m.ap.bus.ListBuckets(ctx) - if err != nil { - return nil, fmt.Errorf("%w; failed to list buckets", err) - } - - // fetch all objects per bucket - idsPerBucket := make(map[string][]string) - for _, bucket := range buckets { - res, err := m.ap.bus.ListObjects(ctx, "", api.ListObjectOptions{Bucket: bucket.Name, SlabEncryptionKey: key}) - if err != nil { - m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) - continue - } else if len(res.Objects) == 0 { - continue - } - - idsPerBucket[bucket.Name] = make([]string, len(res.Objects)) - for i, object := range res.Objects { - idsPerBucket[bucket.Name][i] = object.Key - } - } - - return idsPerBucket, nil -} diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go index 871f1babc..54617aee9 100644 --- a/autopilot/workerpool.go +++ b/autopilot/workerpool.go @@ -16,7 +16,7 @@ type Worker interface { Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error) Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error) ID(ctx context.Context) (string, error) - MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error) + MigrateSlab(ctx context.Context, s object.Slab, set string) error RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) diff --git a/worker/alerts.go b/worker/alerts.go index 02598c770..894012c5b 100644 --- a/worker/alerts.go +++ b/worker/alerts.go @@ -6,9 +6,15 @@ import ( 
 "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" "lukechampine.com/frand" ) +var ( + alertMigrationID = alerts.RandomAlertID() // constant until restarted +) + func randomAlertID() types.Hash256 { return frand.Entropy256() } @@ -30,6 +36,46 @@ func newDownloadFailedAlert(bucket, key string, offset, length, contracts int64, } } +func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objects []api.ObjectMetadata, err error) alerts.Alert { + data := map[string]interface{}{ + "error": err.Error(), + "health": health, + "slabKey": slabKey.String(), + "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefore be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", + } + + if len(objects) > 0 { + data["objects"] = objects + } + + hostErr := err + for errors.Unwrap(hostErr) != nil { + hostErr = errors.Unwrap(hostErr) + } + if set, ok := hostErr.(HostErrorSet); ok { + hostErrors := make(map[string]string, len(set)) + for hk, err := range set { + hostErrors[hk.String()] = err.Error() + } + data["hosts"] = hostErrors + } + + severity := alerts.SeverityError + if health < 0.25 { + severity = alerts.SeverityCritical + } else if health < 0.5 { + severity = alerts.SeverityWarning + } + + return alerts.Alert{ + ID: alerts.IDForSlab(alertMigrationID, slabKey), + Severity: severity, + Message: "Slab migration failed", + Data: data, + Timestamp: time.Now(), + } +} + func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert { data := map[string]any{ "bucket": bucket, diff --git a/worker/alerts_test.go b/worker/alerts_test.go index 137838a39..4be4c4247 100644 --- a/worker/alerts_test.go +++ b/worker/alerts_test.go @@ -11,7 +11,9 @@ import 
( "go.sia.tech/renterd/alerts" ) -// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with a HostErrorSet error registers an alert with all the individual errors of any host in the payload. +// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with +// a HostErrorSet error registers an alert with all the individual errors of any +// host in the payload. func TestUploadFailedAlertErrorSet(t *testing.T) { hostErrSet := HostErrorSet{ types.PublicKey{1, 1, 1}: errors.New("test"), diff --git a/worker/client/client.go b/worker/client/client.go index ca5aee3c8..7fe60b7b7 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -164,11 +164,10 @@ func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error } // MigrateSlab migrates the specified slab. -func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) { +func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) error { values := make(url.Values) values.Set("contractset", set) - err = c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, &res) - return + return c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, nil) } // State returns the current state of the worker. 
diff --git a/worker/download.go b/worker/download.go index cedcd7a82..147ae3957 100644 --- a/worker/download.go +++ b/worker/download.go @@ -81,11 +81,10 @@ type ( } slabDownloadResponse struct { - mem Memory - surchargeApplied bool - shards [][]byte - index int - err error + mem Memory + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -264,14 +263,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o wg.Add(1) go func(index int) { defer wg.Done() - shards, surchargeApplied, err := mgr.downloadSlab(ctx, next.SlabSlice, false) + shards, err := mgr.downloadSlab(ctx, next.SlabSlice, false) select { case responseChan <- &slabDownloadResponse{ - mem: mem, - surchargeApplied: surchargeApplied, - shards: shards, - index: index, - err: err, + mem: mem, + shards: shards, + index: index, + err: err, }: case <-ctx.Done(): mem.Release() // relase memory if we're interrupted @@ -302,10 +300,11 @@ outer: } if resp.err != nil { - mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) + mgr.logger.Errorw("slab download failed", + "index", resp.index, + "err", resp.err, + ) return resp.err - } else if resp.surchargeApplied { - mgr.logger.Warnf("download for slab %v had to overpay to succeed", resp.index) } responses[resp.index] = resp @@ -353,7 +352,7 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) { // refresh the downloaders mgr.refreshDownloaders(contracts) @@ -373,7 +372,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, // check if we have enough shards if availableShards < slab.MinShards { - return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", 
availableShards, slab.MinShards) + return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } // NOTE: we don't acquire memory here since DownloadSlab is only used for @@ -385,19 +384,19 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Offset: 0, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } - shards, surchargeApplied, err := mgr.downloadSlab(ctx, slice, true) + shards, err := mgr.downloadSlab(ctx, slice, true) if err != nil { - return nil, false, err + return nil, err } // decrypt and recover slice.Decrypt(shards) err = slice.Reconstruct(shards) if err != nil { - return nil, false, err + return nil, err } - return shards, surchargeApplied, err + return shards, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -526,7 +525,7 @@ func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bo } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, bool, error) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, error) { // prepare new download slab := mgr.newSlabDownload(slice, migration) @@ -694,7 +693,7 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, } } -func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { +func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -712,7 +711,7 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, false, fmt.Errorf("no host available for shard %d", i) + return nil, fmt.Errorf("no host available for shard %d", i) } s.launch(req) i++ @@ -728,9 +727,9 @@ loop: 
for s.inflight() > 0 && !done { select { case <-s.mgr.shutdownCtx.Done(): - return nil, false, errors.New("download stopped") + return nil, errors.New("download stopped") case <-ctx.Done(): - return nil, false, context.Cause(ctx) + return nil, context.Cause(ctx) case <-resps.c: resetOverdrive() } @@ -808,13 +807,13 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, bool, error) { +func (s *slabDownload) finish() ([][]byte, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { - return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) + return nil, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) } - return s.sectors, s.numOverpaid > 0, nil + return s.sectors, nil } func (s *slabDownload) missing() int { diff --git a/worker/migrations.go b/worker/migrations.go index d2d1c6474..03728a3b7 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -10,7 +10,7 @@ import ( "go.sia.tech/renterd/object" ) -func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) { +func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) error { // make a map of good hosts goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool) for _, c := range ulContracts { @@ -57,7 +57,7 @@ SHARDS: // if all shards are 
on good hosts, we're done if len(shardIndices) == 0 { - return 0, false, nil + return nil } // calculate the number of missing shards and take into account hosts for @@ -72,23 +72,23 @@ SHARDS: // perform some sanity checks if len(ulContracts) < int(s.MinShards) { - return 0, false, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) + return fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) } if len(s.Shards)-missingShards < int(s.MinShards) { - return 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards)) + return fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards)) } // acquire memory for the migration mem := w.uploadManager.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize) if mem == nil { - return 0, false, fmt.Errorf("failed to acquire memory for migration") + return fmt.Errorf("failed to acquire memory for migration") } defer mem.Release() // download the slab - shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) + shards, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) if err != nil { - return 0, false, fmt.Errorf("failed to download slab for migration: %w", err) + return fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -110,8 +110,8 @@ SHARDS: // migrate the shards err = w.uploadManager.UploadShards(ctx, s, shardIndices, shards, contractSet, allowed, bh, lockingPriorityUpload, mem) if err != nil { - return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) + return fmt.Errorf("failed to upload slab for migration: %w", err) } - return len(shards), surchargeApplied, nil + return nil } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 24a70adcb..46ae4f1b4 
100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -554,6 +554,10 @@ func (os *objectStoreMock) PackedSlabsForUpload(ctx context.Context, lockingDura return } +func (os *objectStoreMock) ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { + return api.ObjectsListResponse{}, nil +} + func (os *objectStoreMock) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error { os.mu.Lock() defer os.mu.Unlock() @@ -628,10 +632,6 @@ func (*s3Mock) CopyObject(context.Context, string, string, string, string, api.C return api.ObjectMetadata{}, nil } -func (*s3Mock) ListObjects(context.Context, string, api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { - return api.ObjectsListResponse{}, nil -} - func (*s3Mock) AbortMultipartUpload(context.Context, string, string, string) (err error) { return nil } diff --git a/worker/upload_test.go b/worker/upload_test.go index d36c67a6e..4a2b73398 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -314,7 +314,7 @@ func TestMigrateLostSector(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestUploadShards(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } diff --git a/worker/worker.go b/worker/worker.go index 3fffec52d..8322cb6f2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -114,6 +114,7 @@ type ( AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) AddUploadingSector(ctx context.Context, uID 
api.UploadID, id types.FileContractID, root types.Hash256) error FinishUpload(ctx context.Context, uID api.UploadID) error + ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error TrackUpload(ctx context.Context, uID api.UploadID) error UpdateSlab(ctx context.Context, s object.Slab, contractSet string) error @@ -405,21 +406,21 @@ func (w *Worker) slabMigrateHandler(jc jape.Context) { } } - // migrate the slab - numShardsMigrated, surchargeApplied, err := w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) - if err != nil { - jc.Encode(api.MigrateSlabResponse{ - NumShardsMigrated: numShardsMigrated, - SurchargeApplied: surchargeApplied, - Error: err.Error(), - }) - return + // migrate the slab and handle alerts + err = w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) + if err != nil && !utils.IsErr(err, api.ErrSlabNotFound) { + var objects []api.ObjectMetadata + if res, err := w.bus.ListObjects(ctx, "", api.ListObjectOptions{SlabEncryptionKey: slab.EncryptionKey}); err != nil { + w.logger.Errorf("failed to list objects for slab key; %w", err) + } else { + objects = res.Objects + } + w.alerts.RegisterAlert(ctx, newMigrationFailedAlert(slab.EncryptionKey, slab.Health, objects, err)) + } else if err == nil { + w.alerts.DismissAlerts(jc.Request.Context(), alerts.IDForSlab(alertMigrationID, slab.EncryptionKey)) } - jc.Encode(api.MigrateSlabResponse{ - NumShardsMigrated: numShardsMigrated, - SurchargeApplied: surchargeApplied, - }) + jc.Check("failed to migrate slab", err) } func (w *Worker) downloadsStatsHandlerGET(jc jape.Context) {