Skip to content

Commit

Permalink
update buffered write sync and flush methods (#2794)
Browse files Browse the repository at this point in the history
* update bw sync and flush methods

* review comments
  • Loading branch information
ashmeenkaur authored Dec 13, 2024
1 parent c9bf976 commit ed4181d
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 33 deletions.
25 changes: 17 additions & 8 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,36 +119,45 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {

// Sync waits for all pending block uploads to GCS to finish and returns
// ErrUploadFailure if the upload handler has signalled a failure.
func (wh *BufferedWriteHandler) Sync() (err error) {
// TODO: Will be added after uploadHandler changes are done.
return fmt.Errorf("not implemented")
}
wh.uploadHandler.AwaitBlocksUpload()

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (*gcs.Object, error) {
// Fail early if the uploadHandler has failed.
select {
case <-wh.uploadHandler.SignalUploadFailure():
return nil, ErrUploadFailure
return ErrUploadFailure
default:
break
return nil
}
}

// Flush uploads the in-progress block (if there is one), finalizes the object
// through the upload handler and then clears the block pool's free block
// channel.
//
// It returns (nil, err) when uploading the current block or finalizing fails.
// A failure to clear the free block channel is only logged. When the upload
// handler has signalled a failure, the finalized object is returned together
// with ErrUploadFailure; otherwise the object is returned with a nil error.
func (wh *BufferedWriteHandler) Flush() (*gcs.Object, error) {
	// Hand the partially filled block over before finalizing.
	if pending := wh.current; pending != nil {
		if uploadErr := wh.uploadHandler.Upload(pending); uploadErr != nil {
			return nil, uploadErr
		}
		wh.current = nil
	}

	finalized, finalizeErr := wh.uploadHandler.Finalize()
	if finalizeErr != nil {
		return nil, fmt.Errorf("BufferedWriteHandler.Flush(): %w", finalizeErr)
	}

	// The upload itself succeeded at this point, so a cleanup failure is
	// reported through the log rather than to the caller.
	if clearErr := wh.blockPool.ClearFreeBlockChannel(); clearErr != nil {
		logger.Errorf("blockPool.ClearFreeBlockChannel() failed: %v", clearErr)
	}

	// Non-blocking check: surface a failure signalled by the upload handler
	// while still handing back whatever was finalized.
	select {
	case <-wh.uploadHandler.SignalUploadFailure():
		return finalized, ErrUploadFailure
	default:
		return finalized, nil
	}
}

Expand Down
67 changes: 63 additions & 4 deletions internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations"
"github.com/jacobsa/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -39,7 +40,7 @@ func TestBufferedWriteTestSuite(t *testing.T) {

func (testSuite *BufferedWriteTest) SetupTest() {
bucket := fake.NewFakeBucket(timeutil.RealClock(), "FakeBucketName", gcs.NonHierarchical)
bwh, err := NewBWHandler("testObject", bucket, 1024, 10, semaphore.NewWeighted(10))
bwh, err := NewBWHandler("testObject", bucket, blockSize, 10, semaphore.NewWeighted(10))
require.Nil(testSuite.T(), err)
testSuite.bwh = bwh
}
Expand Down Expand Up @@ -135,7 +136,7 @@ func (testSuite *BufferedWriteTest) TestMultipleWrites() {
assert.Equal(testSuite.T(), int64(13), fileInfo.TotalSize)
}

func (testSuite *BufferedWriteTest) TestWrite_SignalUploadFailureInBetween() {
func (testSuite *BufferedWriteTest) TestWriteWithSignalUploadFailureInBetween() {
err := testSuite.bwh.Write([]byte("hello"), 0)
require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
Expand Down Expand Up @@ -176,7 +177,7 @@ func (testSuite *BufferedWriteTest) TestFlushWithNilCurrentBlock() {
assert.Equal(testSuite.T(), uint64(0), obj.Size)
}

func (testSuite *BufferedWriteTest) TestFlush_SignalUploadFailureDuringWrite() {
func (testSuite *BufferedWriteTest) TestFlushWithSignalUploadFailureDuringWrite() {
err := testSuite.bwh.Write([]byte("hi"), 0)
require.Nil(testSuite.T(), err)

Expand All @@ -186,5 +187,63 @@ func (testSuite *BufferedWriteTest) TestFlush_SignalUploadFailureDuringWrite() {
obj, err := testSuite.bwh.Flush()
require.Error(testSuite.T(), err)
assert.Equal(testSuite.T(), err, ErrUploadFailure)
assert.Nil(testSuite.T(), obj)
// Whatever could be finalized, got finalized (empty object in this case).
assert.NotNil(testSuite.T(), obj)
assert.Equal(testSuite.T(), uint64(0), obj.Size)
}

// TestFlushWithMultiBlockWritesAndSignalUploadFailureInBetween checks that,
// after the upload-failure channel is closed midway, further writes are
// rejected and Flush returns ErrUploadFailure together with an object that
// contains the 5 blocks synced before the failure.
func (testSuite *BufferedWriteTest) TestFlushWithMultiBlockWritesAndSignalUploadFailureInBetween() {
	buffer, err := operations.GenerateRandomData(blockSize)
	// The buffer is used by every write below, so stop here if it is unusable.
	require.NoError(testSuite.T(), err)
	// Upload and sync 5 blocks.
	testSuite.TestSync5InProgressBlocks()
	// Close the channel to simulate failure in uploader.
	close(testSuite.bwh.uploadHandler.SignalUploadFailure())
	// Write 5 more blocks.
	for i := 0; i < 5; i++ {
		err := testSuite.bwh.Write(buffer, int64(blockSize*(i+5)))
		require.Error(testSuite.T(), err)
	}

	obj, err := testSuite.bwh.Flush()

	require.Error(testSuite.T(), err)
	// Expected value goes first so a failure message labels the values correctly.
	assert.Equal(testSuite.T(), ErrUploadFailure, err)
	// Whatever could be finalized, got finalized.
	// require (not assert) so a nil object fails the test instead of causing a
	// nil-pointer panic on obj.Size below.
	require.NotNil(testSuite.T(), obj)
	assert.Equal(testSuite.T(), uint64(5*blockSize), obj.Size)
}

// TestSync5InProgressBlocks writes 5 full blocks and checks that Sync returns
// without error, leaving the upload channel empty and 5 blocks on the block
// pool's free blocks channel.
func (testSuite *BufferedWriteTest) TestSync5InProgressBlocks() {
	const numBlocks = 5
	t := testSuite.T()
	data, genErr := operations.GenerateRandomData(blockSize)
	assert.NoError(t, genErr)
	// Queue the blocks back to back, one block-sized write per iteration.
	for blockIdx := 0; blockIdx < numBlocks; blockIdx++ {
		writeErr := testSuite.bwh.Write(data, int64(blockSize*blockIdx))
		require.Nil(t, writeErr)
	}

	// Sync should return only after the queued blocks have been uploaded.
	syncErr := testSuite.bwh.Sync()

	assert.NoError(t, syncErr)
	pendingUploads := len(testSuite.bwh.uploadHandler.uploadCh)
	assert.Equal(t, 0, pendingUploads)
	freeBlocks := len(testSuite.bwh.blockPool.FreeBlocksChannel())
	assert.Equal(t, numBlocks, freeBlocks)
}

// TestSyncBlocksWithError writes 5 blocks, closes the upload-failure channel
// to simulate a failing uploader, and checks that Sync reports
// ErrUploadFailure.
func (testSuite *BufferedWriteTest) TestSyncBlocksWithError() {
	const numBlocks = 5
	t := testSuite.T()
	data, genErr := operations.GenerateRandomData(blockSize)
	assert.NoError(t, genErr)
	// Queue the blocks back to back, one block-sized write per iteration.
	for blockIdx := 0; blockIdx < numBlocks; blockIdx++ {
		writeErr := testSuite.bwh.Write(data, int64(blockSize*blockIdx))
		require.Nil(t, writeErr)
	}
	// A closed failure channel is how the uploader reports a failed upload.
	failureCh := testSuite.bwh.uploadHandler.SignalUploadFailure()
	close(failureCh)

	syncErr := testSuite.bwh.Sync()

	assert.Error(t, syncErr)
	assert.Equal(t, ErrUploadFailure, syncErr)
}
20 changes: 13 additions & 7 deletions internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,17 @@ func (uh *UploadHandler) createObjectWriter() (err error) {
// uploader is the single-threaded goroutine that uploads blocks.
func (uh *UploadHandler) uploader() {
for currBlock := range uh.uploadCh {
_, err := io.Copy(uh.writer, currBlock.Reader())
if err != nil {
logger.Errorf("buffered write upload failed for object %s: error in io.Copy: %v", uh.objectName, err)
// Close the channel to signal upload failure.
close(uh.signalUploadFailure)
return
select {
case <-uh.signalUploadFailure:
default:
_, err := io.Copy(uh.writer, currBlock.Reader())
if err != nil {
logger.Errorf("buffered write upload failed for object %s: error in io.Copy: %v", uh.objectName, err)
// Close the channel to signal upload failure.
close(uh.signalUploadFailure)
}
}
uh.wg.Done()

// Put back the uploaded block on the freeBlocksChannel for re-use.
uh.freeBlocksCh <- currBlock
}
Expand Down Expand Up @@ -139,3 +141,7 @@ func (uh *UploadHandler) Finalize() (*gcs.Object, error) {
// SignalUploadFailure returns the handler's signalUploadFailure channel.
// The uploader goroutine closes this channel when copying a block to the
// object writer fails, so a receive on it that does not block indicates that
// an upload has failed.
func (uh *UploadHandler) SignalUploadFailure() chan error {
return uh.signalUploadFailure
}

// AwaitBlocksUpload blocks until the handler's WaitGroup counter reaches zero.
// The uploader goroutine calls uh.wg.Done() for the blocks it takes off the
// upload channel.
// NOTE(review): presumably Upload increments the WaitGroup for every block it
// queues, making this a wait for all queued blocks — confirm against Upload.
func (uh *UploadHandler) AwaitBlocksUpload() {
uh.wg.Wait()
}
63 changes: 49 additions & 14 deletions internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
)

const (
blockSize = 1024
blockSize = 1024
maxBlocks int64 = 5
)

type UploadHandlerTest struct {
Expand All @@ -47,10 +48,9 @@ func TestUploadHandlerTestSuite(t *testing.T) {
}

func (t *UploadHandlerTest) SetupTest() {
var maxBlocks int64 = 5
t.mockBucket = new(storagemock.TestifyMockBucket)
var err error
t.blockPool, err = block.NewBlockPool(blockSize, maxBlocks, semaphore.NewWeighted(5))
t.blockPool, err = block.NewBlockPool(blockSize, maxBlocks, semaphore.NewWeighted(maxBlocks))
require.NoError(t.T(), err)
t.uh = newUploadHandler("testObject", t.mockBucket, maxBlocks, t.blockPool.FreeBlocksChannel(), blockSize)
}
Expand Down Expand Up @@ -85,17 +85,7 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() {
got := <-t.uh.freeBlocksCh
assert.Equal(t.T(), expect, got)
}
// All goroutines for upload should have exited.
done := make(chan struct{})
go func() {
t.uh.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(100 * time.Millisecond):
t.T().Error("Timeout waiting for WaitGroup")
}
assertAllBlocksProcessed(t.T(), t.uh)
}

func (t *UploadHandlerTest) TestUploadWhenCreateObjectWriterFails() {
Expand Down Expand Up @@ -185,6 +175,8 @@ func (t *UploadHandlerTest) TestUploadSingleBlockThrowsErrorInCopy() {
require.NoError(t.T(), err)
// Expect an error on the signalUploadFailure channel due to error while copying content to GCS writer.
assertUploadFailureSignal(t.T(), t.uh)
assertAllBlocksProcessed(t.T(), t.uh)
assert.Equal(t.T(), 1, len(t.uh.freeBlocksCh))
}

func (t *UploadHandlerTest) TestUploadMultipleBlocksThrowsErrorInCopy() {
Expand Down Expand Up @@ -212,6 +204,8 @@ func (t *UploadHandlerTest) TestUploadMultipleBlocksThrowsErrorInCopy() {
}

assertUploadFailureSignal(t.T(), t.uh)
assertAllBlocksProcessed(t.T(), t.uh)
assert.Equal(t.T(), 4, len(t.uh.freeBlocksCh))
}

func assertUploadFailureSignal(t *testing.T, handler *UploadHandler) {
Expand All @@ -225,6 +219,22 @@ func assertUploadFailureSignal(t *testing.T, handler *UploadHandler) {
}
}

// assertAllBlocksProcessed reports a test error unless the handler's WaitGroup
// is released within 100 milliseconds.
func assertAllBlocksProcessed(t *testing.T, handler *UploadHandler) {
	t.Helper()

	// Wait on the WaitGroup in a separate goroutine so the wait can be raced
	// against a timer below.
	released := make(chan struct{})
	go func() {
		handler.wg.Wait()
		close(released)
	}()

	deadline := time.After(100 * time.Millisecond)
	select {
	case <-deadline:
		t.Error("Timeout waiting for WaitGroup")
	case <-released:
		// All blocks for upload have been processed.
	}
}

func TestSignalUploadFailure(t *testing.T) {
mockSignalUploadFailure := make(chan error)
uploadHandler := &UploadHandler{
Expand All @@ -235,3 +245,28 @@ func TestSignalUploadFailure(t *testing.T) {

assert.Equal(t, mockSignalUploadFailure, actualChannel)
}

// TestMultipleBlockAwaitBlocksUpload uploads 5 blocks and checks that, once
// AwaitBlocksUpload returns, all 5 blocks are back on the free blocks channel
// and the upload channel is empty.
func (t *UploadHandlerTest) TestMultipleBlockAwaitBlocksUpload() {
	const numBlocks = 5
	// Take the blocks from the pool up front.
	acquired := make([]block.Block, 0, numBlocks)
	for len(acquired) < numBlocks {
		blk, getErr := t.blockPool.Get()
		require.NoError(t.T(), getErr)
		acquired = append(acquired, blk)
	}
	// CreateObjectChunkWriter -- should be called once.
	mockWriter := &storagemock.Writer{}
	t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(mockWriter, nil)
	// Hand every block to the upload handler.
	for idx := range acquired {
		uploadErr := t.uh.Upload(acquired[idx])
		require.NoError(t.T(), uploadErr)
	}

	// Block until the handler reports that the uploads are done.
	t.uh.AwaitBlocksUpload()

	assert.Equal(t.T(), numBlocks, len(t.uh.freeBlocksCh))
	assert.Equal(t.T(), 0, len(t.uh.uploadCh))
	assertAllBlocksProcessed(t.T(), t.uh)
}

0 comments on commit ed4181d

Please sign in to comment.