Skip to content

Commit

Permalink
Allow upload handler to create chunk writer for empty synced object (#2797)
Browse files Browse the repository at this point in the history

* allow buffered writes to create writer for empty object

* review comments

* rename ResolveCreateObjectRequest to NewCreateObjectRequest

* fix flaky test
  • Loading branch information
ashmeenkaur authored Dec 17, 2024
1 parent cb48129 commit 9223376
Show file tree
Hide file tree
Showing 15 changed files with 420 additions and 191 deletions.
48 changes: 33 additions & 15 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,26 +59,52 @@ type WriteFileInfo struct {
var ErrOutOfOrderWrite = errors.New("outOfOrder write detected")
var ErrUploadFailure = errors.New("error while uploading object to GCS")

// CreateBWHandlerRequest bundles the parameters required by NewBWHandler to
// construct a BufferedWriteHandler.
type CreateBWHandlerRequest struct {
	// Object is the synced GCS object being written to, or nil when the write
	// targets a new/local file (see the nil usage in the test setup).
	Object *gcs.Object
	// ObjectName is the name of the object to create or overwrite.
	ObjectName string
	// Bucket is the GCS bucket the upload is destined for.
	Bucket gcs.Bucket
	// BlockSize is the size, in bytes, of each buffer block in the pool.
	BlockSize int64
	// MaxBlocksPerFile caps how many blocks a single file's writes may hold.
	MaxBlocksPerFile int64
	// GlobalMaxBlocksSem limits total outstanding blocks across all files.
	GlobalMaxBlocksSem *semaphore.Weighted
	// ChunkTransferTimeoutSecs is the per-chunk upload timeout, in seconds,
	// forwarded into the GCS create-object request.
	ChunkTransferTimeoutSecs int64
}

// NewBWHandler creates the bufferedWriteHandler struct.
func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBlocks int64, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) {
bp, err := block.NewBlockPool(blockSize, maxBlocks, globalMaxBlocksSem)
func NewBWHandler(req *CreateBWHandlerRequest) (bwh *BufferedWriteHandler, err error) {
bp, err := block.NewBlockPool(req.BlockSize, req.MaxBlocksPerFile, req.GlobalMaxBlocksSem)
if err != nil {
return
}

bwh = &BufferedWriteHandler{
current: nil,
blockPool: bp,
uploadHandler: newUploadHandler(objectName, bucket, maxBlocks, bp.FreeBlocksChannel(), blockSize),
totalSize: 0,
mtime: time.Now(),
current: nil,
blockPool: bp,
uploadHandler: newUploadHandler(&CreateUploadHandlerRequest{
Object: req.Object,
ObjectName: req.ObjectName,
Bucket: req.Bucket,
FreeBlocksCh: bp.FreeBlocksChannel(),
MaxBlocksPerFile: req.MaxBlocksPerFile,
BlockSize: req.BlockSize,
ChunkTransferTimeoutSecs: req.ChunkTransferTimeoutSecs,
}),
totalSize: 0,
mtime: time.Now(),
}
return
}

// Write writes the given data to the buffer. It writes to an existing buffer if
// the capacity is available otherwise writes to a new buffer.
func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
// Fail early if the uploadHandler has failed.
select {
case <-wh.uploadHandler.SignalUploadFailure():
return ErrUploadFailure
default:
break
}

if offset != wh.totalSize && offset != wh.truncatedSize {
logger.Errorf("BufferedWriteHandler.OutOfOrderError for object: %s, expectedOffset: %d, actualOffset: %d",
wh.uploadHandler.objectName, wh.totalSize, offset)
Expand All @@ -93,14 +119,6 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
}
}

// Fail early if the uploadHandler has failed.
select {
case <-wh.uploadHandler.SignalUploadFailure():
return ErrUploadFailure
default:
break
}

return wh.appendBuffer(data)
}

Expand Down
13 changes: 12 additions & 1 deletion internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"golang.org/x/sync/semaphore"
)

// chunkTransferTimeoutSecs is the per-chunk transfer timeout (in seconds)
// that tests pass when constructing write/upload handlers, and later assert
// on in the generated CreateObjectRequest.
const chunkTransferTimeoutSecs int64 = 10

