Skip to content

Commit

Permalink
use min object in buffered writes (#2795)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashmeenkaur authored Dec 16, 2024
1 parent de832bf commit 78b69e8
Show file tree
Hide file tree
Showing 16 changed files with 33 additions and 28 deletions.
2 changes: 1 addition & 1 deletion internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (wh *BufferedWriteHandler) Sync() (err error) {
}

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (*gcs.Object, error) {
func (wh *BufferedWriteHandler) Flush() (*gcs.MinObject, error) {
if wh.current != nil {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (uh *UploadHandler) uploader() {
}

// Finalize finalizes the upload.
func (uh *UploadHandler) Finalize() (*gcs.Object, error) {
func (uh *UploadHandler) Finalize() (*gcs.MinObject, error) {
uh.wg.Wait()
close(uh.uploadCh)

Expand Down
8 changes: 4 additions & 4 deletions internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() {
}
// CreateObjectChunkWriter -- should be called once.
writer := &storagemock.Writer{}
mockObj := &gcs.Object{}
mockObj := &gcs.MinObject{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)

Expand Down Expand Up @@ -105,7 +105,7 @@ func (t *UploadHandlerTest) TestUploadWhenCreateObjectWriterFails() {

func (t *UploadHandlerTest) TestFinalizeWithWriterAlreadyPresent() {
writer := &storagemock.Writer{}
mockObj := &gcs.Object{}
mockObj := &gcs.MinObject{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)
t.uh.writer = writer

Expand All @@ -120,7 +120,7 @@ func (t *UploadHandlerTest) TestFinalizeWithNoWriter() {
writer := &storagemock.Writer{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
assert.Nil(t.T(), t.uh.writer)
mockObj := &gcs.Object{}
mockObj := &gcs.MinObject{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)

obj, err := t.uh.Finalize()
Expand All @@ -146,7 +146,7 @@ func (t *UploadHandlerTest) TestFinalizeWhenFinalizeUploadFails() {
writer := &storagemock.Writer{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
assert.Nil(t.T(), t.uh.writer)
mockObj := &gcs.Object{}
mockObj := &gcs.MinObject{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, fmt.Errorf("taco"))

obj, err := t.uh.Finalize()
Expand Down
2 changes: 1 addition & 1 deletion internal/gcsx/prefix_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (b *prefixBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.Cre
return wc, err
}

func (b *prefixBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.Object, err error) {
func (b *prefixBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) {
o, err = b.wrapped.FinalizeUpload(ctx, w)
// Modify the returned object.
if o != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/gcsx/prefix_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (t *PrefixBucketTest) CreateObjectChunkWriterAndFinalizeUpload() {
t.ctx,
&gcs.CreateObjectRequest{
Name: suffix,
ContentLanguage: "en-GB",
ContentEncoding: "gzip",
Contents: nil,
},
1024, nil)
Expand All @@ -139,7 +139,7 @@ func (t *PrefixBucketTest) CreateObjectChunkWriterAndFinalizeUpload() {

AssertEq(nil, err)
ExpectEq(suffix, o.Name)
ExpectEq("en-GB", o.ContentLanguage)
ExpectEq("gzip", o.ContentEncoding)
// Read it through the back door.
actual, err := storageutil.ReadObject(t.ctx, t.wrapped, t.prefix+suffix)
AssertEq(nil, err)
Expand Down
2 changes: 1 addition & 1 deletion internal/monitor/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (mb *monitoringBucket) CreateObjectChunkWriter(ctx context.Context, req *gc
return wc, err
}

func (mb *monitoringBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.Object, error) {
func (mb *monitoringBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
startTime := time.Now()
o, err := mb.wrapped.FinalizeUpload(ctx, w)
recordRequest(ctx, mb.metricHandle, "FinalizeUpload", startTime)
Expand Down
2 changes: 1 addition & 1 deletion internal/ratelimit/throttled_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (b *throttledBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.
return
}

func (b *throttledBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.Object, error) {
func (b *throttledBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
// FinalizeUpload is not throttled to prevent permanent data loss in case the
// limiter's burst size is exceeded.
// Note: CreateObjectChunkWriter, a prerequisite for FinalizeUpload,
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/bucket_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (bh *bucketHandle) CreateObjectChunkWriter(ctx context.Context, req *gcs.Cr
return wc, nil
}

func (bh *bucketHandle) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.Object, err error) {
func (bh *bucketHandle) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) {
if err = w.Close(); err != nil {
var gErr *googleapi.Error
if errors.As(err, &gErr) {
Expand All @@ -282,8 +282,8 @@ func (bh *bucketHandle) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gc
}

attrs := w.Attrs() // Retrieving the attributes of the created object.
// Converting attrs to type *Object.
o = storageutil.ObjectAttrsToBucketObject(attrs)
// Converting attrs to type *MinObject.
o = storageutil.ObjectAttrsToMinObject(attrs)
return
}

Expand Down
8 changes: 6 additions & 2 deletions internal/storage/caching/fast_stat_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (b *fastStatBucket) insert(o *gcs.Object) {
b.insertMultiple([]*gcs.Object{o})
}

func (b *fastStatBucket) insertMinObject(o *gcs.MinObject) {
b.insertMultipleMinObjects([]*gcs.MinObject{o})
}

// LOCKS_EXCLUDED(b.mu)
func (b *fastStatBucket) insertFolder(f *gcs.Folder) {
b.mu.Lock()
Expand Down Expand Up @@ -231,7 +235,7 @@ func (b *fastStatBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.C
return b.wrapped.CreateObjectChunkWriter(ctx, req, chunkSize, callBack)
}

func (b *fastStatBucket) FinalizeUpload(ctx context.Context, writer gcs.Writer) (*gcs.Object, error) {
func (b *fastStatBucket) FinalizeUpload(ctx context.Context, writer gcs.Writer) (*gcs.MinObject, error) {
name := writer.ObjectName()
// Throw away any existing record for this object.
b.invalidate(name)
Expand All @@ -240,7 +244,7 @@ func (b *fastStatBucket) FinalizeUpload(ctx context.Context, writer gcs.Writer)

// Record the new object if err is nil.
if err == nil {
b.insert(o)
b.insertMinObject(o)
}

return o, err
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/caching/fast_stat_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (t *FinalizeUploadTest) CallsEraseAndWrappedWithExpectedParameter() {
// Wrapped
var wrappedWriter gcs.Writer
ExpectCall(t.wrapped, "FinalizeUpload")(Any(), Any()).
WillOnce(DoAll(SaveArg(1, &wrappedWriter), Return(&gcs.Object{}, errors.New(""))))
WillOnce(DoAll(SaveArg(1, &wrappedWriter), Return(&gcs.MinObject{}, errors.New(""))))

// Call
_, _ = t.bucket.FinalizeUpload(context.TODO(), writer)
Expand All @@ -246,7 +246,7 @@ func (t *FinalizeUploadTest) WrappedFails() {
ExpectCall(t.cache, "Erase")(Any())
// Wrapped
ExpectCall(t.wrapped, "FinalizeUpload")(Any(), Any()).
WillOnce(Return(&gcs.Object{}, errors.New("taco")))
WillOnce(Return(&gcs.MinObject{}, errors.New("taco")))

// Call
o, err := t.bucket.FinalizeUpload(context.TODO(), writer)
Expand All @@ -265,7 +265,7 @@ func (t *FinalizeUploadTest) WrappedSucceeds() {
ExpectCall(t.cache, "Erase")(Any())
// Wrapped
ExpectCall(t.wrapped, "FinalizeUpload")(Any(), Any()).
WillOnce(Return(&gcs.Object{}, nil))
WillOnce(Return(&gcs.MinObject{}, nil))
// Insert
ExpectCall(t.cache, "Insert")(Any(), timeutil.TimeEq(t.clock.Now().Add(ttl)))

Expand Down
2 changes: 1 addition & 1 deletion internal/storage/debug_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (b *debugBucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.Crea
return
}

func (b *debugBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.Object, err error) {
func (b *debugBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (o *gcs.MinObject, err error) {
id, desc, start := b.startRequest("FinalizeUpload(%q)", w.ObjectName())
defer b.finishRequest(id, desc, start, &err)

Expand Down
2 changes: 1 addition & 1 deletion internal/storage/fake/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ func (b *bucket) CreateObjectChunkWriter(ctx context.Context, req *gcs.CreateObj
return NewFakeObjectWriter(b, req)
}

func (b *bucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.Object, error) {
func (b *bucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
b.mu.Lock()
defer b.mu.Unlock()

Expand Down
5 changes: 3 additions & 2 deletions internal/storage/fake/fake_object_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"cloud.google.com/go/storage"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
)

// FakeObjectWriter is a mock implementation of storage.Writer used by FakeBucket.
Expand All @@ -34,7 +35,7 @@ type FakeObjectWriter struct {
storage.ObjectAttrs
bkt *bucket
req *gcs.CreateObjectRequest
Object *gcs.Object // Object created by writer
Object *gcs.MinObject // Object created by writer
}

func (w *FakeObjectWriter) Write(p []byte) (n int, err error) {
Expand All @@ -51,7 +52,7 @@ func (w *FakeObjectWriter) Close() error {

o, err := createOrUpdateFakeObject(w.bkt, w.req, contents)
if err == nil {
w.Object = o
w.Object = storageutil.ConvertObjToMinObject(o)
}

return err
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/gcs/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ type Bucket interface {

// FinalizeUpload closes the storage.Writer which completes the write
// operation and creates an object on GCS.
FinalizeUpload(ctx context.Context, writer Writer) (*Object, error)
FinalizeUpload(ctx context.Context, writer Writer) (*MinObject, error)

// Copy an object to a new name, preserving all metadata. Any existing
// generation of the destination name will be overwritten.
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/mock/testify_mock_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ func (m *TestifyMockBucket) CreateObjectChunkWriter(ctx context.Context, req *gc
return args.Get(0).(gcs.Writer), nil
}

func (m *TestifyMockBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.Object, error) {
func (m *TestifyMockBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.MinObject, error) {
args := m.Called(ctx, w)
return args.Get(0).(*gcs.Object), args.Error(1)
return args.Get(0).(*gcs.MinObject), args.Error(1)
}

func (m *TestifyMockBucket) CopyObject(ctx context.Context, req *gcs.CopyObjectRequest) (*gcs.Object, error) {
Expand Down
4 changes: 2 additions & 2 deletions internal/storage/mock_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (m *mockBucket) CreateObjectChunkWriter(p0 context.Context, p1 *gcs.CreateO
return
}

func (m *mockBucket) FinalizeUpload(p0 context.Context, p1 gcs.Writer) (o0 *gcs.Object, o1 error) {
func (m *mockBucket) FinalizeUpload(p0 context.Context, p1 gcs.Writer) (o0 *gcs.MinObject, o1 error) {
// Get a file name and line number for the caller.
_, file, line, _ := runtime.Caller(1)

Expand All @@ -179,7 +179,7 @@ func (m *mockBucket) FinalizeUpload(p0 context.Context, p1 gcs.Writer) (o0 *gcs.

// o0 *gcs.Object
if retVals[0] != nil {
o0 = retVals[0].(*gcs.Object)
o0 = retVals[0].(*gcs.MinObject)
}
// o1 error
if retVals[1] != nil {
Expand Down

0 comments on commit 78b69e8

Please sign in to comment.