Skip to content

Commit

Permalink
Update Flush call to return an object (#2770)
Browse files Browse the repository at this point in the history
* update Flush call to return an object

* review comments
  • Loading branch information
ashmeenkaur authored Dec 11, 2024
1 parent f75b454 commit 1b8139e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 39 deletions.
17 changes: 13 additions & 4 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,32 @@ func (wh *BufferedWriteHandler) Sync() (err error) {
}

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (err error) {
func (wh *BufferedWriteHandler) Flush() (*gcs.Object, error) {
// Fail early if the uploadHandler has failed.
select {
case <-wh.uploadHandler.SignalUploadFailure():
return ErrUploadFailure
return nil, ErrUploadFailure
default:
break
}

if wh.current != nil {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
return err
return nil, err
}
wh.current = nil
}
return wh.uploadHandler.Finalize()
obj, err := wh.uploadHandler.Finalize()
if err != nil {
return nil, fmt.Errorf("BufferedWriteHandler.Flush(): %w", err)
}
err = wh.blockPool.ClearFreeBlockChannel()
if err != nil {
// Only logging an error in case of resource leak as upload succeeded.
logger.Errorf("blockPool.ClearFreeBlockChannel() failed: %v", err)
}
return obj, nil
}

// SetMtime stores the mtime with the bufferedWriteHandler.
Expand Down
21 changes: 12 additions & 9 deletions internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,26 +152,28 @@ func (testSuite *BufferedWriteTest) TestWrite_SignalUploadFailureInBetween() {

func (testSuite *BufferedWriteTest) TestFlushWithNonNilCurrentBlock() {
err := testSuite.bwh.Write([]byte("hi"), 0)
currentBlock := testSuite.bwh.current
require.Nil(testSuite.T(), err)

err = testSuite.bwh.Flush()
obj, err := testSuite.bwh.Flush()

require.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), nil, testSuite.bwh.current)
// The current block should be available on the free channel as flush triggers
// an upload before finalize.
freeCh := testSuite.bwh.blockPool.FreeBlocksChannel()
got := <-freeCh
assert.Equal(testSuite.T(), &currentBlock, &got)
// Validate object.
assert.NotNil(testSuite.T(), obj)
assert.Equal(testSuite.T(), uint64(2), obj.Size)
// Validate that all blocks have been freed up.
assert.Equal(testSuite.T(), 0, len(testSuite.bwh.blockPool.FreeBlocksChannel()))
}

func (testSuite *BufferedWriteTest) TestFlushWithNilCurrentBlock() {
require.Nil(testSuite.T(), testSuite.bwh.current)

err := testSuite.bwh.Flush()
obj, err := testSuite.bwh.Flush()

assert.NoError(testSuite.T(), err)
// Validate empty object created.
assert.NotNil(testSuite.T(), obj)
assert.Equal(testSuite.T(), uint64(0), obj.Size)
}

