Skip to content

Commit

Permalink
replacing gcs.Object with gcs.MinObject in gcs.Listing (#2657)
Browse files Browse the repository at this point in the history
* fetcing only selected attributes while listing

* fetching minobject attributes while listing

* minor fix

* replacing gcs.Object with gcs.MinObject in gcs.Listing

* refactor
  • Loading branch information
anushka567 authored Nov 8, 2024
1 parent 37cfbe4 commit d46cd46
Show file tree
Hide file tree
Showing 16 changed files with 201 additions and 147 deletions.
16 changes: 8 additions & 8 deletions internal/fs/inode/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,16 @@ func findDirInode(ctx context.Context, bucket *gcsx.SyncerBucket, name Name) (*C
return nil, fmt.Errorf("list objects: %w", err)
}

if len(listing.Objects) == 0 {
if len(listing.MinObjects) == 0 {
return nil, nil
}

result := &Core{
Bucket: bucket,
FullName: name,
}
if o := listing.Objects[0]; o.Name == name.GcsObjectName() {
result.MinObject = storageutil.ConvertObjToMinObject(o)
if o := listing.MinObjects[0]; o.Name == name.GcsObjectName() {
result.MinObject = o
}
return result, nil
}
Expand Down Expand Up @@ -631,7 +631,7 @@ func (d *dirInode) ReadDescendants(ctx context.Context, limit int) (map[Name]*Co
return nil, fmt.Errorf("list objects: %w", err)
}

for _, o := range listing.Objects {
for _, o := range listing.MinObjects {
if len(descendants) >= limit {
return descendants, nil
}
Expand All @@ -643,7 +643,7 @@ func (d *dirInode) ReadDescendants(ctx context.Context, limit int) (map[Name]*Co
descendants[name] = &Core{
Bucket: d.Bucket(),
FullName: name,
MinObject: storageutil.ConvertObjToMinObject(o),
MinObject: o,
}
}

Expand Down Expand Up @@ -689,7 +689,7 @@ func (d *dirInode) readObjects(
}
}()

for _, o := range listing.Objects {
for _, o := range listing.MinObjects {
// Skip empty results or the directory object backing this inode.
if o.Name == d.Name().GcsObjectName() || o.Name == "" {
continue
Expand All @@ -709,7 +709,7 @@ func (d *dirInode) readObjects(
explicitDir := &Core{
Bucket: d.Bucket(),
FullName: dirName,
MinObject: storageutil.ConvertObjToMinObject(o),
MinObject: o,
}
cores[dirName] = explicitDir
}
Expand All @@ -718,7 +718,7 @@ func (d *dirInode) readObjects(
file := &Core{
Bucket: d.Bucket(),
FullName: fileName,
MinObject: storageutil.ConvertObjToMinObject(o),
MinObject: o,
}
cores[fileName] = file
}
Expand Down
14 changes: 7 additions & 7 deletions internal/fs/inode/hns_dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,15 +508,15 @@ func (t *HNSDirTest) TestReadEntriesInHierarchicalBucket() {
implicitDir = "implicitDir" // In Hierarchical bucket implicitDir will also become folder.
)
tok := ""
obj1 := gcs.Object{Name: path.Join(dirInodeName, folder1) + "/"}
obj2 := gcs.Object{Name: path.Join(dirInodeName, folder2) + "/"}
obj3 := gcs.Object{Name: path.Join(dirInodeName, folder2, file1)}
obj4 := gcs.Object{Name: path.Join(dirInodeName, file2)}
obj5 := gcs.Object{Name: path.Join(dirInodeName, implicitDir, file3)}
objects := []*gcs.Object{&obj1, &obj2, &obj3, &obj4, &obj5}
obj1 := gcs.MinObject{Name: path.Join(dirInodeName, folder1) + "/"}
obj2 := gcs.MinObject{Name: path.Join(dirInodeName, folder2) + "/"}
obj3 := gcs.MinObject{Name: path.Join(dirInodeName, folder2, file1)}
obj4 := gcs.MinObject{Name: path.Join(dirInodeName, file2)}
obj5 := gcs.MinObject{Name: path.Join(dirInodeName, implicitDir, file3)}
minObjects := []*gcs.MinObject{&obj1, &obj2, &obj3, &obj4, &obj5}
collapsedRuns := []string{path.Join(dirInodeName, folder1) + "/", path.Join(dirInodeName, folder2) + "/", path.Join(dirInodeName, implicitDir) + "/"}
listing := gcs.Listing{
Objects: objects,
MinObjects: minObjects,
CollapsedRuns: collapsedRuns,
}
listObjectReq := gcs.ListObjectsRequest{
Expand Down
8 changes: 4 additions & 4 deletions internal/gcsx/garbage_collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ func garbageCollectOnce(
group, ctx := errgroup.WithContext(ctx)

// List all objects with the temporary prefix.
objects := make(chan *gcs.Object, 100)
minObjects := make(chan *gcs.MinObject, 100)
group.Go(func() (err error) {
defer close(objects)
err = storageutil.ListPrefix(ctx, bucket, tmpObjectPrefix, objects)
defer close(minObjects)
err = storageutil.ListPrefix(ctx, bucket, tmpObjectPrefix, minObjects)
if err != nil {
err = fmt.Errorf("ListPrefix: %w", err)
return
Expand All @@ -52,7 +52,7 @@ func garbageCollectOnce(
staleNames := make(chan string, 100)
group.Go(func() (err error) {
defer close(staleNames)
for o := range objects {
for o := range minObjects {
if now.Sub(o.Updated) < stalenessThreshold {
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/gcsx/prefix_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (b *prefixBucket) ListObjects(

// Modify the returned listing.
if l != nil {
for _, o := range l.Objects {
for _, o := range l.MinObjects {
o.Name = b.localName(o.Name)
}

Expand Down
22 changes: 11 additions & 11 deletions internal/gcsx/prefix_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ func (t *PrefixBucketTest) ListObjects_NoOptions() {
AssertEq("", l.ContinuationToken)
AssertThat(l.CollapsedRuns, ElementsAre())

AssertEq(3, len(l.Objects))
ExpectEq("burrito", l.Objects[0].Name)
ExpectEq("enchilada", l.Objects[1].Name)
ExpectEq("taco", l.Objects[2].Name)
AssertEq(3, len(l.MinObjects))
ExpectEq("burrito", l.MinObjects[0].Name)
ExpectEq("enchilada", l.MinObjects[1].Name)
ExpectEq("taco", l.MinObjects[2].Name)
}

func (t *PrefixBucketTest) ListObjects_Prefix() {
Expand Down Expand Up @@ -297,9 +297,9 @@ func (t *PrefixBucketTest) ListObjects_Prefix() {
AssertEq("", l.ContinuationToken)
AssertThat(l.CollapsedRuns, ElementsAre())

AssertEq(2, len(l.Objects))
ExpectEq("burrito0", l.Objects[0].Name)
ExpectEq("burrito1", l.Objects[1].Name)
AssertEq(2, len(l.MinObjects))
ExpectEq("burrito0", l.MinObjects[0].Name)
ExpectEq("burrito1", l.MinObjects[1].Name)
}

func (t *PrefixBucketTest) ListObjects_Delimeter() {
Expand Down Expand Up @@ -333,8 +333,8 @@ func (t *PrefixBucketTest) ListObjects_Delimeter() {

ExpectThat(l.CollapsedRuns, ElementsAre("burrito_", "enchilada_"))

AssertEq(1, len(l.Objects))
ExpectEq("burrito", l.Objects[0].Name)
AssertEq(1, len(l.MinObjects))
ExpectEq("burrito", l.MinObjects[0].Name)
}

func (t *PrefixBucketTest) ListObjects_PrefixAndDelimeter() {
Expand Down Expand Up @@ -369,8 +369,8 @@ func (t *PrefixBucketTest) ListObjects_PrefixAndDelimeter() {

ExpectThat(l.CollapsedRuns, ElementsAre("burrito_"))

AssertEq(1, len(l.Objects))
ExpectEq("burrito", l.Objects[0].Name)
AssertEq(1, len(l.MinObjects))
ExpectEq("burrito", l.MinObjects[0].Name)
}

func (t *PrefixBucketTest) UpdateObject() {
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/bucket_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ func (bh *bucketHandle) ListObjects(ctx context.Context, req *gcs.ListObjectsReq
list.CollapsedRuns = append(list.CollapsedRuns, attrs.Prefix)
} else {
// Converting attrs to *Object type.
currObject := storageutil.ObjectAttrsToBucketObject(attrs)
list.Objects = append(list.Objects, currObject)
currMinObject := storageutil.ObjectAttrsToMinObject(attrs)
list.MinObjects = append(list.MinObjects, currMinObject)
}

// itr.next returns all the objects present in the bucket. Hence adding a
Expand Down
34 changes: 22 additions & 12 deletions internal/storage/bucket_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ func objectsToObjectNames(objects []*gcs.Object) (objectNames []string) {
return
}

func minObjectsToMinObjectNames(minObjects []*gcs.MinObject) (objectNames []string) {
objectNames = make([]string, len(minObjects))
for i, object := range minObjects {
if object != nil {
objectNames[i] = object.Name
}
}
return
}

type BucketHandleTest struct {
suite.Suite
bucketHandle *bucketHandle
Expand Down Expand Up @@ -625,8 +635,8 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithPrefixObjectExist() {
})

assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(obj.Objects))
assert.Equal(testSuite.T(), TestObjectGeneration, obj.Objects[0].Generation)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(obj.MinObjects))
assert.Equal(testSuite.T(), TestObjectGeneration, obj.MinObjects[0].Generation)
assert.Equal(testSuite.T(), []string{TestObjectSubRootFolderName}, obj.CollapsedRuns)
}

Expand All @@ -642,7 +652,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithPrefixObjectDoesNotEx
})

assert.Nil(testSuite.T(), err)
assert.Nil(testSuite.T(), obj.Objects)
assert.Nil(testSuite.T(), obj.MinObjects)
assert.Nil(testSuite.T(), obj.CollapsedRuns)
}

Expand All @@ -658,7 +668,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithIncludeTrailingDelimi
})

assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(obj.Objects))
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(obj.MinObjects))
assert.Equal(testSuite.T(), []string{TestObjectSubRootFolderName}, obj.CollapsedRuns)
}

Expand All @@ -675,8 +685,8 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithEmptyDelimiter() {
})

assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(obj.Objects))
assert.Equal(testSuite.T(), TestObjectGeneration, obj.Objects[0].Generation)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(obj.MinObjects))
assert.Equal(testSuite.T(), TestObjectGeneration, obj.MinObjects[0].Generation)
assert.Nil(testSuite.T(), obj.CollapsedRuns)
}

Expand Down Expand Up @@ -704,7 +714,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodForMaxResult() {

// Validate that 5 objects are listed when MaxResults is passed 5.
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(fiveObj.Objects))
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(fiveObj.MinObjects))
assert.Nil(testSuite.T(), fiveObj.CollapsedRuns)

// Note: The behavior is different in real GCS storage JSON API. In real API,
Expand All @@ -714,7 +724,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodForMaxResult() {
// This is because fake storage doesn'testSuite support pagination and hence maxResults
// has no affect.
assert.Nil(testSuite.T(), err2)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(threeObj.Objects))
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(threeObj.MinObjects))
assert.Equal(testSuite.T(), 1, len(threeObj.CollapsedRuns))
}

Expand All @@ -730,7 +740,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithMissingMaxResult() {
ProjectionVal: 0,
})
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), 5, len(fiveObjWithMaxResults.Objects))
assert.Equal(testSuite.T(), 5, len(fiveObjWithMaxResults.MinObjects))

fiveObjWithoutMaxResults, err2 := testSuite.bucketHandle.ListObjects(context.Background(),
&gcs.ListObjectsRequest{
Expand All @@ -743,7 +753,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithMissingMaxResult() {

// Validate that all objects (5) are listed when MaxResults is not passed.
assert.Nil(testSuite.T(), err2)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(fiveObjWithoutMaxResults.Objects))
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(fiveObjWithoutMaxResults.MinObjects))
assert.Nil(testSuite.T(), fiveObjWithoutMaxResults.CollapsedRuns)
}

Expand All @@ -759,7 +769,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithZeroMaxResult() {
ProjectionVal: 0,
})
assert.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), 5, len(fiveObj.Objects))
assert.Equal(testSuite.T(), 5, len(fiveObj.MinObjects))

