Commit
handle out of order writes (#2826)
ashmeenkaur authored Dec 23, 2024
1 parent 9b80d44 commit 1046474
Showing 3 changed files with 284 additions and 16 deletions.
5 changes: 3 additions & 2 deletions internal/bufferedwrites/buffered_write_handler.go
@@ -88,8 +88,9 @@ func NewBWHandler(req *CreateBWHandlerRequest) (bwh *BufferedWriteHandler, err e
 			BlockSize:                req.BlockSize,
 			ChunkTransferTimeoutSecs: req.ChunkTransferTimeoutSecs,
 		}),
-		totalSize: 0,
-		mtime:     time.Now(),
+		totalSize:     0,
+		mtime:         time.Now(),
+		truncatedSize: -1,
 	}
 	return
 }
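The noteworthy addition is truncatedSize: -1. Zero would be an ambiguous initial value, since a file can legitimately be truncated to size 0; a negative sentinel lets the handler tell "never truncated" apart from "truncated to zero". The sketch below illustrates the idea with hypothetical names; the real accounting lives inside the buffered write handler and is not shown in this diff.

	// Hypothetical sketch, not code from this commit: a -1 sentinel keeps
	// "never truncated" distinct from "truncated to size 0".
	func effectiveSize(totalSize, truncatedSize int64) int64 {
		// truncatedSize == -1 means truncate was never called, so only the
		// bytes actually written determine the size.
		if truncatedSize > totalSize {
			return truncatedSize // truncation extended the file beyond the written bytes
		}
		return totalSize
	}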
85 changes: 71 additions & 14 deletions internal/fs/inode/file.go
@@ -500,19 +500,26 @@ func (f *FileInode) Read(
 func (f *FileInode) Write(
 	ctx context.Context,
 	data []byte,
-	offset int64) (err error) {
+	offset int64) error {
 	// For empty GCS files also we will trigger bufferedWrites flow.
 	if f.src.Size == 0 && f.config.Write.ExperimentalEnableStreamingWrites {
-		err = f.ensureBufferedWriteHandler(ctx)
+		err := f.ensureBufferedWriteHandler(ctx)
 		if err != nil {
-			return
+			return err
 		}
 	}
 
 	if f.bwh != nil {
-		return f.bwh.Write(data, offset)
+		return f.writeUsingBufferedWrites(ctx, data, offset)
 	}
 
+	return f.writeUsingTempFile(ctx, data, offset)
+}
+
+// Helper function to serve write for file using temp file.
+//
+// LOCKS_REQUIRED(f.mu)
+func (f *FileInode) writeUsingTempFile(ctx context.Context, data []byte, offset int64) (err error) {
 	// Make sure f.content != nil.
 	err = f.ensureContent(ctx)
 	if err != nil {
@@ -527,6 +534,51 @@ func (f *FileInode) Write(
 		return
 	}
 
+// Helper function to serve write for file using buffered writes handler.
+//
+// LOCKS_REQUIRED(f.mu)
+func (f *FileInode) writeUsingBufferedWrites(ctx context.Context, data []byte, offset int64) error {
+	err := f.bwh.Write(data, offset)
+	if err == bufferedwrites.ErrOutOfOrderWrite || err == bufferedwrites.ErrUploadFailure {
+		// Finalize the object.
+		flushErr := f.flushUsingBufferedWriteHandler()
+		if flushErr != nil {
+			return fmt.Errorf("bwh.Write failed: %v, could not finalize what has been written so far: %w", err, flushErr)
+		}
+	}
+
+	// Fall back to temp file for Out-Of-Order Writes.
+	if err == bufferedwrites.ErrOutOfOrderWrite {
+		return f.writeUsingTempFile(ctx, data, offset)
+	}
+
+	return err
+}
+
+// Helper function to flush buffered writes handler and update inode state with
+// new object.
+//
+// LOCKS_REQUIRED(f.mu)
+func (f *FileInode) flushUsingBufferedWriteHandler() error {
+	obj, err := f.bwh.Flush()
+
+	var preconditionErr *gcs.PreconditionError
+	if errors.As(err, &preconditionErr) {
+		return &gcsfuse_errors.FileClobberedError{
+			Err: fmt.Errorf("f.bwh.Flush(): %w", err),
+		}
+	}
+
+	// bwh can return a partially synced object along with an error so updating
+	// inode state before returning error.
+	f.updateInodeStateAfterSync(obj)
+	if err != nil {
+		return fmt.Errorf("f.bwh.Flush(): %w", err)
+	}
+
+	return nil
+}
+
 // Set the mtime for this file. May involve a round trip to GCS.
 //
 // LOCKS_REQUIRED(f.mu)
@@ -666,21 +718,26 @@ func (f *FileInode) Sync(ctx context.Context) (err error)
 		err = fmt.Errorf("SyncObject: %w", err)
 		return
 	}
 
+	minObj := storageutil.ConvertObjToMinObject(newObj)
 	// If we wrote out a new object, we need to update our state.
-	if newObj != nil && !f.localFileCache {
-		var minObj gcs.MinObject
-		minObjPtr := storageutil.ConvertObjToMinObject(newObj)
-		if minObjPtr != nil {
-			minObj = *minObjPtr
-		}
-		f.src = minObj
+	f.updateInodeStateAfterSync(minObj)
+	return
+}
+
+func (f *FileInode) updateInodeStateAfterSync(minObj *gcs.MinObject) {
+	if minObj != nil && !f.localFileCache {
+		f.src = *minObj
 		// Convert localFile to nonLocalFile after it is synced to GCS.
 		if f.IsLocal() {
 			f.local = false
 		}
-		f.content.Destroy()
-		f.content = nil
+		if f.content != nil {
+			f.content.Destroy()
+			f.content = nil
+		}
+		if f.bwh != nil {
+			f.bwh = nil
+		}
 	}
 
 	return
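writeUsingBufferedWrites branches on two sentinel errors from the bufferedwrites package: either ErrOutOfOrderWrite or ErrUploadFailure finalizes the object, but only an out-of-order write falls back to the temp-file path. Their definitions are not part of this diff; a plausible sketch (the identifiers appear in the diff, the messages are invented):

	package bufferedwrites

	import "errors"

	var (
		// ErrOutOfOrderWrite would report a write whose offset does not
		// continue the sequential upload stream.
		ErrOutOfOrderWrite = errors.New("out of order write detected")

		// ErrUploadFailure would report that the background upload has
		// already failed, so buffering further data cannot succeed.
		ErrUploadFailure = errors.New("upload failure")
	)

Because the handler evidently returns these sentinels unwrapped, the direct == comparisons in writeUsingBufferedWrites work; errors.Is would be the safer choice if the package ever started wrapping them.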
210 changes: 210 additions & 0 deletions internal/fs/inode/file_streaming_writes_test.go
@@ -0,0 +1,210 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package inode

import (
	"context"
	"testing"
	"time"

	"github.com/googlecloudplatform/gcsfuse/v2/cfg"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
	"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
	"github.com/jacobsa/fuse/fuseops"
	"github.com/jacobsa/syncutil"
	"github.com/jacobsa/timeutil"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
)

const localFile = "local"
const emptyGCSFile = "emptyGCS"

type FileStreamingWritesTest struct {
	suite.Suite
	ctx        context.Context
	bucket     gcs.Bucket
	clock      timeutil.SimulatedClock
	backingObj *gcs.MinObject
	in         *FileInode
}

func TestFileStreamingWritesTestSuite(t *testing.T) {
	suite.Run(t, new(FileStreamingWritesTest))
}

func (t *FileStreamingWritesTest) SetupTest() {
	// Enabling invariant check for all tests.
	syncutil.EnableInvariantChecking()
	t.ctx = context.Background()
	t.clock.SetTime(time.Date(2012, 8, 15, 22, 56, 0, 0, time.Local))
	t.bucket = fake.NewFakeBucket(&t.clock, "some_bucket", gcs.NonHierarchical)

	// Create the inode.
	t.createInode(fileName, localFile)
}

func (t *FileStreamingWritesTest) TearDownTest() {
	t.in.Unlock()
}

func (t *FileStreamingWritesTest) createInode(fileName string, fileType string) {
	if fileType != emptyGCSFile && fileType != localFile {
		t.T().Errorf("fileType should be either local or empty")
	}

	name := NewFileName(
		NewRootName(""),
		fileName,
	)
	syncerBucket := gcsx.NewSyncerBucket(
		1, // Append threshold
		ChunkTransferTimeoutSecs,
		".gcsfuse_tmp/",
		t.bucket)

	isLocal := false
	if fileType == localFile {
		t.backingObj = nil
		isLocal = true
	}

	if fileType == emptyGCSFile {
		object, err := storageutil.CreateObject(
			t.ctx,
			t.bucket,
			fileName,
			[]byte{})
		t.backingObj = storageutil.ConvertObjToMinObject(object)

		assert.Nil(t.T(), err)
	}

	t.in = NewFileInode(
		fileInodeID,
		name,
		t.backingObj,
		fuseops.InodeAttributes{
			Uid:  uid,
			Gid:  gid,
			Mode: fileMode,
		},
		&syncerBucket,
		false, // localFileCache
		contentcache.New("", &t.clock),
		&t.clock,
		isLocal,
		&cfg.Config{})

	// Set buffered write config for created inode.
	t.in.config = &cfg.Config{Write: cfg.WriteConfig{
		MaxBlocksPerFile:                  5,
		BlockSizeMb:                       1,
		ExperimentalEnableStreamingWrites: true,
		GlobalMaxBlocks:                   10,
	}}

	// Create write handler for the local inode created above.
	err := t.in.CreateBufferedOrTempWriter(t.ctx)
	assert.Nil(t.T(), err)

	t.in.Lock()
}

////////////////////////////////////////////////////////////////////////
// Tests
////////////////////////////////////////////////////////////////////////

func (t *FileStreamingWritesTest) TestOutOfOrderWritesToLocalFileFallBackToTempFile() {
	testCases := []struct {
		name            string
		offset          int64
		expectedContent string
	}{
		{
			name:            "ahead_of_current_offset",
			offset:          5,
			expectedContent: "taco\x00hello",
		},
		{
			name:            "zero_offset",
			offset:          0,
			expectedContent: "hello",
		},
		{
			name:            "before_current_offset",
			offset:          2,
			expectedContent: "tahello",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func() {
			assert.True(t.T(), t.in.IsLocal())
			createTime := t.in.mtimeClock.Now()
			err := t.in.Write(t.ctx, []byte("taco"), 0)
			require.Nil(t.T(), err)
			require.NotNil(t.T(), t.in.bwh)
			assert.Equal(t.T(), int64(4), t.in.bwh.WriteFileInfo().TotalSize)

			err = t.in.Write(t.ctx, []byte("hello"), tc.offset)
			require.Nil(t.T(), err)

			// Ensure bwh cleared and temp file created.
			assert.Nil(t.T(), t.in.bwh)
			assert.NotNil(t.T(), t.in.content)
			// The inode should agree about the new mtime and size.
			attrs, err := t.in.Attributes(t.ctx)
			require.Nil(t.T(), err)
			assert.Equal(t.T(), uint64(len(tc.expectedContent)), attrs.Size)
			assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta)
			// Sync the file and validate its content.
			err = t.in.Sync(t.ctx)
			require.Nil(t.T(), err)
			// Read the object's contents.
			contents, err := storageutil.ReadObject(t.ctx, t.bucket, t.in.Name().GcsObjectName())
			assert.Nil(t.T(), err)
			assert.Equal(t.T(), tc.expectedContent, string(contents))
			t.TearDownTest()
			t.SetupTest()
		})
	}
}

func (t *FileStreamingWritesTest) TestOutOfOrderWritesOnClobberedFileThrowsError() {
	err := t.in.Write(t.ctx, []byte("hi"), 0)
	require.Nil(t.T(), err)
	require.NotNil(t.T(), t.in.bwh)
	assert.Equal(t.T(), int64(2), t.in.bwh.WriteFileInfo().TotalSize)
	// Clobber the file.
	objWritten, err := storageutil.CreateObject(t.ctx, t.bucket, fileName, []byte("taco"))
	require.Nil(t.T(), err)

	err = t.in.Write(t.ctx, []byte("hello"), 10)

	require.Error(t.T(), err)
	var fileClobberedError *gcsfuse_errors.FileClobberedError
	assert.ErrorAs(t.T(), err, &fileClobberedError)
	// Validate that the object on GCS was not updated.
	statReq := &gcs.StatObjectRequest{Name: t.in.Name().GcsObjectName()}
	objGot, _, err := t.bucket.StatObject(t.ctx, statReq)
	assert.Nil(t.T(), err)
	assert.Equal(t.T(), storageutil.ConvertObjToMinObject(objWritten), objGot)
}
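Assuming the repository's standard Go module layout, the new suite can be run on its own from the repo root with:

	go test ./internal/fs/inode/ -run TestFileStreamingWritesTestSuite -v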