type BufferedWriteTest struct {
bwh *BufferedWriteHandler
suite.Suite
Expand All @@ -40,7 +42,15 @@ func TestBufferedWriteTestSuite(t *testing.T) {

// SetupTest builds a fresh BufferedWriteHandler backed by a fake in-memory
// bucket before every test in the suite.
//
// NOTE(review): the original span contained a leftover pre-change call to the
// old NewBWHandler signature interleaved with the new one; this body keeps
// only the post-change call.
func (testSuite *BufferedWriteTest) SetupTest() {
	bucket := fake.NewFakeBucket(timeutil.RealClock(), "FakeBucketName", gcs.NonHierarchical)
	bwh, err := NewBWHandler(&CreateBWHandlerRequest{
		// Object is nil: these tests simulate writes to a new (local) file.
		Object:                   nil,
		ObjectName:               "testObject",
		Bucket:                   bucket,
		BlockSize:                blockSize,
		MaxBlocksPerFile:         10,
		GlobalMaxBlocksSem:       semaphore.NewWeighted(10),
		ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs,
	})
	require.Nil(testSuite.T(), err)
	testSuite.bwh = bwh
}
Expand Down Expand Up @@ -236,6 +246,7 @@ func (testSuite *BufferedWriteTest) TestFlushWithMultiBlockWritesAndSignalUpload
for i := 0; i < 5; i++ {
err := testSuite.bwh.Write(buffer, int64(blockSize*(i+5)))
require.Error(testSuite.T(), err)
assert.Equal(testSuite.T(), ErrUploadFailure, err)
}

obj, err := testSuite.bwh.Flush()
Expand Down
45 changes: 27 additions & 18 deletions internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,35 @@ type UploadHandler struct {
signalUploadFailure chan error

// Parameters required for creating a new GCS chunk writer.
bucket gcs.Bucket
objectName string
blockSize int64
bucket gcs.Bucket
objectName string
obj *gcs.Object
chunkTransferTimeout int64
blockSize int64
}

// CreateUploadHandlerRequest bundles the parameters required by
// newUploadHandler to construct an UploadHandler.
type CreateUploadHandlerRequest struct {
	// Object is the synced GCS object being overwritten, or nil for a new
	// (local) object; its generation/meta-generation seed the request
	// preconditions when present.
	Object *gcs.Object
	// ObjectName is the name of the object being uploaded.
	ObjectName string
	// Bucket is the destination GCS bucket.
	Bucket gcs.Bucket
	// FreeBlocksCh receives blocks back after they have been uploaded, so the
	// block pool can reuse them.
	FreeBlocksCh chan block.Block
	// MaxBlocksPerFile sizes the internal upload channel's buffer.
	MaxBlocksPerFile int64
	// BlockSize is the chunk size handed to the GCS chunk writer.
	BlockSize int64
	// ChunkTransferTimeoutSecs is the per-chunk upload timeout, in seconds.
	ChunkTransferTimeoutSecs int64
}

// newUploadHandler creates the UploadHandler struct.
func newUploadHandler(objectName string, bucket gcs.Bucket, maxBlocks int64, freeBlocksCh chan block.Block, blockSize int64) *UploadHandler {
func newUploadHandler(req *CreateUploadHandlerRequest) *UploadHandler {
uh := &UploadHandler{
uploadCh: make(chan block.Block, maxBlocks),
wg: sync.WaitGroup{},
freeBlocksCh: freeBlocksCh,
bucket: bucket,
objectName: objectName,
blockSize: blockSize,
signalUploadFailure: make(chan error, 1),
uploadCh: make(chan block.Block, req.MaxBlocksPerFile),
wg: sync.WaitGroup{},
freeBlocksCh: req.FreeBlocksCh,
bucket: req.Bucket,
objectName: req.ObjectName,
obj: req.Object,
blockSize: req.BlockSize,
signalUploadFailure: make(chan error, 1),
chunkTransferTimeout: req.ChunkTransferTimeoutSecs,
}
return uh
}
Expand All @@ -86,12 +100,7 @@ func (uh *UploadHandler) Upload(block block.Block) error {

// createObjectWriter creates a GCS object writer.
func (uh *UploadHandler) createObjectWriter() (err error) {
var preCond int64
req := &gcs.CreateObjectRequest{
Name: uh.objectName,
GenerationPrecondition: &preCond,
Metadata: make(map[string]string),
}
req := gcs.NewCreateObjectRequest(uh.obj, uh.objectName, nil, uh.chunkTransferTimeout)
// We need a new context here, since the first writeFile() call will be complete
// (and context will be cancelled) by the time complete upload is done.
uh.writer, err = uh.bucket.CreateObjectChunkWriter(context.Background(), req, int(uh.blockSize), nil)
Expand All @@ -111,9 +120,9 @@ func (uh *UploadHandler) uploader() {
close(uh.signalUploadFailure)
}
}
uh.wg.Done()
// Put back the uploaded block on the freeBlocksChannel for re-use.
uh.freeBlocksCh <- currBlock
uh.wg.Done()
}
}

Expand Down
72 changes: 71 additions & 1 deletion internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ func (t *UploadHandlerTest) SetupTest() {
var err error
t.blockPool, err = block.NewBlockPool(blockSize, maxBlocks, semaphore.NewWeighted(maxBlocks))
require.NoError(t.T(), err)
t.uh = newUploadHandler("testObject", t.mockBucket, maxBlocks, t.blockPool.FreeBlocksChannel(), blockSize)
t.uh = newUploadHandler(&CreateUploadHandlerRequest{
Object: nil,
ObjectName: "testObject",
Bucket: t.mockBucket,
FreeBlocksCh: t.blockPool.FreeBlocksChannel(),
MaxBlocksPerFile: maxBlocks,
BlockSize: blockSize,
ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs,
})
}

func (t *UploadHandlerTest) TestMultipleBlockUpload() {
Expand Down Expand Up @@ -270,3 +278,65 @@ func (t *UploadHandlerTest) TestMultipleBlockAwaitBlocksUpload() {
assert.Equal(t.T(), 0, len(t.uh.uploadCh))
assertAllBlocksProcessed(t.T(), t.uh)
}

// Verifies that when the handler wraps an existing (synced, empty) GCS
// object, the first Upload builds its CreateObjectRequest from that object's
// metadata: name, content type/encoding, and generation / meta-generation
// preconditions, plus the configured chunk-transfer timeout.
func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectRequestParametersForEmptyGCSObject() {
	t.uh.obj = &gcs.Object{
		Name:            t.uh.objectName,
		ContentType:     "image/png",
		Size:            0,
		ContentEncoding: "gzip",
		Generation:      10,
		MetaGeneration:  20,
		Acl:             nil,
	}
	// The request must mirror the stored object's metadata exactly.
	requestMatches := func(req *gcs.CreateObjectRequest) bool {
		return req.Name == t.uh.objectName &&
			*req.GenerationPrecondition == t.uh.obj.Generation &&
			*req.MetaGenerationPrecondition == t.uh.obj.MetaGeneration &&
			req.ContentEncoding == t.uh.obj.ContentEncoding &&
			req.ContentType == t.uh.obj.ContentType &&
			req.ChunkTransferTimeoutSecs == chunkTransferTimeoutSecs
	}
	// CreateObjectChunkWriter -- should be called once with correct request parameters.
	mockWriter := &storagemock.Writer{}
	finalizedObj := &gcs.Object{}
	t.mockBucket.On("CreateObjectChunkWriter",
		mock.Anything,
		mock.MatchedBy(requestMatches),
		mock.Anything,
		mock.Anything).Return(mockWriter, nil)
	t.mockBucket.On("FinalizeUpload", mock.Anything, mockWriter).Return(finalizedObj, nil)

	// Grab a block from the pool and push it through the handler.
	blk, err := t.blockPool.Get()
	require.NoError(t.T(), err)
	err = t.uh.Upload(blk)

	require.NoError(t.T(), err)
}

// Verifies that when no backing GCS object exists (local inode, uh.obj nil),
// Upload issues a CreateObjectRequest with a zero generation precondition and
// no meta-generation precondition, plus the configured chunk-transfer timeout.
func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectRequestParametersForLocalInode() {
	assert.Nil(t.T(), t.uh.obj)

	// With no source object, only name, preconditions, and timeout matter.
	requestMatches := func(req *gcs.CreateObjectRequest) bool {
		return req.Name == t.uh.objectName &&
			*req.GenerationPrecondition == 0 &&
			req.MetaGenerationPrecondition == nil &&
			req.ChunkTransferTimeoutSecs == chunkTransferTimeoutSecs
	}
	// CreateObjectChunkWriter -- should be called once with correct request parameters.
	mockWriter := &storagemock.Writer{}
	finalizedObj := &gcs.Object{}
	t.mockBucket.On("CreateObjectChunkWriter",
		mock.Anything,
		mock.MatchedBy(requestMatches),
		mock.Anything,
		mock.Anything).Return(mockWriter, nil)
	t.mockBucket.On("FinalizeUpload", mock.Anything, mockWriter).Return(finalizedObj, nil)

	// Grab a block from the pool and push it through the handler.
	blk, err := t.blockPool.Get()
	require.NoError(t.T(), err)
	err = t.uh.Upload(blk)

	require.NoError(t.T(), err)
}
15 changes: 4 additions & 11 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ import (
"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"github.com/jacobsa/timeutil"
"golang.org/x/sync/semaphore"
)

type ServerConfig struct {
Expand Down Expand Up @@ -194,7 +193,6 @@ func NewFileSystem(ctx context.Context, serverCfg *ServerConfig) (fuseutil.FileS
newConfig: serverCfg.NewConfig,
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: serverCfg.NewConfig.FileCache.CacheFileForRangeRead,
globalMaxBlocksSem: semaphore.NewWeighted(serverCfg.NewConfig.Write.GlobalMaxBlocks),
metricHandle: serverCfg.MetricHandle,
}

Expand Down Expand Up @@ -485,8 +483,6 @@ type fileSystem struct {
// random file access.
cacheFileForRangeRead bool

globalMaxBlocksSem *semaphore.Weighted

metricHandle common.MetricHandle
}

Expand Down Expand Up @@ -808,8 +804,7 @@ func (fs *fileSystem) mintInode(ic inode.Core) (in inode.Inode) {
fs.contentCache,
fs.mtimeClock,
ic.Local,
&fs.newConfig.Write,
fs.globalMaxBlocksSem)
fs.newConfig)
}

// Place it in our map of IDs to inodes.
Expand Down Expand Up @@ -1661,9 +1656,7 @@ func (fs *fileSystem) createFile(
// LOCKS_EXCLUDED(fs.mu)
// UNLOCK_FUNCTION(fs.mu)
// LOCK_FUNCTION(child)
func (fs *fileSystem) createLocalFile(
parentID fuseops.InodeID,
name string) (child inode.Inode, err error) {
func (fs *fileSystem) createLocalFile(ctx context.Context, parentID fuseops.InodeID, name string) (child inode.Inode, err error) {
// Find the parent.
fs.mu.Lock()
parent := fs.dirInodeOrDie(parentID)
Expand Down Expand Up @@ -1699,7 +1692,7 @@ func (fs *fileSystem) createLocalFile(
fs.localFileInodes[child.Name()] = child
// Empty file is created to be able to set attributes on the file.
fileInode := child.(*inode.FileInode)
if err := fileInode.CreateBufferedOrTempWriter(); err != nil {
if err := fileInode.CreateBufferedOrTempWriter(ctx); err != nil {
return nil, err
}
fs.mu.Unlock()
Expand Down Expand Up @@ -1729,7 +1722,7 @@ func (fs *fileSystem) CreateFile(
if fs.newConfig.Write.CreateEmptyFile {
child, err = fs.createFile(ctx, op.Parent, op.Name, op.Mode)
} else {
child, err = fs.createLocalFile(op.Parent, op.Name)
child, err = fs.createLocalFile(ctx, op.Parent, op.Name)
}

if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions internal/fs/handle/dir_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/jacobsa/fuse/fuseutil"
. "github.com/jacobsa/ogletest"
"github.com/jacobsa/timeutil"
"golang.org/x/sync/semaphore"
)

func TestDirHandle(t *testing.T) { RunTests(t) }
Expand Down Expand Up @@ -106,8 +105,7 @@ func (t *DirHandleTest) createLocalFileInode(name string, id fuseops.InodeID) (i
contentcache.New("", &t.clock),
&t.clock,
true, // localFile
&cfg.WriteConfig{},
semaphore.NewWeighted(math.MaxInt64))
&cfg.Config{Write: cfg.WriteConfig{GlobalMaxBlocks: math.MaxInt64}})
return
}

Expand Down
7 changes: 2 additions & 5 deletions internal/fs/inode/dir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import (
"time"

"github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"golang.org/x/sync/semaphore"

"github.com/googlecloudplatform/gcsfuse/v2/internal/cache/metadata"
"github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/googlecloudplatform/gcsfuse/v2/internal/util"
"golang.org/x/net/context"

"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
Expand Down Expand Up @@ -208,8 +206,7 @@ func (t *DirTest) createLocalFileInode(parent Name, name string, id fuseops.Inod
contentcache.New("", &t.clock),
&t.clock,
true, //localFile
&cfg.WriteConfig{},
semaphore.NewWeighted(math.MaxInt64))
&cfg.Config{Write: cfg.WriteConfig{GlobalMaxBlocks: math.MaxInt64}})
return
}

Expand Down
Loading

0 comments on commit 9223376

Please sign in to comment.