fiveObjWithZeroMaxResults, err2 := testSuite.bucketHandle.ListObjects(context.Background(),
&gcs.ListObjectsRequest{
Expand All @@ -774,7 +784,7 @@ func (testSuite *BucketHandleTest) TestListObjectMethodWithZeroMaxResult() {
// Validate that all objects (5) are listed when MaxResults is 0. This has
// same behavior as not passing MaxResults in request.
assert.Nil(testSuite.T(), err2)
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, objectsToObjectNames(fiveObjWithZeroMaxResults.Objects))
assert.Equal(testSuite.T(), []string{TestObjectRootFolderName, TestObjectSubRootFolderName, TestSubObjectName, TestObjectName, TestGzipObjectName}, minObjectsToMinObjectNames(fiveObjWithZeroMaxResults.MinObjects))
assert.Nil(testSuite.T(), fiveObjWithZeroMaxResults.CollapsedRuns)
}

Expand Down
18 changes: 14 additions & 4 deletions internal/storage/caching/fast_stat_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ func (b *fastStatBucket) insertMultiple(objs []*gcs.Object) {
}
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertMultipleMinObjects(minObjs []*gcs.MinObject) {
b.mu.Lock()
defer b.mu.Unlock()

expiration := b.clock.Now().Add(b.ttl)
for _, o := range minObjs {
b.cache.Insert(o, expiration)
}
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) eraseEntriesWithGivenPrefix(folderName string) {
b.mu.Lock()
Expand All @@ -102,10 +113,9 @@ func (b *fastStatBucket) insertHierarchicalListing(listing *gcs.Listing) {

expiration := b.clock.Now().Add(b.ttl)

for _, o := range listing.Objects {
for _, o := range listing.MinObjects {
if !strings.HasSuffix(o.Name, "/") {
m := storageutil.ConvertObjToMinObject(o)
b.cache.Insert(m, expiration)
b.cache.Insert(o, expiration)
}
}

Expand Down Expand Up @@ -326,7 +336,7 @@ func (b *fastStatBucket) ListObjects(
}

// note anything we found.
b.insertMultiple(listing.Objects)
b.insertMultipleMinObjects(listing.MinObjects)
return
}

Expand Down
12 changes: 6 additions & 6 deletions internal/storage/caching/fast_stat_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -727,11 +727,11 @@ func (t *ListObjectsTest) EmptyListingForHNS() {

func (t *ListObjectsTest) NonEmptyListing() {
// Wrapped
o0 := &gcs.Object{Name: "taco"}
o1 := &gcs.Object{Name: "burrito"}
o0 := &gcs.MinObject{Name: "taco"}
o1 := &gcs.MinObject{Name: "burrito"}

expected := &gcs.Listing{
Objects: []*gcs.Object{o0, o1},
MinObjects: []*gcs.MinObject{o0, o1},
}

ExpectCall(t.wrapped, "BucketType")().
Expand All @@ -752,11 +752,11 @@ func (t *ListObjectsTest) NonEmptyListing() {

func (t *ListObjectsTest) NonEmptyListingForHNS() {
// wrapped
o0 := &gcs.Object{Name: "taco"}
o1 := &gcs.Object{Name: "burrito"}
o0 := &gcs.MinObject{Name: "taco"}
o1 := &gcs.MinObject{Name: "burrito"}

expected := &gcs.Listing{
Objects: []*gcs.Object{o0, o1},
MinObjects: []*gcs.MinObject{o0, o1},
CollapsedRuns: []string{"p0", "p1/"},
}

Expand Down
15 changes: 14 additions & 1 deletion internal/storage/fake/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,19 @@ func copyObject(o *gcs.Object) *gcs.Object {
return &copy
}

func copyMinObject(o *gcs.Object) *gcs.MinObject {
var copy gcs.MinObject
copy.Name = o.Name
copy.Size = o.Size
copy.Generation = o.Generation
copy.MetaGeneration = o.MetaGeneration
copy.Updated = o.Updated
copy.Metadata = copyMetadata(o.Metadata)
copy.ContentEncoding = o.ContentEncoding
copy.CRC32C = o.CRC32C
return &copy
}

////////////////////////////////////////////////////////////////////////
// Public interface
////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -614,7 +627,7 @@ func (b *bucket) ListObjects(

// Otherwise, return as an object result. Make a copy to avoid handing back
// internal state.
listing.Objects = append(listing.Objects, copyObject(&o.metadata))
listing.MinObjects = append(listing.MinObjects, copyMinObject(&o.metadata))
}

// Set up a cursor for where to start the next scan if we didn't exhaust the
Expand Down
Loading

0 comments on commit d46cd46

Please sign in to comment.