From 1046474fe5b1c8fd3f2f9224676e8adabe89ceae Mon Sep 17 00:00:00 2001 From: Ashmeen Kaur <57195160+ashmeenkaur@users.noreply.github.com> Date: Mon, 23 Dec 2024 14:52:40 +0530 Subject: [PATCH] handle out of order writes (#2826) --- .../bufferedwrites/buffered_write_handler.go | 5 +- internal/fs/inode/file.go | 85 +++++-- .../fs/inode/file_streaming_writes_test.go | 210 ++++++++++++++++++ 3 files changed, 284 insertions(+), 16 deletions(-) create mode 100644 internal/fs/inode/file_streaming_writes_test.go diff --git a/internal/bufferedwrites/buffered_write_handler.go b/internal/bufferedwrites/buffered_write_handler.go index 97a9a77851..e7bb7a85c6 100644 --- a/internal/bufferedwrites/buffered_write_handler.go +++ b/internal/bufferedwrites/buffered_write_handler.go @@ -88,8 +88,9 @@ func NewBWHandler(req *CreateBWHandlerRequest) (bwh *BufferedWriteHandler, err e BlockSize: req.BlockSize, ChunkTransferTimeoutSecs: req.ChunkTransferTimeoutSecs, }), - totalSize: 0, - mtime: time.Now(), + totalSize: 0, + mtime: time.Now(), + truncatedSize: -1, } return } diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index 71a219eb80..9c60fbf4fd 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -500,19 +500,26 @@ func (f *FileInode) Read( func (f *FileInode) Write( ctx context.Context, data []byte, - offset int64) (err error) { + offset int64) error { // For empty GCS files also we will trigger bufferedWrites flow. if f.src.Size == 0 && f.config.Write.ExperimentalEnableStreamingWrites { - err = f.ensureBufferedWriteHandler(ctx) + err := f.ensureBufferedWriteHandler(ctx) if err != nil { - return + return err } } if f.bwh != nil { - return f.bwh.Write(data, offset) + return f.writeUsingBufferedWrites(ctx, data, offset) } + return f.writeUsingTempFile(ctx, data, offset) +} + +// Helper function to serve write for file using temp file. +// +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset int64) (err error) { // Make sure f.content != nil. err = f.ensureContent(ctx) if err != nil { @@ -527,6 +534,51 @@ func (f *FileInode) Write( return } +// Helper function to serve write for file using buffered writes handler. +// +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error { + err := f.bwh.Write(data, offset) + if err == bufferedwrites.ErrOutOfOrderWrite || err == bufferedwrites.ErrUploadFailure { + // Finalize the object. + flushErr := f.flushUsingBufferedWriteHandler() + if flushErr != nil { + return fmt.Errorf("bwh.Write failed: %v, could not finalize what has been written so far: %w", err, flushErr) + } + } + + // Fall back to temp file for Out-Of-Order Writes. + if err == bufferedwrites.ErrOutOfOrderWrite { + return f.writeUsingTempFile(ctx, data, offset) + } + + return err +} + +// Helper function to flush buffered writes handler and update inode state with +// new object. +// +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) flushUsingBufferedWriteHandler() error { + obj, err := f.bwh.Flush() + + var preconditionErr *gcs.PreconditionError + if errors.As(err, &preconditionErr) { + return &gcsfuse_errors.FileClobberedError{ + Err: fmt.Errorf("f.bwh.Flush(): %w", err), + } + } + + // bwh can return a partially synced object along with an error so updating + // inode state before returning error. + f.updateInodeStateAfterSync(obj) + if err != nil { + return fmt.Errorf("f.bwh.Flush(): %w", err) + } + + return nil +} + // Set the mtime for this file. May involve a round trip to GCS. // // LOCKS_REQUIRED(f.mu) @@ -666,21 +718,26 @@ func (f *FileInode) Sync(ctx context.Context) (err error) { err = fmt.Errorf("SyncObject: %w", err) return } - + minObj := storageutil.ConvertObjToMinObject(newObj) // If we wrote out a new object, we need to update our state. - if newObj != nil && !f.localFileCache { - var minObj gcs.MinObject - minObjPtr := storageutil.ConvertObjToMinObject(newObj) - if minObjPtr != nil { - minObj = *minObjPtr - } - f.src = minObj + f.updateInodeStateAfterSync(minObj) + return +} + +func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) { + if minObj != nil && !f.localFileCache { + f.src = *minObj // Convert localFile to nonLocalFile after it is synced to GCS. if f.IsLocal() { f.local = false } - f.content.Destroy() - f.content = nil + if f.content != nil { + f.content.Destroy() + f.content = nil + } + if f.bwh != nil { + f.bwh = nil + } } return diff --git a/internal/fs/inode/file_streaming_writes_test.go b/internal/fs/inode/file_streaming_writes_test.go new file mode 100644 index 0000000000..27bc9eaf15 --- /dev/null +++ b/internal/fs/inode/file_streaming_writes_test.go @@ -0,0 +1,210 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inode + +import ( + "context" + "testing" + "time" + + "github.com/googlecloudplatform/gcsfuse/v2/cfg" + "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" + "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors" + "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" + "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake" + "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" + "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" + "github.com/jacobsa/fuse/fuseops" + "github.com/jacobsa/syncutil" + "github.com/jacobsa/timeutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +const localFile = "local" +const emptyGCSFile = "emptyGCS" + +type FileStreamingWritesTest struct { + suite.Suite + ctx context.Context + bucket gcs.Bucket + clock timeutil.SimulatedClock + backingObj *gcs.MinObject + in *FileInode +} + +func TestFileStreamingWritesTestSuite(t *testing.T) { + suite.Run(t, new(FileStreamingWritesTest)) +} + +func (t *FileStreamingWritesTest) SetupTest() { + // Enabling invariant check for all tests. + syncutil.EnableInvariantChecking() + t.ctx = context.Background() + t.clock.SetTime(time.Date(2012, 8, 15, 22, 56, 0, 0, time.Local)) + t.bucket = fake.NewFakeBucket(&t.clock, "some_bucket", gcs.NonHierarchical) + + // Create the inode. + t.createInode(fileName, localFile) +} + +func (t *FileStreamingWritesTest) TearDownTest() { + t.in.Unlock() +} + +func (t *FileStreamingWritesTest) createInode(fileName string, fileType string) { + if fileType != emptyGCSFile && fileType != localFile { + t.T().Errorf("fileType should be either local or empty") + } + + name := NewFileName( + NewRootName(""), + fileName, + ) + syncerBucket := gcsx.NewSyncerBucket( + 1, // Append threshold + ChunkTransferTimeoutSecs, + ".gcsfuse_tmp/", + t.bucket) + + isLocal := false + if fileType == localFile { + t.backingObj = nil + isLocal = true + } + + if fileType == emptyGCSFile { + object, err := storageutil.CreateObject( + t.ctx, + t.bucket, + fileName, + []byte{}) + t.backingObj = storageutil.ConvertObjToMinObject(object) + + assert.Nil(t.T(), err) + } + + t.in = NewFileInode( + fileInodeID, + name, + t.backingObj, + fuseops.InodeAttributes{ + Uid: uid, + Gid: gid, + Mode: fileMode, + }, + &syncerBucket, + false, // localFileCache + contentcache.New("", &t.clock), + &t.clock, + isLocal, + &cfg.Config{}) + + // Set buffered write config for created inode. + t.in.config = &cfg.Config{Write: cfg.WriteConfig{ + MaxBlocksPerFile: 5, + BlockSizeMb: 1, + ExperimentalEnableStreamingWrites: true, + GlobalMaxBlocks: 10, + }} + + // Create write handler for the local inode created above. + err := t.in.CreateBufferedOrTempWriter(t.ctx) + assert.Nil(t.T(), err) + + t.in.Lock() +} + +//////////////////////////////////////////////////////////////////////// +// Tests +//////////////////////////////////////////////////////////////////////// + +func (t *FileStreamingWritesTest) TestOutOfOrderWritesToLocalFileFallBackToTempFile() { + testCases := []struct { + name string + offset int64 + expectedContent string + }{ + { + name: "ahead_of_current_offset", + offset: 5, + expectedContent: "taco\x00hello", + }, + { + name: "zero_offset", + offset: 0, + expectedContent: "hello", + }, + { + name: "before_current_offset", + offset: 2, + expectedContent: "tahello", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func() { + assert.True(t.T(), t.in.IsLocal()) + createTime := t.in.mtimeClock.Now() + err := t.in.Write(t.ctx, []byte("taco"), 0) + require.Nil(t.T(), err) + require.NotNil(t.T(), t.in.bwh) + assert.Equal(t.T(), int64(4), t.in.bwh.WriteFileInfo().TotalSize) + + err = t.in.Write(t.ctx, []byte("hello"), tc.offset) + require.Nil(t.T(), err) + + // Ensure bwh cleared and temp file created. + assert.Nil(t.T(), t.in.bwh) + assert.NotNil(t.T(), t.in.content) + // The inode should agree about the new mtime and size. + attrs, err := t.in.Attributes(t.ctx) + require.Nil(t.T(), err) + assert.Equal(t.T(), uint64(len(tc.expectedContent)), attrs.Size) + assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta) + // sync file and validate content + err = t.in.Sync(t.ctx) + require.Nil(t.T(), err) + // Read the object's contents. + contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName()) + assert.Nil(t.T(), err) + assert.Equal(t.T(), tc.expectedContent, string(contents)) + t.TearDownTest() + t.SetupTest() + }) + } +} + +func (t *FileStreamingWritesTest) TestOutOfOrderWritesOnClobberedFileThrowsError() { + err := t.in.Write(t.ctx, []byte("hi"), 0) + require.Nil(t.T(), err) + require.NotNil(t.T(), t.in.bwh) + assert.Equal(t.T(), int64(2), t.in.bwh.WriteFileInfo().TotalSize) + // Clobber the file. + objWritten, err := storageutil.CreateObject(t.ctx, t.bucket, fileName, []byte("taco")) + require.Nil(t.T(), err) + + err = t.in.Write(t.ctx, []byte("hello"), 10) + + require.Error(t.T(), err) + var fileClobberedError *gcsfuse_errors.FileClobberedError + assert.ErrorAs(t.T(), err, &fileClobberedError) + // Validate Object on GCS not updated. + statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()} + objGot, _, err := t.bucket.StatObject(t.ctx, statReq) + assert.Nil(t.T(), err) + assert.Equal(t.T(), storageutil.ConvertObjToMinObject(objWritten), objGot) +}