func (testSuite *BufferedWriteTest) TestFlush_SignalUploadFailureDuringWrite() {
Expand All @@ -181,7 +183,8 @@ func (testSuite *BufferedWriteTest) TestFlush_SignalUploadFailureDuringWrite() {
// Close the channel to simulate failure in uploader.
close(testSuite.bwh.uploadHandler.SignalUploadFailure())

err = testSuite.bwh.Flush()
obj, err := testSuite.bwh.Flush()
require.Error(testSuite.T(), err)
assert.Equal(testSuite.T(), err, ErrUploadFailure)
assert.Nil(testSuite.T(), obj)
}
13 changes: 6 additions & 7 deletions internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type UploadHandler struct {
freeBlocksCh chan block.Block

// writer to resumable upload the blocks to GCS.
writer io.WriteCloser
writer gcs.Writer

// signalUploadFailure channel will propagate the upload error to file
// inode. This signals permanent failure in the buffered write job.
Expand Down Expand Up @@ -116,7 +116,7 @@ func (uh *UploadHandler) uploader() {
}

// Finalize finalizes the upload.
func (uh *UploadHandler) Finalize() error {
func (uh *UploadHandler) Finalize() (*gcs.Object, error) {
uh.wg.Wait()
close(uh.uploadCh)

Expand All @@ -125,16 +125,15 @@ func (uh *UploadHandler) Finalize() error {
// small writes of size less than 1 block.
err := uh.createObjectWriter()
if err != nil {
return fmt.Errorf("createObjectWriter failed for object %s: %w", uh.objectName, err)
return nil, fmt.Errorf("createObjectWriter failed for object %s: %w", uh.objectName, err)
}
}

err := uh.writer.Close()
obj, err := uh.bucket.FinalizeUpload(context.Background(), uh.writer)
if err != nil {
logger.Errorf("UploadHandler.Finalize(%s): %v", uh.objectName, err)
return fmt.Errorf("writer.Close failed for object %s: %w", uh.objectName, err)
return nil, fmt.Errorf("FinalizeUpload failed for object %s: %w", uh.objectName, err)
}
return nil
return obj, nil
}

func (uh *UploadHandler) SignalUploadFailure() chan error {
Expand Down
50 changes: 32 additions & 18 deletions internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/internal/block"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
storagemock "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -64,8 +65,9 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() {
}
// CreateObjectChunkWriter -- should be called once.
writer := &storagemock.Writer{}
mockObj := &gcs.Object{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
writer.On("Close").Return(nil)
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)

// Upload the blocks.
for _, b := range blocks {
Expand All @@ -74,8 +76,10 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() {
}

// Finalize.
err := t.uh.Finalize()
obj, err := t.uh.Finalize()
require.NoError(t.T(), err)
require.NotNil(t.T(), obj)
assert.Equal(t.T(), mockObj, obj)
// The blocks should be available on the free channel for reuse.
for _, expect := range blocks {
got := <-t.uh.freeBlocksCh
Expand All @@ -94,7 +98,7 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() {
}
}

func (t *UploadHandlerTest) TestUpload_CreateObjectWriterFails() {
func (t *UploadHandlerTest) TestUploadWhenCreateObjectWriterFails() {
// Create a block.
b, err := t.blockPool.Get()
require.NoError(t.T(), err)
Expand All @@ -111,49 +115,59 @@ func (t *UploadHandlerTest) TestUpload_CreateObjectWriterFails() {

func (t *UploadHandlerTest) TestFinalizeWithWriterAlreadyPresent() {
writer := &storagemock.Writer{}
writer.On("Close").Return(nil)
mockObj := &gcs.Object{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)
t.uh.writer = writer

err := t.uh.Finalize()
obj, err := t.uh.Finalize()

assert.NoError(t.T(), err)
require.NoError(t.T(), err)
require.NotNil(t.T(), obj)
assert.Equal(t.T(), mockObj, obj)
}

func (t *UploadHandlerTest) TestFinalizeWithNoWriter() {
writer := &storagemock.Writer{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
assert.Nil(t.T(), t.uh.writer)
writer.On("Close").Return(nil)
mockObj := &gcs.Object{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)

err := t.uh.Finalize()
obj, err := t.uh.Finalize()

assert.NoError(t.T(), err)
require.NoError(t.T(), err)
require.NotNil(t.T(), obj)
assert.Equal(t.T(), mockObj, obj)
}

func (t *UploadHandlerTest) TestFinalizeWithNoWriter_CreateObjectWriterFails() {
func (t *UploadHandlerTest) TestFinalizeWithNoWriterWhenCreateObjectWriterFails() {
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("taco"))
assert.Nil(t.T(), t.uh.writer)

err := t.uh.Finalize()
obj, err := t.uh.Finalize()

require.Error(t.T(), err)
assert.ErrorContains(t.T(), err, "taco")
assert.ErrorContains(t.T(), err, "createObjectWriter")
assert.Nil(t.T(), obj)
}

func (t *UploadHandlerTest) TestFinalize_WriterCloseFails() {
func (t *UploadHandlerTest) TestFinalizeWhenFinalizeUploadFails() {
writer := &storagemock.Writer{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
assert.Nil(t.T(), t.uh.writer)
writer.On("Close").Return(fmt.Errorf("taco"))
mockObj := &gcs.Object{}
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, fmt.Errorf("taco"))

err := t.uh.Finalize()
obj, err := t.uh.Finalize()

require.Error(t.T(), err)
assert.ErrorContains(t.T(), err, "writer.Close")
assert.Nil(t.T(), obj)
assert.ErrorContains(t.T(), err, "taco")
assert.ErrorContains(t.T(), err, "FinalizeUpload failed for object")
}

func (t *UploadHandlerTest) TestUploadHandler_singleBlock_ErrorInCopy() {
func (t *UploadHandlerTest) TestUploadSingleBlockThrowsErrorInCopy() {
// Create a block with test data.
b, err := t.blockPool.Get()
require.NoError(t.T(), err)
Expand All @@ -173,7 +187,7 @@ func (t *UploadHandlerTest) TestUploadHandler_singleBlock_ErrorInCopy() {
assertUploadFailureSignal(t.T(), t.uh)
}

func (t *UploadHandlerTest) TestUploadHandler_multipleBlocks_ErrorInCopy() {
func (t *UploadHandlerTest) TestUploadMultipleBlocksThrowsErrorInCopy() {
// Create some blocks.
var blocks []block.Block
for i := 0; i < 4; i++ {
Expand Down Expand Up @@ -211,7 +225,7 @@ func assertUploadFailureSignal(t *testing.T, handler *UploadHandler) {
}
}

func TestBufferedWriteHandler_SignalUploadFailure(t *testing.T) {
func TestSignalUploadFailure(t *testing.T) {
mockSignalUploadFailure := make(chan error)
uploadHandler := &UploadHandler{
signalUploadFailure: mockSignalUploadFailure,
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/mock/testify_mock_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *TestifyMockBucket) CreateObjectChunkWriter(ctx context.Context, req *gc
}

func (m *TestifyMockBucket) FinalizeUpload(ctx context.Context, w gcs.Writer) (*gcs.Object, error) {
args := m.Called(ctx, w.ObjectName())
args := m.Called(ctx, w)
return args.Get(0).(*gcs.Object), args.Error(1)
}

Expand Down

0 comments on commit 1b8139e

Please sign in to comment.