From 90b7a35592b653cf89280993388e6d63e630053b Mon Sep 17 00:00:00 2001 From: PJ Date: Sat, 14 Sep 2024 14:46:43 +0200 Subject: [PATCH 01/11] api: add bucket to object metadata --- api/object.go | 1 + internal/test/e2e/cluster_test.go | 14 +++++++++++++- stores/metadata_test.go | 19 +++++++++++++++++-- stores/sql/main.go | 7 +++++-- stores/sql/mysql/main.go | 4 ++-- stores/sql/sqlite/main.go | 4 ++-- 6 files changed, 40 insertions(+), 9 deletions(-) diff --git a/api/object.go b/api/object.go index 09e33dba1..e97ab9ddf 100644 --- a/api/object.go +++ b/api/object.go @@ -63,6 +63,7 @@ type ( // ObjectMetadata contains various metadata about an object. ObjectMetadata struct { + Bucket string `json:"bucket"` ETag string `json:"eTag,omitempty"` Health float64 `json:"health"` ModTime TimeRFC3339 `json:"modTime"` diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 1b51b2fc1..acc75089c 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -44,6 +44,12 @@ func TestListObjectsWithNoDelimiter(t *testing.T) { start := time.Now() assertMetadata := func(entries []api.ObjectMetadata) { for i := range entries { + // assert bucket + if entries[i].Bucket != api.DefaultBucketName { + t.Fatal("unexpected bucket", entries[i].Bucket) + } + entries[i].Bucket = "" + // assert mod time if !strings.HasSuffix(entries[i].Key, "/") && !entries[i].ModTime.Std().After(start.UTC()) { t.Fatal("mod time should be set") @@ -145,7 +151,7 @@ func TestListObjectsWithNoDelimiter(t *testing.T) { t.Fatal(err) } - // assert mod time & clear it afterwards so we can compare + // assert metadata & clear it afterwards so we can compare assertMetadata(res.Objects) got := res.Objects @@ -389,6 +395,12 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { start := time.Now() assertMetadata := func(entries []api.ObjectMetadata) { for i := range entries { + // assert bucket + if entries[i].Bucket != api.DefaultBucketName { + t.Fatal("unexpected bucket", entries[i].Bucket) + } + entries[i].Bucket = "" + // assert mod time if !strings.HasSuffix(entries[i].Key, "/") && !entries[i].ModTime.Std().After(start.UTC()) { t.Fatal("mod time should be set") diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 0195b264f..ef0de556e 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -342,9 +342,10 @@ func TestObjectMetadata(t *testing.T) { t.Log(got.Object) t.Log(want) t.Fatal("object mismatch", cmp.Diff(got.Object, want, cmp.AllowUnexported(object.EncryptionKey{}))) - } - if !reflect.DeepEqual(got.Metadata, testMetadata) { + } else if !reflect.DeepEqual(got.Metadata, testMetadata) { t.Fatal("meta mismatch", cmp.Diff(got.Metadata, testMetadata)) + } else if got.Bucket != api.DefaultBucketName { + t.Fatal("unexpected bucket", got.Bucket) } // assert metadata CASCADE on object delete @@ -1124,6 +1125,7 @@ func TestSQLMetadataStore(t *testing.T) { expectedObj := api.Object{ ObjectMetadata: api.ObjectMetadata{ + Bucket: api.DefaultBucketName, ETag: testETag, Health: 1, ModTime: api.TimeRFC3339{}, @@ -1607,6 +1609,12 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { {"/", "", "size", "DESC", []api.ObjectMetadata{{Key: "/foo/", Size: 10, Health: .5}, {Key: "/FOO/", Size: 7, Health: 1}, {Key: "/fileś/", Size: 6, Health: 1}, {Key: "/gab/", Size: 5, Health: 1}}}, {"/", "", "size", "ASC", []api.ObjectMetadata{{Key: "/gab/", Size: 5, Health: 1}, {Key: "/fileś/", Size: 6, Health: 1}, {Key: "/FOO/", Size: 7, Health: 1}, {Key: "/foo/", Size: 10, Health: .5}}}, } + // set common fields + for i := range tests { + for j := range tests[i].want { + tests[i].want[j].Bucket = api.DefaultBucketName + } + } for _, test := range tests { resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) if err != nil { @@ -1709,6 +1717,12 @@ func TestListObjectsExplicitDir(t *testing.T) { }}, {"/dir/", "", "", "", []api.ObjectMetadata{{ETag: "d34db33f", Key: "/dir/file", Size: 1, Health: 0.5, MimeType: testMimeType}}}, } + // set common fields + for i := range tests { + for j := range tests[i].want { + tests[i].want[j].Bucket = api.DefaultBucketName + } + } for _, test := range tests { got, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) if err != nil { @@ -3716,6 +3730,7 @@ func TestListObjectsNoDelimiter(t *testing.T) { // set common fields for i := range tests { for j := range tests[i].want { + tests[i].want[j].Bucket = api.DefaultBucketName tests[i].want[j].ETag = testETag tests[i].want[j].MimeType = testMimeType } diff --git a/stores/sql/main.go b/stores/sql/main.go index 12b265587..2b3a7073b 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1522,6 +1522,7 @@ func ObjectMetadata(ctx context.Context, tx Tx, bucket, key string) (api.Object, om, err := tx.ScanObjectMetadata(tx.QueryRow(ctx, fmt.Sprintf(` SELECT %s FROM objects o + INNER JOIN buckets b ON b.id = o.db_bucket_id WHERE o.id = ? `, tx.SelectObjectMetadataExpr()), objID)) if err != nil { @@ -2698,6 +2699,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s rows, err := tx.Query(ctx, fmt.Sprintf(` SELECT %s FROM objects o + INNER JOIN buckets b ON b.id = o.db_bucket_id WHERE %s ORDER BY %s LIMIT ? @@ -2846,13 +2848,13 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s rows, err := tx.Query(ctx, fmt.Sprintf(` SELECT %s FROM ( - SELECT o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag + SELECT o.db_bucket_id, o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag FROM objects o LEFT JOIN directories d ON d.name = o.object_id WHERE o.object_id != ? AND o.db_directory_id = ? AND o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) %s AND d.id IS NULL UNION ALL - SELECT d.name as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag + SELECT o.db_bucket_id, d.name as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag FROM objects o INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s WHERE o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) @@ -2861,6 +2863,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s AND d.db_parent_id = ? GROUP BY d.id ) AS o + INNER JOIN buckets b ON b.id = o.db_bucket_id %s ORDER BY %s LIMIT ? diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index 38f6bf97d..ac6a4c7bc 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -813,7 +813,7 @@ func (tx MainDatabaseTx) SaveAccounts(ctx context.Context, accounts []api.Accoun } func (tx *MainDatabaseTx) ScanObjectMetadata(s ssql.Scanner, others ...any) (md api.ObjectMetadata, err error) { - dst := []any{&md.Key, &md.Size, &md.Health, &md.MimeType, &md.ModTime, &md.ETag} + dst := []any{&md.Key, &md.Size, &md.Health, &md.MimeType, &md.ModTime, &md.ETag, &md.Bucket} dst = append(dst, others...) if err := s.Scan(dst...); err != nil { return api.ObjectMetadata{}, fmt.Errorf("failed to scan object metadata: %w", err) @@ -822,7 +822,7 @@ func (tx *MainDatabaseTx) ScanObjectMetadata(s ssql.Scanner, others ...any) (md } func (tx *MainDatabaseTx) SelectObjectMetadataExpr() string { - return "o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag" + return "o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag, b.name" } func (tx *MainDatabaseTx) Setting(ctx context.Context, key string) (string, error) { diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index 18c66fa28..a1e7385cc 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -825,7 +825,7 @@ func (tx *MainDatabaseTx) SaveAccounts(ctx context.Context, accounts []api.Accou func (tx *MainDatabaseTx) ScanObjectMetadata(s ssql.Scanner, others ...any) (md api.ObjectMetadata, err error) { var createdAt string - dst := []any{&md.Key, &md.Size, &md.Health, &md.MimeType, &createdAt, &md.ETag} + dst := []any{&md.Key, &md.Size, &md.Health, &md.MimeType, &createdAt, &md.ETag, &md.Bucket} dst = append(dst, others...) if err := s.Scan(dst...); err != nil { return api.ObjectMetadata{}, fmt.Errorf("failed to scan object metadata: %w", err) @@ -836,7 +836,7 @@ func (tx *MainDatabaseTx) ScanObjectMetadata(s ssql.Scanner, others ...any) (md } func (tx *MainDatabaseTx) SelectObjectMetadataExpr() string { - return "o.object_id, o.size, o.health, o.mime_type, DATETIME(o.created_at), o.etag" + return "o.object_id, o.size, o.health, o.mime_type, DATETIME(o.created_at), o.etag, b.name" } func (tx *MainDatabaseTx) UpdateContractSet(ctx context.Context, name string, toAdd, toRemove []types.FileContractID) error { From 371f3fc15bca5b97512d993b2e025abf6a6d115e Mon Sep 17 00:00:00 2001 From: PJ Date: Sat, 14 Sep 2024 14:50:05 +0200 Subject: [PATCH 02/11] bus: remove objects by slab key --- autopilot/autopilot.go | 1 - autopilot/migrator.go | 28 +++--- bus/bus.go | 2 - bus/client/objects.go | 8 -- bus/routes.go | 16 ---- internal/test/e2e/cluster_test.go | 31 ++++--- stores/metadata.go | 8 -- stores/metadata_test.go | 147 +++++++++++++++--------------- stores/sql/database.go | 4 - stores/sql/main.go | 29 ------ stores/sql/mysql/main.go | 4 - stores/sql/sqlite/main.go | 4 - 12 files changed, 107 insertions(+), 175 deletions(-) diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 12f4c809e..0cafa5fd0 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -67,7 +67,6 @@ type Bus interface { ListBuckets(ctx context.Context) ([]api.Bucket, error) // objects - ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error) RefreshHealth(ctx context.Context) error Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) SlabsForMigration(ctx context.Context, healthCutoff float64, set string, limit int) ([]api.UnhealthySlab, error) diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 251c5e893..a84669ec5 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -327,18 +327,22 @@ func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.Encryptio // fetch all objects per bucket idsPerBucket := make(map[string][]string) for _, bucket := range buckets { - objects, err := m.ap.bus.ObjectsBySlabKey(ctx, bucket.Name, key) - if err != nil { - m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) - continue - } else if len(objects) == 0 { - continue - } - - idsPerBucket[bucket.Name] = make([]string, len(objects)) - for i, object := range objects { - idsPerBucket[bucket.Name][i] = object.Key - } + _ = bucket + _ = key + // TODO PJ: fetch objects by slab key + // + // objects, err := m.ap.bus.ObjectsBySlabKey(ctx, bucket.Name, key) + // if err != nil { + // m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) + // continue + // } else if len(objects) == 0 { + // continue + // } + + // idsPerBucket[bucket.Name] = make([]string, len(objects)) + // for i, object := range objects { + // idsPerBucket[bucket.Name][i] = object.Key + // } } return idsPerBucket, nil diff --git a/bus/bus.go b/bus/bus.go index 90db3764d..8b80f50c3 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -238,7 +238,6 @@ type ( ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) Object(ctx context.Context, bucketName, key string) (api.Object, error) ObjectMetadata(ctx context.Context, bucketName, key string) (api.Object, error) - ObjectsBySlabKey(ctx context.Context, bucketName string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) RemoveObject(ctx context.Context, bucketName, key string) error RemoveObjects(ctx context.Context, bucketName, prefix string) error @@ -487,7 +486,6 @@ func (b *Bus) Handler() http.Handler { "POST /slabs/partial": b.slabsPartialHandlerPOST, "POST /slabs/refreshhealth": b.slabsRefreshHealthHandlerPOST, "GET /slab/:key": b.slabHandlerGET, - "GET /slab/:key/objects": b.slabObjectsHandlerGET, "PUT /slab": b.slabHandlerPUT, "GET /state": b.stateHandlerGET, diff --git a/bus/client/objects.go b/bus/client/objects.go index 1a1f7fc0b..1b8ec20cb 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -75,14 +75,6 @@ func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts return } -// ObjectsBySlabKey returns all objects that reference a given slab. -func (c *Client) ObjectsBySlabKey(ctx context.Context, bucket string, key object.EncryptionKey) (objects []api.ObjectMetadata, err error) { - values := url.Values{} - values.Set("bucket", bucket) - err = c.c.WithContext(ctx).GET(fmt.Sprintf("/slab/%v/objects?"+values.Encode(), key), &objects) - return -} - // ObjectsStats returns information about the number of objects and their size. func (c *Client) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (osr api.ObjectsStatsResponse, err error) { values := url.Values{} diff --git a/bus/routes.go b/bus/routes.go index d1505af0d..55c3e7248 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -1468,22 +1468,6 @@ func (b *Bus) sectorsHostRootHandlerDELETE(jc jape.Context) { } } -func (b *Bus) slabObjectsHandlerGET(jc jape.Context) { - var key object.EncryptionKey - if jc.DecodeParam("key", &key) != nil { - return - } - bucket := api.DefaultBucketName - if jc.DecodeForm("bucket", &bucket) != nil { - return - } - objects, err := b.ms.ObjectsBySlabKey(jc.Request.Context(), bucket, key) - if jc.Check("failed to retrieve objects by slab", err) != nil { - return - } - jc.Encode(objects) -} - func (b *Bus) slabHandlerGET(jc jape.Context) { var key object.EncryptionKey if jc.DecodeParam("key", &key) != nil { diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index acc75089c..0fea4014f 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1755,23 +1755,24 @@ func TestUploadPacking(t *testing.T) { return nil }) + // TODO PJ: use Objects // ObjectsBySlabKey should return 2 objects for the slab of file1 since file1 // and file2 share the same slab. - res, err := b.Object(context.Background(), api.DefaultBucketName, "file1", api.GetObjectOptions{}) - tt.OK(err) - objs, err := b.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, res.Object.Slabs[0].EncryptionKey) - tt.OK(err) - if len(objs) != 2 { - t.Fatal("expected 2 objects", len(objs)) - } - sort.Slice(objs, func(i, j int) bool { - return objs[i].Key < objs[j].Key // make result deterministic - }) - if objs[0].Key != "/file1" { - t.Fatal("expected file1", objs[0].Key) - } else if objs[1].Key != "/file2" { - t.Fatal("expected file2", objs[1].Key) - } + // res, err := b.Object(context.Background(), api.DefaultBucketName, "file1", api.GetObjectOptions{}) + // tt.OK(err) + // objs, err := b.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, res.Object.Slabs[0].EncryptionKey) + // tt.OK(err) + // if len(objs) != 2 { + // t.Fatal("expected 2 objects", len(objs)) + // } + // sort.Slice(objs, func(i, j int) bool { + // return objs[i].Key < objs[j].Key // make result deterministic + // }) + // if objs[0].Key != "/file1" { + // t.Fatal("expected file1", objs[0].Key) + // } else if objs[1].Key != "/file2" { + // t.Fatal("expected file2", objs[1].Key) + // } } func TestWallet(t *testing.T) { diff --git a/stores/metadata.go b/stores/metadata.go index 64acb2ec8..5dbda1f80 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -649,14 +649,6 @@ func (s *SQLStore) PackedSlabsForUpload(ctx context.Context, lockingDuration tim return s.slabBufferMgr.SlabsForUpload(ctx, lockingDuration, minShards, totalShards, set, limit) } -func (s *SQLStore) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey object.EncryptionKey) (metadata []api.ObjectMetadata, err error) { - err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { - metadata, err = tx.ObjectsBySlabKey(ctx, bucket, slabKey) - return err - }) - return -} - func (s *SQLStore) PrunableContractRoots(ctx context.Context, fcid types.FileContractID, roots []types.Hash256) (indices []uint64, err error) { err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { indices, err = tx.PrunableContractRoots(ctx, fcid, roots) diff --git a/stores/metadata_test.go b/stores/metadata_test.go index ef0de556e..57b79f32e 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -3269,67 +3269,68 @@ func TestContractSizes(t *testing.T) { } } -func TestObjectsBySlabKey(t *testing.T) { - ss := newTestSQLStore(t, defaultTestSQLStoreConfig) - defer ss.Close() - - // create a host - hks, err := ss.addTestHosts(1) - if err != nil { - t.Fatal(err) - } - hk1 := hks[0] - - // create a contract - fcids, _, err := ss.addTestContracts(hks) - if err != nil { - t.Fatal(err) - } - fcid1 := fcids[0] - - // create a slab. - slab := object.Slab{ - Health: 1.0, - EncryptionKey: object.GenerateEncryptionKey(), - MinShards: 1, - Shards: newTestShards(hk1, fcid1, types.Hash256{1}), - } - - // Add 3 objects that all reference the slab. - obj := object.Object{ - Key: object.GenerateEncryptionKey(), - Slabs: []object.SlabSlice{ - { - Slab: slab, - Offset: 1, - Length: 0, // incremented later - }, - }, - } - for _, name := range []string{"obj1", "obj2", "obj3"} { - obj.Slabs[0].Length++ - if _, err := ss.addTestObject(name, obj); err != nil { - t.Fatal(err) - } - } - - // Fetch the objects by slab. - objs, err := ss.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, slab.EncryptionKey) - if err != nil { - t.Fatal(err) - } - for i, name := range []string{"obj1", "obj2", "obj3"} { - if objs[i].Key != name { - t.Fatal("unexpected object name", objs[i].Key, name) - } - if objs[i].Size != int64(i)+1 { - t.Fatal("unexpected object size", objs[i].Size, i+1) - } - if objs[i].Health != 1.0 { - t.Fatal("unexpected object health", objs[i].Health) - } - } -} +// TODO PJ: use this as a list test +// func TestObjectsBySlabKey(t *testing.T) { +// ss := newTestSQLStore(t, defaultTestSQLStoreConfig) +// defer ss.Close() + +// // create a host +// hks, err := ss.addTestHosts(1) +// if err != nil { +// t.Fatal(err) +// } +// hk1 := hks[0] + +// // create a contract +// fcids, _, err := ss.addTestContracts(hks) +// if err != nil { +// t.Fatal(err) +// } +// fcid1 := fcids[0] + +// // create a slab. +// slab := object.Slab{ +// Health: 1.0, +// EncryptionKey: object.GenerateEncryptionKey(), +// MinShards: 1, +// Shards: newTestShards(hk1, fcid1, types.Hash256{1}), +// } + +// // Add 3 objects that all reference the slab. +// obj := object.Object{ +// Key: object.GenerateEncryptionKey(), +// Slabs: []object.SlabSlice{ +// { +// Slab: slab, +// Offset: 1, +// Length: 0, // incremented later +// }, +// }, +// } +// for _, name := range []string{"obj1", "obj2", "obj3"} { +// obj.Slabs[0].Length++ +// if _, err := ss.addTestObject(name, obj); err != nil { +// t.Fatal(err) +// } +// } + +// // Fetch the objects by slab. +// objs, err := ss.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, slab.EncryptionKey) +// if err != nil { +// t.Fatal(err) +// } +// for i, name := range []string{"obj1", "obj2", "obj3"} { +// if objs[i].Key != name { +// t.Fatal("unexpected object name", objs[i].Key, name) +// } +// if objs[i].Size != int64(i)+1 { +// t.Fatal("unexpected object size", objs[i].Size, i+1) +// } +// if objs[i].Health != 1.0 { +// t.Fatal("unexpected object health", objs[i].Health) +// } +// } +// } func TestBuckets(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) @@ -3526,18 +3527,20 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } + // TODO PJ: use Objects + // // See if we can fetch the object by slab. - if obj, err := ss.Object(context.Background(), b1, "/bar"); err != nil { - t.Fatal(err) - } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b1, obj.Slabs[0].EncryptionKey); err != nil { - t.Fatal(err) - } else if len(objects) != 1 { - t.Fatal("expected 1 object", len(objects)) - } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b2, obj.Slabs[0].EncryptionKey); err != nil { - t.Fatal(err) - } else if len(objects) != 0 { - t.Fatal("expected 0 objects", len(objects)) - } + // if obj, err := ss.Object(context.Background(), b1, "/bar"); err != nil { + // t.Fatal(err) + // } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b1, obj.Slabs[0].EncryptionKey); err != nil { + // t.Fatal(err) + // } else if len(objects) != 1 { + // t.Fatal("expected 1 object", len(objects)) + // } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b2, obj.Slabs[0].EncryptionKey); err != nil { + // t.Fatal(err) + // } else if len(objects) != 0 { + // t.Fatal("expected 0 objects", len(objects)) + // } } func TestCopyObject(t *testing.T) { diff --git a/stores/sql/database.go b/stores/sql/database.go index 651fe81f3..0e5cb21b1 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -223,10 +223,6 @@ type ( // ObjectMetadata returns an object's metadata. ObjectMetadata(ctx context.Context, bucket, key string) (api.Object, error) - // ObjectsBySlabKey returns all objects that contain a reference to the - // slab with the given slabKey. - ObjectsBySlabKey(ctx context.Context, bucket string, slabKey object.EncryptionKey) (metadata []api.ObjectMetadata, err error) - // ObjectsStats returns overall stats about stored objects ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) diff --git a/stores/sql/main.go b/stores/sql/main.go index 2b3a7073b..3bcbb033a 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2365,35 +2365,6 @@ func scanStateElement(s Scanner) (types.StateElement, error) { }, nil } -func ObjectsBySlabKey(ctx context.Context, tx Tx, bucket string, slabKey object.EncryptionKey) ([]api.ObjectMetadata, error) { - rows, err := tx.Query(ctx, fmt.Sprintf(` - SELECT %s - FROM objects o - INNER JOIN buckets b ON o.db_bucket_id = b.id - WHERE b.name = ? AND EXISTS ( - SELECT 1 - FROM objects o2 - INNER JOIN slices sli ON sli.db_object_id = o2.id - INNER JOIN slabs sla ON sla.id = sli.db_slab_id - WHERE o2.id = o.id AND sla.key = ? - ) - `, tx.SelectObjectMetadataExpr()), bucket, EncryptionKey(slabKey)) - if err != nil { - return nil, fmt.Errorf("failed to query objects: %w", err) - } - defer rows.Close() - - var objects []api.ObjectMetadata - for rows.Next() { - om, err := tx.ScanObjectMetadata(rows) - if err != nil { - return nil, fmt.Errorf("failed to scan object metadata: %w", err) - } - objects = append(objects, om) - } - return objects, nil -} - func MarkPackedSlabUploaded(ctx context.Context, tx Tx, slab api.UploadedPackedSlab) (string, error) { // fetch relevant slab info var slabID, bufferedSlabID int64 diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index ac6a4c7bc..fb64eaf69 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -547,10 +547,6 @@ func (tx *MainDatabaseTx) ObjectMetadata(ctx context.Context, bucket, key string return ssql.ObjectMetadata(ctx, tx, bucket, key) } -func (tx *MainDatabaseTx) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey object.EncryptionKey) (metadata []api.ObjectMetadata, err error) { - return ssql.ObjectsBySlabKey(ctx, tx, bucket, slabKey) -} - func (tx *MainDatabaseTx) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) { return ssql.ObjectsStats(ctx, tx, opts) } diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index a1e7385cc..d80a8b3b7 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -543,10 +543,6 @@ func (tx *MainDatabaseTx) ObjectMetadata(ctx context.Context, bucket, key string return ssql.ObjectMetadata(ctx, tx, bucket, key) } -func (tx *MainDatabaseTx) ObjectsBySlabKey(ctx context.Context, bucket string, slabKey object.EncryptionKey) (metadata []api.ObjectMetadata, err error) { - return ssql.ObjectsBySlabKey(ctx, tx, bucket, slabKey) -} - func (tx *MainDatabaseTx) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) { return ssql.ObjectsStats(ctx, tx, opts) } From 33f01c08154e24db7b7a7e8e827fa3902ffb6ef9 Mon Sep 17 00:00:00 2001 From: PJ Date: Sun, 15 Sep 2024 19:10:42 +0200 Subject: [PATCH 03/11] stores: make bucket optional --- api/object.go | 4 ++ bus/client/objects.go | 5 +- bus/routes.go | 3 +- internal/test/e2e/cluster_test.go | 49 +++++++++------- stores/metadata_test.go | 10 +++- stores/sql/main.go | 93 ++++++++++++++++++++++--------- worker/s3/backend.go | 3 +- worker/s3/s3.go | 2 +- 8 files changed, 115 insertions(+), 54 deletions(-) diff --git a/api/object.go b/api/object.go index e97ab9ddf..a27d39627 100644 --- a/api/object.go +++ b/api/object.go @@ -207,6 +207,7 @@ type ( } ListObjectOptions struct { + Bucket string Delimiter string Limit int Marker string @@ -311,6 +312,9 @@ func (opts GetObjectOptions) Apply(values url.Values) { } func (opts ListObjectOptions) Apply(values url.Values) { + if opts.Bucket != "" { + values.Set("bucket", opts.Bucket) + } if opts.Delimiter != "" { values.Set("delimiter", opts.Delimiter) } diff --git a/bus/client/objects.go b/bus/client/objects.go index 1b8ec20cb..ea44ce20f 100644 --- a/bus/client/objects.go +++ b/bus/client/objects.go @@ -62,10 +62,9 @@ func (c *Client) Object(ctx context.Context, bucket, key string, opts api.GetObj return } -// Objects lists objects in the given bucket. -func (c *Client) Objects(ctx context.Context, bucket string, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { +// ListObjects lists objects in the given bucket. +func (c *Client) ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { values := url.Values{} - values.Set("bucket", bucket) opts.Apply(values) prefix = api.ObjectKeyEscape(prefix) diff --git a/bus/routes.go b/bus/routes.go index 55c3e7248..7c16061a5 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -1148,8 +1148,7 @@ func (b *Bus) objectHandlerGET(jc jape.Context) { } func (b *Bus) objectsHandlerGET(jc jape.Context) { - var marker, delim, sortBy, sortDir, substring string - bucket := api.DefaultBucketName + var bucket, marker, delim, sortBy, sortDir, substring string if jc.DecodeForm("bucket", &bucket) != nil { return } diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index 0fea4014f..c7e240afc 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -123,7 +123,8 @@ func TestListObjectsWithNoDelimiter(t *testing.T) { } for _, test := range tests { // use the bus client - res, err := b.Objects(context.Background(), api.DefaultBucketName, test.prefix, api.ListObjectOptions{ + res, err := b.ListObjects(context.Background(), test.prefix, api.ListObjectOptions{ + Bucket: api.DefaultBucketName, SortBy: test.sortBy, SortDir: test.sortDir, Limit: -1, @@ -141,7 +142,8 @@ func TestListObjectsWithNoDelimiter(t *testing.T) { if len(res.Objects) > 0 { marker := "" for offset := 0; offset < len(test.want); offset++ { - res, err := b.Objects(context.Background(), api.DefaultBucketName, test.prefix, api.ListObjectOptions{ + res, err := b.ListObjects(context.Background(), test.prefix, api.ListObjectOptions{ + Bucket: api.DefaultBucketName, SortBy: test.sortBy, SortDir: test.sortDir, Marker: marker, @@ -166,7 +168,8 @@ func TestListObjectsWithNoDelimiter(t *testing.T) { } // list invalid marker - _, err := b.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{ + _, err := b.ListObjects(context.Background(), "", api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Marker: "invalid", SortBy: api.ObjectSortByHealth, }) @@ -490,7 +493,8 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { } for _, test := range tests { // use the bus client - res, err := b.Objects(context.Background(), api.DefaultBucketName, test.path+test.prefix, api.ListObjectOptions{ + res, err := b.ListObjects(context.Background(), test.path+test.prefix, api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", SortBy: test.sortBy, SortDir: test.sortDir, @@ -505,7 +509,8 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { } var marker string for offset := 0; offset < len(test.want); offset++ { - res, err := b.Objects(context.Background(), api.DefaultBucketName, test.path+test.prefix, api.ListObjectOptions{ + res, err := b.ListObjects(context.Background(), test.path+test.prefix, api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", SortBy: test.sortBy, SortDir: test.sortDir, @@ -531,7 +536,8 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { continue } - res, err = b.Objects(context.Background(), api.DefaultBucketName, test.path+test.prefix, api.ListObjectOptions{ + res, err = b.ListObjects(context.Background(), test.path+test.prefix, api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", SortBy: test.sortBy, SortDir: test.sortDir, @@ -560,7 +566,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { } // assert root dir is empty - if resp, err := b.Objects(context.Background(), api.DefaultBucketName, "/", api.ListObjectOptions{}); err != nil { + if resp, err := b.ListObjects(context.Background(), "/", api.ListObjectOptions{Bucket: api.DefaultBucketName}); err != nil { t.Fatal(err) } else if len(resp.Objects) != 0 { t.Fatal("there should be no entries left", resp.Objects) @@ -774,7 +780,8 @@ func TestUploadDownloadExtended(t *testing.T) { tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader(file2), api.DefaultBucketName, "fileś/file2", api.UploadObjectOptions{})) // fetch all entries from the worker - resp, err := cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "fileś/", api.ListObjectOptions{ + resp, err := cluster.Bus.ListObjects(context.Background(), "fileś/", api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", }) tt.OK(err) @@ -789,7 +796,8 @@ func TestUploadDownloadExtended(t *testing.T) { } // fetch entries in /fileś starting with "file" - res, err := cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "fileś/file", api.ListObjectOptions{ + res, err := cluster.Bus.ListObjects(context.Background(), "fileś/file", api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", }) tt.OK(err) @@ -798,7 +806,8 @@ func TestUploadDownloadExtended(t *testing.T) { } // fetch entries in /fileś starting with "foo" - res, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "fileś/foo", api.ListObjectOptions{ + res, err = cluster.Bus.ListObjects(context.Background(), "fileś/foo", api.ListObjectOptions{ + Bucket: api.DefaultBucketName, Delimiter: "/", }) tt.OK(err) @@ -970,17 +979,17 @@ func TestUploadDownloadSpending(t *testing.T) { uploadDownload() // Fuzzy search for uploaded data in various ways. - resp, err := cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{}) + resp, err := cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName}) tt.OK(err) if len(resp.Objects) != 2 { t.Fatalf("should have 2 objects but got %v", len(resp.Objects)) } - resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "ata"}) + resp, err = cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "ata"}) tt.OK(err) if len(resp.Objects) != 2 { t.Fatalf("should have 2 objects but got %v", len(resp.Objects)) } - resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "1258"}) + resp, err = cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "1258"}) tt.OK(err) if len(resp.Objects) != 1 { t.Fatalf("should have 1 objects but got %v", len(resp.Objects)) @@ -1214,7 +1223,7 @@ func TestParallelUpload(t *testing.T) { wg.Wait() // Check if objects exist. - resp, err := cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/dir/", Limit: 100}) + resp, err := cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/dir/", Limit: 100}) tt.OK(err) if len(resp.Objects) != 3 { t.Fatal("wrong number of objects", len(resp.Objects)) @@ -1223,7 +1232,7 @@ func TestParallelUpload(t *testing.T) { // Upload one more object. tt.OKAll(w.UploadObject(context.Background(), bytes.NewReader([]byte("data")), api.DefaultBucketName, "/foo", api.UploadObjectOptions{})) - resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) + resp, err = cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/", Limit: 100}) tt.OK(err) if len(resp.Objects) != 4 { t.Fatal("wrong number of objects", len(resp.Objects)) @@ -1233,8 +1242,8 @@ func TestParallelUpload(t *testing.T) { if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/dir/", api.DeleteObjectOptions{Batch: true}); err != nil { t.Fatal(err) } - resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) - cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) + resp, err = cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/", Limit: 100}) + cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/", Limit: 100}) tt.OK(err) if len(resp.Objects) != 1 { t.Fatal("objects weren't deleted") @@ -1244,8 +1253,8 @@ func TestParallelUpload(t *testing.T) { if err := cluster.Bus.DeleteObject(context.Background(), api.DefaultBucketName, "/", api.DeleteObjectOptions{Batch: true}); err != nil { t.Fatal(err) } - resp, err = cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) - cluster.Bus.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{Substring: "/", Limit: 100}) + resp, err = cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/", Limit: 100}) + cluster.Bus.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, Substring: "/", Limit: 100}) tt.OK(err) if len(resp.Objects) != 0 { t.Fatal("objects weren't deleted") @@ -1662,7 +1671,7 @@ func TestUploadPacking(t *testing.T) { if res.Size != int64(len(data)) { t.Fatal("unexpected size after upload", res.Size, len(data)) } - resp, err := b.Objects(context.Background(), api.DefaultBucketName, "", api.ListObjectOptions{}) + resp, err := b.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName}) if err != nil { t.Fatal(err) } diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 57b79f32e..bf2e74a92 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -3269,7 +3269,7 @@ func TestContractSizes(t *testing.T) { } } -// TODO PJ: use this as a list test +//  : use this as a list test // func TestObjectsBySlabKey(t *testing.T) { // ss := newTestSQLStore(t, defaultTestSQLStoreConfig) // defer ss.Close() @@ -3435,6 +3435,10 @@ func TestBucketObjects(t *testing.T) { t.Fatal("expected 1 entry", len(entries)) } else if entries[0].Size != 2 { t.Fatal("unexpected size", entries[0].Size) + } else if resp, err := ss.ListObjects(context.Background(), "", "/foo/", "", "", "", "", "", -1); err != nil { + t.Fatal(err) + } else if entries := resp.Objects; len(entries) != 2 { + t.Fatal("expected 2 entries", len(entries)) } // Search the objects in the buckets. @@ -3450,6 +3454,10 @@ func TestBucketObjects(t *testing.T) { t.Fatal("expected 2 objects", len(objects)) } else if objects[0].Size != 4 || objects[1].Size != 2 { t.Fatal("unexpected size", objects[0].Size, objects[1].Size) + } else if resp, err := ss.ListObjects(context.Background(), "", "", "", "", "", "", "", -1); err != nil { + t.Fatal(err) + } else if objects := resp.Objects; len(objects) != 4 { + t.Fatal("expected 4 objects", len(objects)) } // Rename object foo/bar in bucket 1 to foo/baz but not in bucket 2. diff --git a/stores/sql/main.go b/stores/sql/main.go index 3bcbb033a..c080cc592 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2621,9 +2621,14 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s sortDir = api.SortDirAsc } - // filter by bucket - whereExprs := []string{"o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?)"} - whereArgs := []any{bucket} + var whereExprs []string + var whereArgs []any + + // apply bucket + if bucket != "" { + whereExprs = append(whereExprs, "b.name = ?") + whereArgs = append(whereArgs, bucket) + } // apply prefix if prefix != "" { @@ -2645,12 +2650,20 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s // apply marker markerExprs, markerArgs, err := whereObjectMarker(marker, sortBy, sortDir, func(dst any, marker, col string) error { + markerExprs := []string{"o.object_id = ?"} + markerArgs := []any{marker} + + if bucket != "" { + markerExprs = append(markerExprs, "b.name = ?") + markerArgs = append(markerArgs, bucket) + } + err := tx.QueryRow(ctx, fmt.Sprintf(` SELECT o.%s FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id - WHERE b.name = ? AND o.object_id = ? - `, col), bucket, marker).Scan(dst) + WHERE %s + `, col, strings.Join(markerExprs, " AND ")), markerArgs...).Scan(dst) if errors.Is(err, dsql.ErrNoRows) { return api.ErrMarkerNotFound } else { @@ -2666,17 +2679,23 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s // apply limit whereArgs = append(whereArgs, limit) + // build where expression + var whereExpr string + if len(whereExprs) > 0 { + whereExpr = fmt.Sprintf("WHERE %s", strings.Join(whereExprs, " AND ")) + } + // run query rows, err := tx.Query(ctx, fmt.Sprintf(` - SELECT %s - FROM objects o - INNER JOIN buckets b ON b.id = o.db_bucket_id - WHERE %s - ORDER BY %s - LIMIT ? - `, + SELECT %s + FROM objects o + INNER JOIN buckets b ON b.id = o.db_bucket_id + %s + ORDER BY %s + LIMIT ? +`, tx.SelectObjectMetadataExpr(), - strings.Join(whereExprs, " AND "), + whereExpr, strings.Join(orderByExprs, ", ")), whereArgs...) if err != nil { @@ -2746,7 +2765,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s args := []any{ path, - dirID, bucket, + dirID, } // apply prefix @@ -2760,14 +2779,13 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s } args = append(args, - bucket, path+"%", utf8.RuneCountInString(path), path, dirID, ) // apply marker - var whereExpr string + var whereExprs []string markerExprs, markerArgs, err := whereObjectMarker(marker, sortBy, sortDir, func(dst any, marker, col string) error { var groupFn string switch col { @@ -2778,19 +2796,34 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s default: return fmt.Errorf("unknown column: %v", col) } + + markerExprsObj := []string{"o.object_id = ?"} + markerArgsObj := []any{marker} + if bucket != "" { + markerExprsObj = append(markerExprsObj, "b.name = ?") + markerArgsObj = append(markerArgsObj, bucket) + } + + markerExprsDir := []string{"d.name = ?"} + markerArgsDir := []any{marker} + if bucket != "" { + markerExprsDir = append(markerExprsDir, "b.name = ?") + markerArgsDir = append(markerArgsDir, bucket) + } + err := tx.QueryRow(ctx, fmt.Sprintf(` SELECT o.%s FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id - WHERE b.name = ? AND o.object_id = ? + WHERE %s UNION ALL SELECT %s(o.%s) FROM objects o INNER JOIN buckets b ON o.db_bucket_id = b.id INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name - WHERE b.name = ? AND d.name = ? + WHERE %s GROUP BY d.id - `, col, groupFn, col, tx.CharLengthExpr()), bucket, marker, bucket, marker).Scan(dst) + `, col, strings.Join(markerExprsObj, " AND "), groupFn, col, tx.CharLengthExpr(), strings.Join(markerExprsDir, " AND ")), append(markerArgsObj, markerArgsDir...)...).Scan(dst) if errors.Is(err, dsql.ErrNoRows) { return api.ErrMarkerNotFound } else { @@ -2800,7 +2833,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s if err != nil { return api.ObjectsListResponse{}, fmt.Errorf("failed to query marker: %w", err) } else if len(markerExprs) > 0 { - whereExpr = "WHERE " + strings.Join(markerExprs, " AND ") + whereExprs = append(whereExprs, markerExprs...) } args = append(args, markerArgs...) @@ -2810,9 +2843,21 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s return api.ObjectsListResponse{}, fmt.Errorf("failed to apply sorting: %w", err) } + // apply bucket + if bucket != "" { + whereExprs = append(whereExprs, "b.name = ?") + args = append(args, bucket) + } + // apply offset and limit args = append(args, limit) + // build where expression + var whereExpr string + if len(whereExprs) > 0 { + whereExpr = fmt.Sprintf("WHERE %s", strings.Join(whereExprs, " AND ")) + } + // objectsQuery consists of 2 parts // 1. fetch all objects in requested directory // 2. fetch all sub-directories @@ -2822,16 +2867,12 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s SELECT o.db_bucket_id, o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag FROM objects o LEFT JOIN directories d ON d.name = o.object_id - WHERE o.object_id != ? AND o.db_directory_id = ? AND o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) %s - AND d.id IS NULL + WHERE o.object_id != ? AND o.db_directory_id = ? AND d.id IS NULL %s UNION ALL SELECT o.db_bucket_id, d.name as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag FROM objects o INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s - WHERE o.db_bucket_id = (SELECT id FROM buckets b WHERE b.name = ?) - AND o.object_id LIKE ? - AND SUBSTR(o.object_id, 1, ?) = ? - AND d.db_parent_id = ? + WHERE o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND d.db_parent_id = ? GROUP BY d.id ) AS o INNER JOIN buckets b ON b.id = o.db_bucket_id diff --git a/worker/s3/backend.go b/worker/s3/backend.go index 28e1f7299..fc68b1cd0 100644 --- a/worker/s3/backend.go +++ b/worker/s3/backend.go @@ -91,7 +91,8 @@ func (s *s3) ListBucket(ctx context.Context, bucketName string, prefix *gofakes3 page.Marker = "/" + page.Marker } - resp, err := s.b.Objects(ctx, bucketName, prefix.Prefix, api.ListObjectOptions{ + resp, err := s.b.ListObjects(ctx, prefix.Prefix, api.ListObjectOptions{ + Bucket: bucketName, Delimiter: prefix.Delimiter, Limit: int(page.MaxKeys), Marker: page.Marker, diff --git a/worker/s3/s3.go b/worker/s3/s3.go index 10b074306..a80d49f90 100644 --- a/worker/s3/s3.go +++ b/worker/s3/s3.go @@ -33,7 +33,7 @@ type Bus interface { AddObject(ctx context.Context, bucket, key, contractSet string, o object.Object, opts api.AddObjectOptions) (err error) CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey string, opts api.CopyObjectOptions) (om api.ObjectMetadata, err error) DeleteObject(ctx context.Context, bucket, key string, opts api.DeleteObjectOptions) (err error) - Objects(ctx context.Context, bucket, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) + ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) AbortMultipartUpload(ctx context.Context, bucket, key string, uploadID string) (err error) CompleteMultipartUpload(ctx context.Context, bucket, key, uploadID string, parts []api.MultipartCompletedPart, opts api.CompleteMultipartOptions) (_ api.MultipartCompleteResponse, err error) From 23646fbb3e423bfe4e34ecc1a4e592ed1146a045 Mon Sep 17 00:00:00 2001 From: PJ Date: Sun, 15 Sep 2024 19:56:38 +0200 Subject: [PATCH 04/11] bus: add slab encryption key to list object options --- api/object.go | 18 ++++--- autopilot/autopilot.go | 1 + autopilot/migrator.go | 28 +++++------ bus/bus.go | 2 +- bus/routes.go | 6 ++- internal/test/e2e/cluster_test.go | 35 +++++++------- stores/metadata.go | 4 +- stores/metadata_test.go | 80 +++++++++++++++---------------- stores/sql/database.go | 2 +- stores/sql/main.go | 26 +++++++--- stores/sql/mysql/main.go | 4 +- stores/sql/sqlite/main.go | 4 +- 12 files changed, 113 insertions(+), 97 deletions(-) diff --git a/api/object.go b/api/object.go index a27d39627..07df140ec 100644 --- a/api/object.go +++ b/api/object.go @@ -207,13 +207,14 @@ type ( } ListObjectOptions struct { - Bucket string - Delimiter string - Limit int - Marker string - SortBy string - SortDir string - Substring string + Bucket string + Delimiter string + Limit int + Marker string + SortBy string + SortDir string + Substring string + SlabEncryptionKey object.EncryptionKey } SearchObjectOptions struct { @@ -333,6 +334,9 @@ func (opts ListObjectOptions) Apply(values url.Values) { if opts.Substring != "" { values.Set("substring", opts.Substring) } + if opts.SlabEncryptionKey != (object.EncryptionKey{}) { + values.Set("slabEncryptionKey", opts.SlabEncryptionKey.String()) + } } func (opts SearchObjectOptions) Apply(values url.Values) { diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 0cafa5fd0..7b9cd98c1 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -67,6 +67,7 @@ type Bus interface { ListBuckets(ctx context.Context) ([]api.Bucket, error) // objects + ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) RefreshHealth(ctx context.Context) error Slab(ctx context.Context, key object.EncryptionKey) (object.Slab, error) SlabsForMigration(ctx context.Context, healthCutoff float64, set string, limit int) ([]api.UnhealthySlab, error) diff --git a/autopilot/migrator.go b/autopilot/migrator.go index a84669ec5..0909d0137 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -327,22 +327,18 @@ func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.Encryptio // fetch all objects per bucket idsPerBucket := make(map[string][]string) for _, bucket := range buckets { - _ = bucket - _ = key - // TODO PJ: fetch objects by slab key - // - // objects, err := m.ap.bus.ObjectsBySlabKey(ctx, bucket.Name, key) - // if err != nil { - // m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) - // continue - // } else if len(objects) == 0 { - // continue - // } - - // idsPerBucket[bucket.Name] = make([]string, len(objects)) - // for i, object := range objects { - // idsPerBucket[bucket.Name][i] = object.Key - // } + res, err := m.ap.bus.ListObjects(ctx, "", api.ListObjectOptions{Bucket: bucket.Name, SlabEncryptionKey: key}) + if err != nil { + m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) + continue + } else if len(res.Objects) == 0 { + continue + } + + idsPerBucket[bucket.Name] = make([]string, len(res.Objects)) + for i, object := range res.Objects { + idsPerBucket[bucket.Name][i] = object.Key + } } return idsPerBucket, nil diff --git a/bus/bus.go b/bus/bus.go index 8b80f50c3..b9b660fc4 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -235,7 +235,7 @@ type ( UpdateBucketPolicy(ctx context.Context, bucketName string, policy api.BucketPolicy) error CopyObject(ctx context.Context, srcBucket, dstBucket, srcKey, dstKey, mimeType string, metadata api.ObjectUserMetadata) (api.ObjectMetadata, error) - ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) + ListObjects(ctx context.Context, bucketName, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error) Object(ctx context.Context, bucketName, key string) (api.Object, error) ObjectMetadata(ctx context.Context, bucketName, key string) (api.Object, error) ObjectsStats(ctx context.Context, opts api.ObjectsStatsOpts) (api.ObjectsStatsResponse, error) diff --git a/bus/routes.go b/bus/routes.go index 7c16061a5..f69ab8394 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -1171,8 +1171,12 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) { if jc.DecodeForm("substring", &substring) != nil { return } + var slabEncryptionKey object.EncryptionKey + if jc.DecodeForm("slabEncryptionKey", &slabEncryptionKey) != nil { + return + } - resp, err := b.ms.ListObjects(jc.Request.Context(), bucket, jc.PathParam("prefix"), substring, delim, sortBy, sortDir, marker, limit) + resp, err := b.ms.ListObjects(jc.Request.Context(), bucket, jc.PathParam("prefix"), substring, delim, sortBy, sortDir, marker, limit, slabEncryptionKey) if errors.Is(err, api.ErrUnsupportedDelimiter) { jc.Error(err, http.StatusBadRequest) return diff --git a/internal/test/e2e/cluster_test.go b/internal/test/e2e/cluster_test.go index c7e240afc..bcea19b72 100644 --- a/internal/test/e2e/cluster_test.go +++ b/internal/test/e2e/cluster_test.go @@ -1764,24 +1764,23 @@ func TestUploadPacking(t *testing.T) { return nil }) - // TODO PJ: use Objects - // ObjectsBySlabKey should return 2 objects for the slab of file1 since file1 - // and file2 share the same slab. - // res, err := b.Object(context.Background(), api.DefaultBucketName, "file1", api.GetObjectOptions{}) - // tt.OK(err) - // objs, err := b.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, res.Object.Slabs[0].EncryptionKey) - // tt.OK(err) - // if len(objs) != 2 { - // t.Fatal("expected 2 objects", len(objs)) - // } - // sort.Slice(objs, func(i, j int) bool { - // return objs[i].Key < objs[j].Key // make result deterministic - // }) - // if objs[0].Key != "/file1" { - // t.Fatal("expected file1", objs[0].Key) - // } else if objs[1].Key != "/file2" { - // t.Fatal("expected file2", objs[1].Key) - // } + // ListObjects should return 2 objects for the slab of file1 since file1 and + // file2 share the same slab. + o, err := b.Object(context.Background(), api.DefaultBucketName, "file1", api.GetObjectOptions{}) + tt.OK(err) + res, err := b.ListObjects(context.Background(), "", api.ListObjectOptions{Bucket: api.DefaultBucketName, SlabEncryptionKey: o.Object.Slabs[0].EncryptionKey}) + tt.OK(err) + if len(res.Objects) != 2 { + t.Fatal("expected 2 objects", len(res.Objects)) + } + sort.Slice(res.Objects, func(i, j int) bool { + return res.Objects[i].Key < res.Objects[j].Key // make result deterministic + }) + if res.Objects[0].Key != "/file1" { + t.Fatal("expected file1", res.Objects[0].Key) + } else if res.Objects[1].Key != "/file2" { + t.Fatal("expected file2", res.Objects[1].Key) + } } func TestWallet(t *testing.T) { diff --git a/stores/metadata.go b/stores/metadata.go index 5dbda1f80..947fab20e 100644 --- a/stores/metadata.go +++ b/stores/metadata.go @@ -782,9 +782,9 @@ func (s *SQLStore) invalidateSlabHealthByFCID(ctx context.Context, fcids []types } } -func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) { +func (s *SQLStore) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (resp api.ObjectsListResponse, err error) { err = s.db.Transaction(ctx, func(tx sql.DatabaseTx) error { - resp, err = tx.ListObjects(ctx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) + resp, err = tx.ListObjects(ctx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit, slabEncryptionKey) return err }) return diff --git a/stores/metadata_test.go b/stores/metadata_test.go index bf2e74a92..4c1e508e4 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1457,7 +1457,7 @@ func TestObjectHealth(t *testing.T) { } // assert health is returned correctly by ObjectEntries - resp, err := ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "", "", "", "", "", -1) + resp, err := ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "", "", "", "", "", -1, object.EncryptionKey{}) entries := resp.Objects if err != nil { t.Fatal(err) @@ -1468,7 +1468,7 @@ func TestObjectHealth(t *testing.T) { } // assert health is returned correctly by SearchObject - resp, err = ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "foo", "", "", "", "", -1) + resp, err = ss.ListObjects(context.Background(), api.DefaultBucketName, "/", "foo", "", "", "", "", -1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { @@ -1616,7 +1616,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { } } for _, test := range tests { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) + resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -1629,7 +1629,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { var marker string for offset := 0; offset < len(test.want); offset++ { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, marker, 1) + resp, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, marker, 1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -1651,7 +1651,7 @@ func TestListObjectsWithDelimiterSlash(t *testing.T) { continue } - resp, err = ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, test.want[offset].Key, 1) + resp, err = ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, test.want[offset].Key, 1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -1724,7 +1724,7 @@ func TestListObjectsExplicitDir(t *testing.T) { } } for _, test := range tests { - got, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1) + got, err := ss.ListObjects(ctx, api.DefaultBucketName, test.path+test.prefix, "", "/", test.sortBy, test.sortDir, "", -1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -1792,7 +1792,7 @@ func TestListObjectsSubstring(t *testing.T) { {"uu", []api.ObjectMetadata{{Key: "/foo/baz/quux", Size: 3, Health: 1}, {Key: "/foo/baz/quuz", Size: 4, Health: 1}, {Key: "/gab/guub", Size: 5, Health: 1}}}, } for _, test := range tests { - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", "", -1) + resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", "", -1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -1800,7 +1800,7 @@ func TestListObjectsSubstring(t *testing.T) { assertEqual(got, test.want) var marker string for offset := 0; offset < len(test.want); offset++ { - if resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", marker, 1); err != nil { + if resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", test.key, "", "", "", marker, 1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if got := resp.Objects; len(got) != 1 { t.Errorf("\nkey: %v unexpected number of objects, %d != 1", test.key, len(got)) @@ -2645,7 +2645,7 @@ func TestRenameObjects(t *testing.T) { } // Assert that number of objects matches. - resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", "/", "", "", "", "", 100) + resp, err := ss.ListObjects(ctx, api.DefaultBucketName, "", "/", "", "", "", "", 100, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -3423,38 +3423,38 @@ func TestBucketObjects(t *testing.T) { } // List the objects in the buckets. - if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) } else if entries[0].Size != 1 { t.Fatal("unexpected size", entries[0].Size) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) } else if entries[0].Size != 2 { t.Fatal("unexpected size", entries[0].Size) - } else if resp, err := ss.ListObjects(context.Background(), "", "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), "", "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) } // Search the objects in the buckets. - if resp, err := ss.ListObjects(context.Background(), b1, "", "", "", "", "", "", -1); err != nil { + if resp, err := ss.ListObjects(context.Background(), b1, "", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if objects := resp.Objects; len(objects) != 2 { t.Fatal("expected 2 objects", len(objects)) } else if objects[0].Size != 3 || objects[1].Size != 1 { t.Fatal("unexpected size", objects[0].Size, objects[1].Size) - } else if resp, err := ss.ListObjects(context.Background(), b2, "", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if objects := resp.Objects; len(objects) != 2 { t.Fatal("expected 2 objects", len(objects)) } else if objects[0].Size != 4 || objects[1].Size != 2 { t.Fatal("unexpected size", objects[0].Size, objects[1].Size) - } else if resp, err := ss.ListObjects(context.Background(), "", "", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), "", "", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if objects := resp.Objects; len(objects) != 4 { t.Fatal("expected 4 objects", len(objects)) @@ -3463,13 +3463,13 @@ func TestBucketObjects(t *testing.T) { // Rename object foo/bar in bucket 1 to foo/baz but not in bucket 2. if err := ss.RenameObjectBlocking(context.Background(), b1, "/foo/bar", "/foo/baz", false); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) } else if entries[0].Key != "/foo/baz" { t.Fatal("unexpected name", entries[0].Key) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) @@ -3480,13 +3480,13 @@ func TestBucketObjects(t *testing.T) { // Rename foo/bar in bucket 2 using the batch rename. if err := ss.RenameObjectsBlocking(context.Background(), b2, "/foo/bar", "/foo/bam", false); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) } else if entries[0].Key != "/foo/baz" { t.Fatal("unexpected name", entries[0].Key) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 2 entries", len(entries)) @@ -3499,28 +3499,28 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } else if err := ss.RemoveObjectBlocking(context.Background(), b1, "/foo/baz"); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b1, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) > 0 { t.Fatal("expected 0 entries", len(entries)) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "/foo/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) } // Delete all files in bucket 2. - if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { + if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) } else if err := ss.RemoveObjectsBlocking(context.Background(), b2, "/"); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b2, "/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 0 { t.Fatal("expected 0 entries", len(entries)) - } else if resp, err := ss.ListObjects(context.Background(), b1, "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(context.Background(), b1, "/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) @@ -3535,20 +3535,18 @@ func TestBucketObjects(t *testing.T) { t.Fatal(err) } - // TODO PJ: use Objects - // // See if we can fetch the object by slab. - // if obj, err := ss.Object(context.Background(), b1, "/bar"); err != nil { - // t.Fatal(err) - // } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b1, obj.Slabs[0].EncryptionKey); err != nil { - // t.Fatal(err) - // } else if len(objects) != 1 { - // t.Fatal("expected 1 object", len(objects)) - // } else if objects, err := ss.ObjectsBySlabKey(context.Background(), b2, obj.Slabs[0].EncryptionKey); err != nil { - // t.Fatal(err) - // } else if len(objects) != 0 { - // t.Fatal("expected 0 objects", len(objects)) - // } + if obj, err := ss.Object(context.Background(), b1, "/bar"); err != nil { + t.Fatal(err) + } else if res, err := ss.ListObjects(context.Background(), b1, "", "", "", "", "", "", -1, obj.Slabs[0].EncryptionKey); err != nil { + t.Fatal(err) + } else if len(res.Objects) != 1 { + t.Fatal("expected 1 object", len(objects)) + } else if res, err := ss.ListObjects(context.Background(), b2, "", "", "", "", "", "", -1, obj.Slabs[0].EncryptionKey); err != nil { + t.Fatal(err) + } else if len(res.Objects) != 0 { + t.Fatal("expected 0 objects", len(objects)) + } } func TestCopyObject(t *testing.T) { @@ -3573,7 +3571,7 @@ func TestCopyObject(t *testing.T) { // Copy it within the same bucket. if om, err := ss.CopyObject(ctx, "src", "src", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(ctx, "src", "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(ctx, "src", "/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 2 { t.Fatal("expected 2 entries", len(entries)) @@ -3586,7 +3584,7 @@ func TestCopyObject(t *testing.T) { // Copy it cross buckets. if om, err := ss.CopyObject(ctx, "src", "dst", "/foo", "/bar", "", nil); err != nil { t.Fatal(err) - } else if resp, err := ss.ListObjects(ctx, "dst", "/", "", "", "", "", "", -1); err != nil { + } else if resp, err := ss.ListObjects(ctx, "dst", "/", "", "", "", "", "", -1, object.EncryptionKey{}); err != nil { t.Fatal(err) } else if entries := resp.Objects; len(entries) != 1 { t.Fatal("expected 1 entry", len(entries)) @@ -3747,7 +3745,7 @@ func TestListObjectsNoDelimiter(t *testing.T) { } } for _, test := range tests { - res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, "", -1) + res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, "", -1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } @@ -3762,7 +3760,7 @@ func TestListObjectsNoDelimiter(t *testing.T) { if len(res.Objects) > 0 { marker := "" for offset := 0; offset < len(test.want); offset++ { - res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, marker, 1) + res, err := ss.ListObjects(ctx, api.DefaultBucketName, test.prefix, "", "", test.sortBy, test.sortDir, marker, 1, object.EncryptionKey{}) if err != nil { t.Fatal(err) } diff --git a/stores/sql/database.go b/stores/sql/database.go index 0e5cb21b1..bbabcdcc1 100644 --- a/stores/sql/database.go +++ b/stores/sql/database.go @@ -196,7 +196,7 @@ type ( ListBuckets(ctx context.Context) ([]api.Bucket, error) // ListObjects returns a list of objects from the given bucket. - ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) + ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (resp api.ObjectsListResponse, err error) // MakeDirsForPath creates all directories for a given object's path. MakeDirsForPath(ctx context.Context, path string) (int64, error) diff --git a/stores/sql/main.go b/stores/sql/main.go index c080cc592..8df3e88c9 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -1250,12 +1250,12 @@ func orderByObject(sortBy, sortDir string) (orderByExprs []string, _ error) { return orderByExprs, nil } -func ListObjects(ctx context.Context, tx Tx, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (resp api.ObjectsListResponse, err error) { +func ListObjects(ctx context.Context, tx Tx, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (resp api.ObjectsListResponse, err error) { switch delim { case "": - resp, err = listObjectsNoDelim(ctx, tx, bucket, prefix, substring, sortBy, sortDir, marker, limit) + resp, err = listObjectsNoDelim(ctx, tx, bucket, prefix, substring, sortBy, sortDir, marker, limit, slabEncryptionKey) case "/": - resp, err = listObjectsSlashDelim(ctx, tx, bucket, prefix, sortBy, sortDir, marker, limit) + resp, err = listObjectsSlashDelim(ctx, tx, bucket, prefix, sortBy, sortDir, marker, limit, slabEncryptionKey) default: err = fmt.Errorf("unsupported delimiter: '%s'", delim) } @@ -2605,7 +2605,7 @@ func Object(ctx context.Context, tx Tx, bucket, key string) (api.Object, error) }, nil } -func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { +func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error) { // fetch one more to see if there are more entries if limit <= -1 { limit = math.MaxInt @@ -2676,6 +2676,12 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s whereExprs = append(whereExprs, markerExprs...) whereArgs = append(whereArgs, markerArgs...) + // apply slab key + if slabEncryptionKey != (object.EncryptionKey{}) { + whereExprs = append(whereExprs, "EXISTS(SELECT 1 FROM objects o2 INNER JOIN slices sli ON sli.db_object_id = o2.id INNER JOIN slabs sla ON sla.id = sli.db_slab_id WHERE o2.id = o.id AND sla.key = ?)") + whereArgs = append(whereArgs, EncryptionKey(slabEncryptionKey)) + } + // apply limit whereArgs = append(whereArgs, limit) @@ -2729,7 +2735,7 @@ func listObjectsNoDelim(ctx context.Context, tx Tx, bucket, prefix, substring, s }, nil } -func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { +func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error) { // split prefix into path and object prefix path := "/" // root of bucket if idx := strings.LastIndex(prefix, "/"); idx != -1 { @@ -2778,6 +2784,13 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s ) } + // apply slab key + var slabKeyExpr string + if slabEncryptionKey != (object.EncryptionKey{}) { + slabKeyExpr = "AND EXISTS(SELECT 1 FROM objects o2 INNER JOIN slices sli ON sli.db_object_id = o2.id INNER JOIN slabs sla ON sla.id = sli.db_slab_id WHERE o2.id = o.id AND sla.key = ?)" + args = append(args, EncryptionKey(slabEncryptionKey)) + } + args = append(args, path+"%", utf8.RuneCountInString(path), path, @@ -2867,7 +2880,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s SELECT o.db_bucket_id, o.object_id, o.size, o.health, o.mime_type, o.created_at, o.etag FROM objects o LEFT JOIN directories d ON d.name = o.object_id - WHERE o.object_id != ? AND o.db_directory_id = ? AND d.id IS NULL %s + WHERE o.object_id != ? AND o.db_directory_id = ? AND d.id IS NULL %s %s UNION ALL SELECT o.db_bucket_id, d.name as object_id, SUM(o.size), MIN(o.health), '' as mime_type, MAX(o.created_at) as created_at, '' as etag FROM objects o @@ -2882,6 +2895,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s `, tx.SelectObjectMetadataExpr(), prefixExpr, + slabKeyExpr, tx.CharLengthExpr(), prefixExpr, whereExpr, diff --git a/stores/sql/mysql/main.go b/stores/sql/mysql/main.go index fb64eaf69..a4908d201 100644 --- a/stores/sql/mysql/main.go +++ b/stores/sql/mysql/main.go @@ -483,8 +483,8 @@ func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) return ssql.ListBuckets(ctx, tx) } -func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { - return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) +func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error) { + return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit, slabEncryptionKey) } func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { diff --git a/stores/sql/sqlite/main.go b/stores/sql/sqlite/main.go index d80a8b3b7..f1f159fb0 100644 --- a/stores/sql/sqlite/main.go +++ b/stores/sql/sqlite/main.go @@ -469,8 +469,8 @@ func (tx *MainDatabaseTx) ListBuckets(ctx context.Context) ([]api.Bucket, error) return ssql.ListBuckets(ctx, tx) } -func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int) (api.ObjectsListResponse, error) { - return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit) +func (tx *MainDatabaseTx) ListObjects(ctx context.Context, bucket, prefix, substring, delim, sortBy, sortDir, marker string, limit int, slabEncryptionKey object.EncryptionKey) (api.ObjectsListResponse, error) { + return ssql.ListObjects(ctx, tx, bucket, prefix, substring, delim, sortBy, sortDir, marker, limit, slabEncryptionKey) } func (tx *MainDatabaseTx) MakeDirsForPath(ctx context.Context, path string) (int64, error) { From ccd9250dbc2c9e5e6eb3ff4063a1928aebdbe538 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Sep 2024 09:49:23 +0200 Subject: [PATCH 05/11] worker: move migration alert --- api/worker.go | 7 --- autopilot/alerts.go | 71 ++-------------------------- autopilot/migrator.go | 102 ++++++++-------------------------------- autopilot/workerpool.go | 2 +- worker/alerts.go | 46 ++++++++++++++++++ worker/alerts_test.go | 4 +- worker/client/client.go | 5 +- worker/download.go | 55 +++++++++++----------- worker/migrations.go | 18 +++---- worker/mocks_test.go | 8 ++-- worker/upload_test.go | 4 +- worker/worker.go | 27 ++++++----- 12 files changed, 133 insertions(+), 216 deletions(-) diff --git a/api/worker.go b/api/worker.go index ee72ca73e..40264f895 100644 --- a/api/worker.go +++ b/api/worker.go @@ -70,13 +70,6 @@ type ( Total uint64 `json:"total"` } - // MigrateSlabResponse is the response type for the /slab/migrate endpoint. - MigrateSlabResponse struct { - NumShardsMigrated int `json:"numShardsMigrated"` - SurchargeApplied bool `json:"surchargeApplied,omitempty"` - Error string `json:"error,omitempty"` - } - // RHPFormResponse is the response type for the /rhp/form endpoint. RHPFormResponse struct { ContractID types.FileContractID `json:"contractID"` diff --git a/autopilot/alerts.go b/autopilot/alerts.go index 47a926ad5..24b7a8daf 100644 --- a/autopilot/alerts.go +++ b/autopilot/alerts.go @@ -7,14 +7,13 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" - "go.sia.tech/renterd/object" ) var ( - alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted - alertLowBalanceID = alerts.RandomAlertID() // constant until restarted - alertMigrationID = alerts.RandomAlertID() // constant until restarted - alertPruningID = alerts.RandomAlertID() // constant until restarted + alertHealthRefreshID = alerts.RandomAlertID() // constant until restarted + alertLowBalanceID = alerts.RandomAlertID() // constant until restarted + alertOngoingMigrationsID = alerts.RandomAlertID() // constant until restarted + alertPruningID = alerts.RandomAlertID() // constant until restarted ) func (ap *Autopilot) RegisterAlert(ctx context.Context, a alerts.Alert) { @@ -74,7 +73,7 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert { } return alerts.Alert{ - ID: alertMigrationID, + ID: alertOngoingMigrationsID, Severity: alerts.SeverityInfo, Message: fmt.Sprintf("Migrating %d slabs", n), Timestamp: time.Now(), @@ -82,66 +81,6 @@ func newOngoingMigrationsAlert(n int, estimate time.Duration) alerts.Alert { } } -func newCriticalMigrationSucceededAlert(slabKey object.EncryptionKey) alerts.Alert { - return alerts.Alert{ - ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: alerts.SeverityInfo, - Message: "Critical migration succeeded", - Data: map[string]interface{}{ - "slabKey": slabKey.String(), - "hint": "This migration succeeded thanks to the MigrationSurchargeMultiplier in the gouging settings that allowed overpaying hosts on some critical sector downloads", - }, - Timestamp: time.Now(), - } -} - -func newCriticalMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert { - data := map[string]interface{}{ - "error": err.Error(), - "health": health, - "slabKey": slabKey.String(), - "hint": "If migrations of low-health slabs fail, it might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", - } - if objectIds != nil { - data["objectIDs"] = objectIds - } - - return alerts.Alert{ - ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: alerts.SeverityCritical, - Message: "Critical migration failed", - Data: data, - Timestamp: time.Now(), - } -} - -func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objectIds map[string][]string, err error) alerts.Alert { - data := map[string]interface{}{ - "error": err.Error(), - "health": health, - "slabKey": slabKey.String(), - "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously.", - } - if objectIds != nil { - data["objectIDs"] = objectIds - } - - severity := alerts.SeverityError - if health < 0.25 { - severity = alerts.SeverityCritical - } else if health < 0.5 { - severity = alerts.SeverityWarning - } - - return alerts.Alert{ - ID: alerts.IDForSlab(alertMigrationID, slabKey), - Severity: severity, - Message: "Slab migration failed", - Data: data, - Timestamp: time.Now(), - } -} - func newRefreshHealthFailedAlert(err error) alerts.Alert { return alerts.Alert{ ID: alertHealthRefreshID, diff --git a/autopilot/migrator.go b/autopilot/migrator.go index 0909d0137..01e0c9f20 100644 --- a/autopilot/migrator.go +++ b/autopilot/migrator.go @@ -2,14 +2,12 @@ package autopilot import ( "context" - "errors" "fmt" "math" "sort" "sync" "time" - "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" "go.sia.tech/renterd/internal/utils" "go.sia.tech/renterd/object" @@ -49,20 +47,15 @@ type ( } ) -func (j *job) execute(ctx context.Context, w Worker) (_ api.MigrateSlabResponse, err error) { +func (j *job) execute(ctx context.Context, w Worker) (time.Duration, error) { + start := time.Now() slab, err := j.b.Slab(ctx, j.EncryptionKey) if err != nil { - return api.MigrateSlabResponse{}, fmt.Errorf("failed to fetch slab; %w", err) + return 0, fmt.Errorf("failed to fetch slab; %w", err) } - res, err := w.MigrateSlab(ctx, slab, j.set) - if err != nil { - return api.MigrateSlabResponse{}, fmt.Errorf("failed to migrate slab; %w", err) - } else if res.Error != "" { - return res, fmt.Errorf("failed to migrate slab; %w", errors.New(res.Error)) - } - - return res, nil + err = w.MigrateSlab(ctx, slab, j.set) + return time.Since(start), err } func newMigrator(ap *Autopilot, healthCutoff float64, parallelSlabsPerWorker uint64) *migrator { @@ -157,44 +150,20 @@ func (m *migrator) performMigrations(p *workerPool) { // process jobs for j := range jobs { - start := time.Now() - res, err := j.execute(ctx, w) - m.statsSlabMigrationSpeedMS.Track(float64(time.Since(start).Milliseconds())) - if err != nil { - m.logger.Errorf("%v: migration %d/%d failed, key: %v, health: %v, overpaid: %v, err: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, err) - if utils.IsErr(err, api.ErrConsensusNotSynced) { - // interrupt migrations if consensus is not synced - select { - case m.signalConsensusNotSynced <- struct{}{}: - default: - } - return - } else if !utils.IsErr(err, api.ErrSlabNotFound) { - // fetch all object IDs for the slab we failed to migrate - var objectIds map[string][]string - if res, err := m.objectIDsForSlabKey(ctx, j.EncryptionKey); err != nil { - m.logger.Errorf("failed to fetch object ids for slab key; %w", err) - } else { - objectIds = res - } - - // register the alert - if res.SurchargeApplied { - m.ap.RegisterAlert(ctx, newCriticalMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err)) - } else { - m.ap.RegisterAlert(ctx, newMigrationFailedAlert(j.EncryptionKey, j.Health, objectIds, err)) - } - } - } else { - m.logger.Infof("%v: migration %d/%d succeeded, key: %v, health: %v, overpaid: %v, shards migrated: %v", id, j.slabIdx+1, j.batchSize, j.EncryptionKey, j.Health, res.SurchargeApplied, res.NumShardsMigrated) - m.ap.DismissAlert(ctx, alerts.IDForSlab(alertMigrationID, j.EncryptionKey)) - if res.SurchargeApplied { - // this alert confirms the user his gouging - // settings are working, it will be dismissed - // automatically the next time this slab is - // successfully migrated - m.ap.RegisterAlert(ctx, newCriticalMigrationSucceededAlert(j.EncryptionKey)) + duration, err := j.execute(ctx, w) + m.statsSlabMigrationSpeedMS.Track(float64(duration.Milliseconds())) + if utils.IsErr(err, api.ErrConsensusNotSynced) { + // interrupt migrations if consensus is not synced + select { + case m.signalConsensusNotSynced <- struct{}{}: + default: } + return + } else if err != nil { + m.logger.Errorw("migration failed", + zap.Float64("health", j.Health), + zap.Stringer("slab", j.EncryptionKey), + zap.String("worker", id)) } } }(w) @@ -263,8 +232,8 @@ func (m *migrator) performMigrations(p *workerPool) { }) } - // unregister the migration alert when we're done - defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertMigrationID) + // unregister the ongoing migrations alert when we're done + defer m.ap.alerts.DismissAlerts(m.ap.shutdownCtx, alertOngoingMigrationsID) OUTER: for { @@ -312,34 +281,3 @@ OUTER: return } } - -func (m *migrator) objectIDsForSlabKey(ctx context.Context, key object.EncryptionKey) (map[string][]string, error) { - // fetch all buckets - // - // NOTE:at the time of writing the bus does not support fetching objects by - // slab key across all buckets at once, therefor we have to list all buckets - // and loop over them, revisit on the next major release - buckets, err := m.ap.bus.ListBuckets(ctx) - if err != nil { - return nil, fmt.Errorf("%w; failed to list buckets", err) - } - - // fetch all objects per bucket - idsPerBucket := make(map[string][]string) - for _, bucket := range buckets { - res, err := m.ap.bus.ListObjects(ctx, "", api.ListObjectOptions{Bucket: bucket.Name, SlabEncryptionKey: key}) - if err != nil { - m.logger.Errorf("failed to fetch objects for slab key in bucket %v; %w", bucket, err) - continue - } else if len(res.Objects) == 0 { - continue - } - - idsPerBucket[bucket.Name] = make([]string, len(res.Objects)) - for i, object := range res.Objects { - idsPerBucket[bucket.Name][i] = object.Key - } - } - - return idsPerBucket, nil -} diff --git a/autopilot/workerpool.go b/autopilot/workerpool.go index 871f1babc..54617aee9 100644 --- a/autopilot/workerpool.go +++ b/autopilot/workerpool.go @@ -16,7 +16,7 @@ type Worker interface { Account(ctx context.Context, hostKey types.PublicKey) (rhpv3.Account, error) Contracts(ctx context.Context, hostTimeout time.Duration) (api.ContractsResponse, error) ID(ctx context.Context) (string, error) - MigrateSlab(ctx context.Context, s object.Slab, set string) (api.MigrateSlabResponse, error) + MigrateSlab(ctx context.Context, s object.Slab, set string) error RHPPriceTable(ctx context.Context, hostKey types.PublicKey, siamuxAddr string, timeout time.Duration) (api.HostPriceTable, error) RHPScan(ctx context.Context, hostKey types.PublicKey, hostIP string, timeout time.Duration) (api.RHPScanResponse, error) diff --git a/worker/alerts.go b/worker/alerts.go index 02598c770..894012c5b 100644 --- a/worker/alerts.go +++ b/worker/alerts.go @@ -6,9 +6,15 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/alerts" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/object" "lukechampine.com/frand" ) +var ( + alertMigrationID = alerts.RandomAlertID() // constant until restarted +) + func randomAlertID() types.Hash256 { return frand.Entropy256() } @@ -30,6 +36,46 @@ func newDownloadFailedAlert(bucket, key string, offset, length, contracts int64, } } +func newMigrationFailedAlert(slabKey object.EncryptionKey, health float64, objects []api.ObjectMetadata, err error) alerts.Alert { + data := map[string]interface{}{ + "error": err.Error(), + "health": health, + "slabKey": slabKey.String(), + "hint": "Migration failures can be temporary, but if they persist it can eventually lead to data loss and should therefor be taken very seriously. It might be necessary to increase the MigrationSurchargeMultiplier in the gouging settings to ensure it has every chance of succeeding.", + } + + if len(objects) > 0 { + data["objects"] = objects + } + + hostErr := err + for errors.Unwrap(hostErr) != nil { + hostErr = errors.Unwrap(hostErr) + } + if set, ok := hostErr.(HostErrorSet); ok { + hostErrors := make(map[string]string, len(set)) + for hk, err := range set { + hostErrors[hk.String()] = err.Error() + } + data["hosts"] = hostErrors + } + + severity := alerts.SeverityError + if health < 0.25 { + severity = alerts.SeverityCritical + } else if health < 0.5 { + severity = alerts.SeverityWarning + } + + return alerts.Alert{ + ID: alerts.IDForSlab(alertMigrationID, slabKey), + Severity: severity, + Message: "Slab migration failed", + Data: data, + Timestamp: time.Now(), + } +} + func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert { data := map[string]any{ "bucket": bucket, diff --git a/worker/alerts_test.go b/worker/alerts_test.go index 137838a39..4be4c4247 100644 --- a/worker/alerts_test.go +++ b/worker/alerts_test.go @@ -11,7 +11,9 @@ import ( "go.sia.tech/renterd/alerts" ) -// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with a HostErrorSet error registers an alert with all the individual errors of any host in the payload. +// TestUploadFailedAlertErrorSet is a test to verify that an upload failing with +// a HostErrorSet error registers an alert with all the individual errors of any +// host in the payload. func TestUploadFailedAlertErrorSet(t *testing.T) { hostErrSet := HostErrorSet{ types.PublicKey{1, 1, 1}: errors.New("test"), diff --git a/worker/client/client.go b/worker/client/client.go index ca5aee3c8..7fe60b7b7 100644 --- a/worker/client/client.go +++ b/worker/client/client.go @@ -164,11 +164,10 @@ func (c *Client) Memory(ctx context.Context) (resp api.MemoryResponse, err error } // MigrateSlab migrates the specified slab. -func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) (res api.MigrateSlabResponse, err error) { +func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) error { values := make(url.Values) values.Set("contractset", set) - err = c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, &res) - return + return c.c.WithContext(ctx).POST("/slab/migrate?"+values.Encode(), slab, nil) } // State returns the current state of the worker. diff --git a/worker/download.go b/worker/download.go index cedcd7a82..147ae3957 100644 --- a/worker/download.go +++ b/worker/download.go @@ -81,11 +81,10 @@ type ( } slabDownloadResponse struct { - mem Memory - surchargeApplied bool - shards [][]byte - index int - err error + mem Memory + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -264,14 +263,13 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o wg.Add(1) go func(index int) { defer wg.Done() - shards, surchargeApplied, err := mgr.downloadSlab(ctx, next.SlabSlice, false) + shards, err := mgr.downloadSlab(ctx, next.SlabSlice, false) select { case responseChan <- &slabDownloadResponse{ - mem: mem, - surchargeApplied: surchargeApplied, - shards: shards, - index: index, - err: err, + mem: mem, + shards: shards, + index: index, + err: err, }: case <-ctx.Done(): mem.Release() // relase memory if we're interrupted @@ -302,10 +300,11 @@ outer: } if resp.err != nil { - mgr.logger.Errorf("download slab %v failed, overpaid %v: %v", resp.index, resp.surchargeApplied, resp.err) + mgr.logger.Errorw("slab download failed", + zap.Int("index", resp.index), + zap.Error(err), + ) return resp.err - } else if resp.surchargeApplied { - mgr.logger.Warnf("download for slab %v had to overpay to succeed", resp.index) } responses[resp.index] = resp @@ -353,7 +352,7 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) { // refresh the downloaders mgr.refreshDownloaders(contracts) @@ -373,7 +372,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, // check if we have enough shards if availableShards < slab.MinShards { - return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) + return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } // NOTE: we don't acquire memory here since DownloadSlab is only used for @@ -385,19 +384,19 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Offset: 0, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } - shards, surchargeApplied, err := mgr.downloadSlab(ctx, slice, true) + shards, err := mgr.downloadSlab(ctx, slice, true) if err != nil { - return nil, false, err + return nil, err } // decrypt and recover slice.Decrypt(shards) err = slice.Reconstruct(shards) if err != nil { - return nil, false, err + return nil, err } - return shards, surchargeApplied, err + return shards, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -526,7 +525,7 @@ func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bo } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, bool, error) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, error) { // prepare new download slab := mgr.newSlabDownload(slice, migration) @@ -694,7 +693,7 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, } } -func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { +func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -712,7 +711,7 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, false, fmt.Errorf("no host available for shard %d", i) + return nil, fmt.Errorf("no host available for shard %d", i) } s.launch(req) i++ @@ -728,9 +727,9 @@ loop: for s.inflight() > 0 && !done { select { case <-s.mgr.shutdownCtx.Done(): - return nil, false, errors.New("download stopped") + return nil, errors.New("download stopped") case <-ctx.Done(): - return nil, false, context.Cause(ctx) + return nil, context.Cause(ctx) case <-resps.c: resetOverdrive() } @@ -808,13 +807,13 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, bool, error) { +func (s *slabDownload) finish() ([][]byte, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { - return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) + return nil, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) } - return s.sectors, s.numOverpaid > 0, nil + return s.sectors, nil } func (s *slabDownload) missing() int { diff --git a/worker/migrations.go b/worker/migrations.go index d2d1c6474..03728a3b7 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -10,7 +10,7 @@ import ( "go.sia.tech/renterd/object" ) -func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) (int, bool, error) { +func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) error { // make a map of good hosts goodHosts := make(map[types.PublicKey]map[types.FileContractID]bool) for _, c := range ulContracts { @@ -57,7 +57,7 @@ SHARDS: // if all shards are on good hosts, we're done if len(shardIndices) == 0 { - return 0, false, nil + return nil } // calculate the number of missing shards and take into account hosts for @@ -72,23 +72,23 @@ SHARDS: // perform some sanity checks if len(ulContracts) < int(s.MinShards) { - return 0, false, fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) + return fmt.Errorf("not enough hosts to repair unhealthy shard to minimum redundancy, %d<%d", len(ulContracts), int(s.MinShards)) } if len(s.Shards)-missingShards < int(s.MinShards) { - return 0, false, fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards)) + return fmt.Errorf("not enough hosts to download unhealthy shard, %d<%d", len(s.Shards)-missingShards, int(s.MinShards)) } // acquire memory for the migration mem := w.uploadManager.mm.AcquireMemory(ctx, uint64(len(shardIndices))*rhpv2.SectorSize) if mem == nil { - return 0, false, fmt.Errorf("failed to acquire memory for migration") + return fmt.Errorf("failed to acquire memory for migration") } defer mem.Release() // download the slab - shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) + shards, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) if err != nil { - return 0, false, fmt.Errorf("failed to download slab for migration: %w", err) + return fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -110,8 +110,8 @@ SHARDS: // migrate the shards err = w.uploadManager.UploadShards(ctx, s, shardIndices, shards, contractSet, allowed, bh, lockingPriorityUpload, mem) if err != nil { - return 0, surchargeApplied, fmt.Errorf("failed to upload slab for migration: %w", err) + return fmt.Errorf("failed to upload slab for migration: %w", err) } - return len(shards), surchargeApplied, nil + return nil } diff --git a/worker/mocks_test.go b/worker/mocks_test.go index 24a70adcb..46ae4f1b4 100644 --- a/worker/mocks_test.go +++ b/worker/mocks_test.go @@ -554,6 +554,10 @@ func (os *objectStoreMock) PackedSlabsForUpload(ctx context.Context, lockingDura return } +func (os *objectStoreMock) ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { + return api.ObjectsListResponse{}, nil +} + func (os *objectStoreMock) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error { os.mu.Lock() defer os.mu.Unlock() @@ -628,10 +632,6 @@ func (*s3Mock) CopyObject(context.Context, string, string, string, string, api.C return api.ObjectMetadata{}, nil } -func (*s3Mock) ListObjects(context.Context, string, api.ListObjectOptions) (resp api.ObjectsListResponse, err error) { - return api.ObjectsListResponse{}, nil -} - func (*s3Mock) AbortMultipartUpload(context.Context, string, string, string) (err error) { return nil } diff --git a/worker/upload_test.go b/worker/upload_test.go index d36c67a6e..4a2b73398 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -314,7 +314,7 @@ func TestMigrateLostSector(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestUploadShards(t *testing.T) { } // download the slab - shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } diff --git a/worker/worker.go b/worker/worker.go index 3fffec52d..8322cb6f2 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -114,6 +114,7 @@ type ( AddPartialSlab(ctx context.Context, data []byte, minShards, totalShards uint8, contractSet string) (slabs []object.SlabSlice, slabBufferMaxSizeSoftReached bool, err error) AddUploadingSector(ctx context.Context, uID api.UploadID, id types.FileContractID, root types.Hash256) error FinishUpload(ctx context.Context, uID api.UploadID) error + ListObjects(ctx context.Context, prefix string, opts api.ListObjectOptions) (resp api.ObjectsListResponse, err error) MarkPackedSlabsUploaded(ctx context.Context, slabs []api.UploadedPackedSlab) error TrackUpload(ctx context.Context, uID api.UploadID) error UpdateSlab(ctx context.Context, s object.Slab, contractSet string) error @@ -405,21 +406,21 @@ func (w *Worker) slabMigrateHandler(jc jape.Context) { } } - // migrate the slab - numShardsMigrated, surchargeApplied, err := w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) - if err != nil { - jc.Encode(api.MigrateSlabResponse{ - NumShardsMigrated: numShardsMigrated, - SurchargeApplied: surchargeApplied, - Error: err.Error(), - }) - return + // migrate the slab and handle alerts + err = w.migrate(ctx, slab, up.ContractSet, dlContracts, ulContracts, up.CurrentHeight) + if err != nil && !utils.IsErr(err, api.ErrSlabNotFound) { + var objects []api.ObjectMetadata + if res, err := w.bus.ListObjects(ctx, "", api.ListObjectOptions{SlabEncryptionKey: slab.EncryptionKey}); err != nil { + w.logger.Errorf("failed to list objects for slab key; %w", err) + } else { + objects = res.Objects + } + w.alerts.RegisterAlert(ctx, newMigrationFailedAlert(slab.EncryptionKey, slab.Health, objects, err)) + } else if err == nil { + w.alerts.DismissAlerts(jc.Request.Context(), alerts.IDForSlab(alertMigrationID, slab.EncryptionKey)) } - jc.Encode(api.MigrateSlabResponse{ - NumShardsMigrated: numShardsMigrated, - SurchargeApplied: surchargeApplied, - }) + jc.Check("failed to migrate slab", err) } func (w *Worker) downloadsStatsHandlerGET(jc jape.Context) { From 2ad292bc8bc05a6509d4bf09ab0c1b55cd4d1e85 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Sep 2024 09:56:30 +0200 Subject: [PATCH 06/11] worker: debug log migration result --- worker/download.go | 48 ++++++++++++++++++++++--------------------- worker/migrations.go | 22 +++++++++++++++++++- worker/upload_test.go | 4 ++-- 3 files changed, 48 insertions(+), 26 deletions(-) diff --git a/worker/download.go b/worker/download.go index 147ae3957..8939acfc9 100644 --- a/worker/download.go +++ b/worker/download.go @@ -81,10 +81,11 @@ type ( } slabDownloadResponse struct { - mem Memory - shards [][]byte - index int - err error + mem Memory + surchargeApplied bool + shards [][]byte + index int + err error } sectorDownloadReq struct { @@ -263,13 +264,14 @@ func (mgr *downloadManager) DownloadObject(ctx context.Context, w io.Writer, o o wg.Add(1) go func(index int) { defer wg.Done() - shards, err := mgr.downloadSlab(ctx, next.SlabSlice, false) + shards, surchargeApplied, err := mgr.downloadSlab(ctx, next.SlabSlice, false) select { case responseChan <- &slabDownloadResponse{ - mem: mem, - shards: shards, - index: index, - err: err, + mem: mem, + surchargeApplied: surchargeApplied, + shards: shards, + index: index, + err: err, }: case <-ctx.Done(): mem.Release() // relase memory if we're interrupted @@ -352,7 +354,7 @@ outer: return nil } -func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, error) { +func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, contracts []api.ContractMetadata) ([][]byte, bool, error) { // refresh the downloaders mgr.refreshDownloaders(contracts) @@ -372,7 +374,7 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, // check if we have enough shards if availableShards < slab.MinShards { - return nil, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) + return nil, false, fmt.Errorf("not enough hosts available to download the slab: %v/%v", availableShards, slab.MinShards) } // NOTE: we don't acquire memory here since DownloadSlab is only used for @@ -384,19 +386,19 @@ func (mgr *downloadManager) DownloadSlab(ctx context.Context, slab object.Slab, Offset: 0, Length: uint32(slab.MinShards) * rhpv2.SectorSize, } - shards, err := mgr.downloadSlab(ctx, slice, true) + shards, surchargeApplied, err := mgr.downloadSlab(ctx, slice, true) if err != nil { - return nil, err + return nil, false, err } // decrypt and recover slice.Decrypt(shards) err = slice.Reconstruct(shards) if err != nil { - return nil, err + return nil, false, err } - return shards, err + return shards, surchargeApplied, err } func (mgr *downloadManager) Stats() downloadManagerStats { @@ -525,7 +527,7 @@ func (mgr *downloadManager) newSlabDownload(slice object.SlabSlice, migration bo } } -func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, error) { +func (mgr *downloadManager) downloadSlab(ctx context.Context, slice object.SlabSlice, migration bool) ([][]byte, bool, error) { // prepare new download slab := mgr.newSlabDownload(slice, migration) @@ -693,7 +695,7 @@ func (s *slabDownload) nextRequest(ctx context.Context, resps *sectorResponses, } } -func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { +func (s *slabDownload) download(ctx context.Context) ([][]byte, bool, error) { // cancel any sector downloads once the download is done ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -711,7 +713,7 @@ func (s *slabDownload) download(ctx context.Context) ([][]byte, error) { for i := 0; i < int(s.minShards); { req := s.nextRequest(ctx, resps, false) if req == nil { - return nil, fmt.Errorf("no host available for shard %d", i) + return nil, false, fmt.Errorf("no host available for shard %d", i) } s.launch(req) i++ @@ -727,9 +729,9 @@ loop: for s.inflight() > 0 && !done { select { case <-s.mgr.shutdownCtx.Done(): - return nil, errors.New("download stopped") + return nil, false, errors.New("download stopped") case <-ctx.Done(): - return nil, context.Cause(ctx) + return nil, false, context.Cause(ctx) case <-resps.c: resetOverdrive() } @@ -807,13 +809,13 @@ func (s *slabDownload) downloadSpeed() int64 { return int64(bytes) / ms } -func (s *slabDownload) finish() ([][]byte, error) { +func (s *slabDownload) finish() ([][]byte, bool, error) { s.mu.Lock() defer s.mu.Unlock() if s.numCompleted < s.minShards { - return nil, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) + return nil, s.numOverpaid > 0, fmt.Errorf("failed to download slab: completed=%d inflight=%d launched=%d relaunched=%d overpaid=%d downloaders=%d unused=%d errors=%d %v", s.numCompleted, s.numInflight, s.numLaunched, s.numRelaunched, s.numOverpaid, s.mgr.numDownloaders(), len(s.unusedHostSectors), len(s.errs), s.errs) } - return s.sectors, nil + return s.sectors, s.numOverpaid > 0, nil } func (s *slabDownload) missing() int { diff --git a/worker/migrations.go b/worker/migrations.go index 03728a3b7..13b31aa52 100644 --- a/worker/migrations.go +++ b/worker/migrations.go @@ -8,6 +8,7 @@ import ( "go.sia.tech/core/types" "go.sia.tech/renterd/api" "go.sia.tech/renterd/object" + "go.uber.org/zap" ) func (w *Worker) migrate(ctx context.Context, s object.Slab, contractSet string, dlContracts, ulContracts []api.ContractMetadata, bh uint64) error { @@ -86,8 +87,14 @@ SHARDS: defer mem.Release() // download the slab - shards, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) + shards, surchargeApplied, err := w.downloadManager.DownloadSlab(ctx, s, dlContracts) if err != nil { + w.logger.Debugw("slab migration failed", + zap.Error(err), + zap.Stringer("slab", s.EncryptionKey), + zap.Int("numShardsMigrated", len(shards)), + zap.Bool("surchargeApplied", surchargeApplied), + ) return fmt.Errorf("failed to download slab for migration: %w", err) } s.Encrypt(shards) @@ -110,8 +117,21 @@ SHARDS: // migrate the shards err = w.uploadManager.UploadShards(ctx, s, shardIndices, shards, contractSet, allowed, bh, lockingPriorityUpload, mem) if err != nil { + w.logger.Debugw("slab migration failed", + zap.Error(err), + zap.Stringer("slab", s.EncryptionKey), + zap.Int("numShardsMigrated", len(shards)), + zap.Bool("surchargeApplied", surchargeApplied), + ) return fmt.Errorf("failed to upload slab for migration: %w", err) } + // debug log migration result + w.logger.Debugw("slab migration succeeded", + zap.Stringer("slab", s.EncryptionKey), + zap.Int("numShardsMigrated", len(shards)), + zap.Bool("surchargeApplied", surchargeApplied), + ) + return nil } diff --git a/worker/upload_test.go b/worker/upload_test.go index 4a2b73398..d36c67a6e 100644 --- a/worker/upload_test.go +++ b/worker/upload_test.go @@ -314,7 +314,7 @@ func TestMigrateLostSector(t *testing.T) { } // download the slab - shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } @@ -417,7 +417,7 @@ func TestUploadShards(t *testing.T) { } // download the slab - shards, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) + shards, _, err := dl.DownloadSlab(context.Background(), slab.Slab, w.Contracts()) if err != nil { t.Fatal(err) } From dc8f9b9ebe09931b5b7c505b703ec37cc08090e2 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Sep 2024 13:42:33 +0200 Subject: [PATCH 07/11] stores: fix only_full_group_by --- stores/sql/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stores/sql/main.go b/stores/sql/main.go index 8df3e88c9..b1877fcf4 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2835,7 +2835,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s INNER JOIN buckets b ON o.db_bucket_id = b.id INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name WHERE %s - GROUP BY d.id + GROUP BY d.id, o.db_bucket_id `, col, strings.Join(markerExprsObj, " AND "), groupFn, col, tx.CharLengthExpr(), strings.Join(markerExprsDir, " AND ")), append(markerArgsObj, markerArgsDir...)...).Scan(dst) if errors.Is(err, dsql.ErrNoRows) { return api.ErrMarkerNotFound @@ -2886,7 +2886,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s FROM objects o INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s WHERE o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND d.db_parent_id = ? - GROUP BY d.id + GROUP BY d.id, o.db_bucket_id ) AS o INNER JOIN buckets b ON b.id = o.db_bucket_id %s From ec194080f9332bf4c88db5c0df4459409747dbc4 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Sep 2024 13:44:56 +0200 Subject: [PATCH 08/11] stores: fix only_full_group_by --- stores/sql/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stores/sql/main.go b/stores/sql/main.go index 8df3e88c9..b1877fcf4 100644 --- a/stores/sql/main.go +++ b/stores/sql/main.go @@ -2835,7 +2835,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s INNER JOIN buckets b ON o.db_bucket_id = b.id INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name WHERE %s - GROUP BY d.id + GROUP BY d.id, o.db_bucket_id `, col, strings.Join(markerExprsObj, " AND "), groupFn, col, tx.CharLengthExpr(), strings.Join(markerExprsDir, " AND ")), append(markerArgsObj, markerArgsDir...)...).Scan(dst) if errors.Is(err, dsql.ErrNoRows) { return api.ErrMarkerNotFound @@ -2886,7 +2886,7 @@ func listObjectsSlashDelim(ctx context.Context, tx Tx, bucket, prefix, sortBy, s FROM objects o INNER JOIN directories d ON SUBSTR(o.object_id, 1, %s(d.name)) = d.name %s WHERE o.object_id LIKE ? AND SUBSTR(o.object_id, 1, ?) = ? AND d.db_parent_id = ? - GROUP BY d.id + GROUP BY d.id, o.db_bucket_id ) AS o INNER JOIN buckets b ON b.id = o.db_bucket_id %s From 313ae88f57e79854be2ca947c22266f2f0928069 Mon Sep 17 00:00:00 2001 From: PJ Date: Mon, 16 Sep 2024 14:15:53 +0200 Subject: [PATCH 09/11] testing: fix TestMigrations --- internal/test/e2e/migrations_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/test/e2e/migrations_test.go b/internal/test/e2e/migrations_test.go index ab03a3339..4bde1ee30 100644 --- a/internal/test/e2e/migrations_test.go +++ b/internal/test/e2e/migrations_test.go @@ -3,6 +3,7 @@ package e2e import ( "bytes" "context" + "encoding/json" "errors" "fmt" "reflect" @@ -138,18 +139,17 @@ func TestMigrations(t *testing.T) { tt.OK(err) for _, alert := range ress.Alerts { // skip if not a migration alert - data, ok := alert.Data["objectIDs"].(map[string]interface{}) + _, ok := alert.Data["objects"] if !ok { continue } // collect all object ids per bucket - for bucket, ids := range data { - if objectIDs, ok := ids.([]interface{}); ok { - for _, id := range objectIDs { - got[bucket] = append(got[bucket], id.(string)) - } - } + var objects []api.ObjectMetadata + b, _ := json.Marshal(alert.Data["objects"]) + _ = json.Unmarshal(b, &objects) + for _, object := range objects { + got[object.Bucket] = append(got[object.Bucket], object.Key) } } if len(got) != 2 { From 6f099c456349d0babbf07ac48f4544aea9c6214d Mon Sep 17 00:00:00 2001 From: PJ Date: Tue, 17 Sep 2024 13:59:03 +0200 Subject: [PATCH 10/11] stores: add TestListObjectsSlabEncryptionKey --- stores/metadata_test.go | 128 ++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 63 deletions(-) diff --git a/stores/metadata_test.go b/stores/metadata_test.go index 4c1e508e4..ed4664bfa 100644 --- a/stores/metadata_test.go +++ b/stores/metadata_test.go @@ -1737,6 +1737,68 @@ func TestListObjectsExplicitDir(t *testing.T) { } } +func TestListObjectsSlabEncryptionKey(t *testing.T) { + ss := newTestSQLStore(t, defaultTestSQLStoreConfig) + defer ss.Close() + + // create a host + hks, err := ss.addTestHosts(1) + if err != nil { + t.Fatal(err) + } + hk1 := hks[0] + + // create a contract + fcids, _, err := ss.addTestContracts(hks) + if err != nil { + t.Fatal(err) + } + fcid1 := fcids[0] + + // create a slab. + slab := object.Slab{ + Health: 1.0, + EncryptionKey: object.GenerateEncryptionKey(), + MinShards: 1, + Shards: newTestShards(hk1, fcid1, types.Hash256{1}), + } + + // add 3 objects that all reference the slab + obj := object.Object{ + Key: object.GenerateEncryptionKey(), + Slabs: []object.SlabSlice{ + { + Slab: slab, + Offset: 1, + Length: 0, // incremented later + }, + }, + } + for _, name := range []string{"obj1", "obj2", "obj3"} { + obj.Slabs[0].Length++ + if _, err := ss.addTestObject(name, obj); err != nil { + t.Fatal(err) + } + } + + // Fetch the objects by slab. + res, err := ss.ListObjects(context.Background(), "", "", "", "", "", "", "", -1, slab.EncryptionKey) + if err != nil { + t.Fatal(err) + } + for i, name := range []string{"obj1", "obj2", "obj3"} { + if res.Objects[i].Key != name { + t.Fatal("unexpected object name", res.Objects[i].Key, name) + } + if res.Objects[i].Size != int64(i)+1 { + t.Fatal("unexpected object size", res.Objects[i].Size, i+1) + } + if res.Objects[i].Health != 1.0 { + t.Fatal("unexpected object health", res.Objects[i].Health) + } + } +} + // TestListObjectsSubstring is a test for the ListObjects fuzzy // search via the "substring" argument. func TestListObjectsSubstring(t *testing.T) { @@ -3269,69 +3331,6 @@ func TestContractSizes(t *testing.T) { } } -//  : use this as a list test -// func TestObjectsBySlabKey(t *testing.T) { -// ss := newTestSQLStore(t, defaultTestSQLStoreConfig) -// defer ss.Close() - -// // create a host -// hks, err := ss.addTestHosts(1) -// if err != nil { -// t.Fatal(err) -// } -// hk1 := hks[0] - -// // create a contract -// fcids, _, err := ss.addTestContracts(hks) -// if err != nil { -// t.Fatal(err) -// } -// fcid1 := fcids[0] - -// // create a slab. -// slab := object.Slab{ -// Health: 1.0, -// EncryptionKey: object.GenerateEncryptionKey(), -// MinShards: 1, -// Shards: newTestShards(hk1, fcid1, types.Hash256{1}), -// } - -// // Add 3 objects that all reference the slab. -// obj := object.Object{ -// Key: object.GenerateEncryptionKey(), -// Slabs: []object.SlabSlice{ -// { -// Slab: slab, -// Offset: 1, -// Length: 0, // incremented later -// }, -// }, -// } -// for _, name := range []string{"obj1", "obj2", "obj3"} { -// obj.Slabs[0].Length++ -// if _, err := ss.addTestObject(name, obj); err != nil { -// t.Fatal(err) -// } -// } - -// // Fetch the objects by slab. -// objs, err := ss.ObjectsBySlabKey(context.Background(), api.DefaultBucketName, slab.EncryptionKey) -// if err != nil { -// t.Fatal(err) -// } -// for i, name := range []string{"obj1", "obj2", "obj3"} { -// if objs[i].Key != name { -// t.Fatal("unexpected object name", objs[i].Key, name) -// } -// if objs[i].Size != int64(i)+1 { -// t.Fatal("unexpected object size", objs[i].Size, i+1) -// } -// if objs[i].Health != 1.0 { -// t.Fatal("unexpected object health", objs[i].Health) -// } -// } -// } - func TestBuckets(t *testing.T) { ss := newTestSQLStore(t, defaultTestSQLStoreConfig) defer ss.Close() @@ -3547,6 +3546,9 @@ func TestBucketObjects(t *testing.T) { } else if len(res.Objects) != 0 { t.Fatal("expected 0 objects", len(objects)) } + + // Check if we can fetch both objects by not specifying the bucket + } func TestCopyObject(t *testing.T) { From 00dff0b34456207eec5a37d8aa7b22608abf7c28 Mon Sep 17 00:00:00 2001 From: Chris Schinnerl Date: Tue, 17 Sep 2024 16:47:32 +0200 Subject: [PATCH 11/11] bus: make 'bucket' optinoal in objectsHandlerGet --- bus/routes.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/bus/routes.go b/bus/routes.go index f7eb54929..3ce1750d7 100644 --- a/bus/routes.go +++ b/bus/routes.go @@ -1142,9 +1142,6 @@ func (b *Bus) objectsHandlerGET(jc jape.Context) { var bucket, marker, delim, sortBy, sortDir, substring string if jc.DecodeForm("bucket", &bucket) != nil { return - } else if bucket == "" { - jc.Error(api.ErrBucketMissing, http.StatusBadRequest) - return } if jc.DecodeForm("delimiter", &delim) != nil {