From 92233762e77fd132c8aa17c55c43a46495d24719 Mon Sep 17 00:00:00 2001 From: Ashmeen Kaur <57195160+ashmeenkaur@users.noreply.github.com> Date: Tue, 17 Dec 2024 15:41:26 +0530 Subject: [PATCH] Allow upload handler to create chunk writer for empty synced object (#2797) * allow buffered writes to create writer for empty object * review comments * rename ResolveCreateObjectRequest to NewCreateObjectRequest * fix flaky test --- .../bufferedwrites/buffered_write_handler.go | 48 +++++--- .../buffered_write_handler_test.go | 13 ++- internal/bufferedwrites/upload_handler.go | 45 +++++--- .../bufferedwrites/upload_handler_test.go | 72 +++++++++++- internal/fs/fs.go | 15 +-- internal/fs/handle/dir_handle_test.go | 4 +- internal/fs/inode/dir_test.go | 7 +- internal/fs/inode/file.go | 101 ++++++++++------- internal/fs/inode/file_test.go | 67 ++++++----- internal/fs/local_modifications_test.go | 5 +- internal/gcsx/append_object_creator.go | 14 +-- internal/gcsx/syncer.go | 46 +------- internal/storage/gcs/request_helper.go | 64 +++++++++++ internal/storage/gcs/request_helper_test.go | 105 ++++++++++++++++++ .../operations/write_test.go | 5 +- 15 files changed, 420 insertions(+), 191 deletions(-) create mode 100644 internal/storage/gcs/request_helper.go create mode 100644 internal/storage/gcs/request_helper_test.go diff --git a/internal/bufferedwrites/buffered_write_handler.go b/internal/bufferedwrites/buffered_write_handler.go index 3a2ecfe2ca..97a9a77851 100644 --- a/internal/bufferedwrites/buffered_write_handler.go +++ b/internal/bufferedwrites/buffered_write_handler.go @@ -59,19 +59,37 @@ type WriteFileInfo struct { var ErrOutOfOrderWrite = errors.New("outOfOrder write detected") var ErrUploadFailure = errors.New("error while uploading object to GCS") +type CreateBWHandlerRequest struct { + Object *gcs.Object + ObjectName string + Bucket gcs.Bucket + BlockSize int64 + MaxBlocksPerFile int64 + GlobalMaxBlocksSem *semaphore.Weighted + ChunkTransferTimeoutSecs int64 +} + // NewBWHandler creates the bufferedWriteHandler struct. -func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBlocks int64, globalMaxBlocksSem *semaphore.Weighted) (bwh *BufferedWriteHandler, err error) { - bp, err := block.NewBlockPool(blockSize, maxBlocks, globalMaxBlocksSem) +func NewBWHandler(req *CreateBWHandlerRequest) (bwh *BufferedWriteHandler, err error) { + bp, err := block.NewBlockPool(req.BlockSize, req.MaxBlocksPerFile, req.GlobalMaxBlocksSem) if err != nil { return } bwh = &BufferedWriteHandler{ - current: nil, - blockPool: bp, - uploadHandler: newUploadHandler(objectName, bucket, maxBlocks, bp.FreeBlocksChannel(), blockSize), - totalSize: 0, - mtime: time.Now(), + current: nil, + blockPool: bp, + uploadHandler: newUploadHandler(&CreateUploadHandlerRequest{ + Object: req.Object, + ObjectName: req.ObjectName, + Bucket: req.Bucket, + FreeBlocksCh: bp.FreeBlocksChannel(), + MaxBlocksPerFile: req.MaxBlocksPerFile, + BlockSize: req.BlockSize, + ChunkTransferTimeoutSecs: req.ChunkTransferTimeoutSecs, + }), + totalSize: 0, + mtime: time.Now(), } return } @@ -79,6 +97,14 @@ func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBloc // Write writes the given data to the buffer. It writes to an existing buffer if // the capacity is available otherwise writes to a new buffer. func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) { + // Fail early if the uploadHandler has failed. + select { + case <-wh.uploadHandler.SignalUploadFailure(): + return ErrUploadFailure + default: + break + } + if offset != wh.totalSize && offset != wh.truncatedSize { logger.Errorf("BufferedWriteHandler.OutOfOrderError for object: %s, expectedOffset: %d, actualOffset: %d", wh.uploadHandler.objectName, wh.totalSize, offset) @@ -93,14 +119,6 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) { } } - // Fail early if the uploadHandler has failed. - select { - case <-wh.uploadHandler.SignalUploadFailure(): - return ErrUploadFailure - default: - break - } - return wh.appendBuffer(data) } diff --git a/internal/bufferedwrites/buffered_write_handler_test.go b/internal/bufferedwrites/buffered_write_handler_test.go index 0c814e995c..2498113dd4 100644 --- a/internal/bufferedwrites/buffered_write_handler_test.go +++ b/internal/bufferedwrites/buffered_write_handler_test.go @@ -29,6 +29,8 @@ import ( "golang.org/x/sync/semaphore" ) +const chunkTransferTimeoutSecs int64 = 10 + type BufferedWriteTest struct { bwh *BufferedWriteHandler suite.Suite @@ -40,7 +42,15 @@ func TestBufferedWriteTestSuite(t *testing.T) { func (testSuite *BufferedWriteTest) SetupTest() { bucket := fake.NewFakeBucket(timeutil.RealClock(), "FakeBucketName", gcs.NonHierarchical) - bwh, err := NewBWHandler("testObject", bucket, blockSize, 10, semaphore.NewWeighted(10)) + bwh, err := NewBWHandler(&CreateBWHandlerRequest{ + Object: nil, + ObjectName: "testObject", + Bucket: bucket, + BlockSize: blockSize, + MaxBlocksPerFile: 10, + GlobalMaxBlocksSem: semaphore.NewWeighted(10), + ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, + }) require.Nil(testSuite.T(), err) testSuite.bwh = bwh } @@ -236,6 +246,7 @@ func (testSuite *BufferedWriteTest) TestFlushWithMultiBlockWritesAndSignalUpload for i := 0; i < 5; i++ { err := testSuite.bwh.Write(buffer, int64(blockSize*(i+5))) require.Error(testSuite.T(), err) + assert.Equal(testSuite.T(), ErrUploadFailure, err) } obj, err := testSuite.bwh.Flush() diff --git a/internal/bufferedwrites/upload_handler.go b/internal/bufferedwrites/upload_handler.go index a539bd7896..651ba59e32 100644 --- a/internal/bufferedwrites/upload_handler.go +++ b/internal/bufferedwrites/upload_handler.go @@ -45,21 +45,35 @@ type UploadHandler struct { signalUploadFailure chan error // Parameters required for creating a new GCS chunk writer. - bucket gcs.Bucket - objectName string - blockSize int64 + bucket gcs.Bucket + objectName string + obj *gcs.Object + chunkTransferTimeout int64 + blockSize int64 +} + +type CreateUploadHandlerRequest struct { + Object *gcs.Object + ObjectName string + Bucket gcs.Bucket + FreeBlocksCh chan block.Block + MaxBlocksPerFile int64 + BlockSize int64 + ChunkTransferTimeoutSecs int64 } // newUploadHandler creates the UploadHandler struct. -func newUploadHandler(objectName string, bucket gcs.Bucket, maxBlocks int64, freeBlocksCh chan block.Block, blockSize int64) *UploadHandler { +func newUploadHandler(req *CreateUploadHandlerRequest) *UploadHandler { uh := &UploadHandler{ - uploadCh: make(chan block.Block, maxBlocks), - wg: sync.WaitGroup{}, - freeBlocksCh: freeBlocksCh, - bucket: bucket, - objectName: objectName, - blockSize: blockSize, - signalUploadFailure: make(chan error, 1), + uploadCh: make(chan block.Block, req.MaxBlocksPerFile), + wg: sync.WaitGroup{}, + freeBlocksCh: req.FreeBlocksCh, + bucket: req.Bucket, + objectName: req.ObjectName, + obj: req.Object, + blockSize: req.BlockSize, + signalUploadFailure: make(chan error, 1), + chunkTransferTimeout: req.ChunkTransferTimeoutSecs, } return uh } @@ -86,12 +100,7 @@ func (uh *UploadHandler) Upload(block block.Block) error { // createObjectWriter creates a GCS object writer. func (uh *UploadHandler) createObjectWriter() (err error) { - var preCond int64 - req := &gcs.CreateObjectRequest{ - Name: uh.objectName, - GenerationPrecondition: &preCond, - Metadata: make(map[string]string), - } + req := gcs.NewCreateObjectRequest(uh.obj, uh.objectName, nil, uh.chunkTransferTimeout) // We need a new context here, since the first writeFile() call will be complete // (and context will be cancelled) by the time complete upload is done. uh.writer, err = uh.bucket.CreateObjectChunkWriter(context.Background(), req, int(uh.blockSize), nil) @@ -111,9 +120,9 @@ func (uh *UploadHandler) uploader() { close(uh.signalUploadFailure) } } - uh.wg.Done() // Put back the uploaded block on the freeBlocksChannel for re-use. uh.freeBlocksCh <- currBlock + uh.wg.Done() } } diff --git a/internal/bufferedwrites/upload_handler_test.go b/internal/bufferedwrites/upload_handler_test.go index f068e82574..3f28e4d382 100644 --- a/internal/bufferedwrites/upload_handler_test.go +++ b/internal/bufferedwrites/upload_handler_test.go @@ -52,7 +52,15 @@ func (t *UploadHandlerTest) SetupTest() { var err error t.blockPool, err = block.NewBlockPool(blockSize, maxBlocks, semaphore.NewWeighted(maxBlocks)) require.NoError(t.T(), err) - t.uh = newUploadHandler("testObject", t.mockBucket, maxBlocks, t.blockPool.FreeBlocksChannel(), blockSize) + t.uh = newUploadHandler(&CreateUploadHandlerRequest{ + Object: nil, + ObjectName: "testObject", + Bucket: t.mockBucket, + FreeBlocksCh: t.blockPool.FreeBlocksChannel(), + MaxBlocksPerFile: maxBlocks, + BlockSize: blockSize, + ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, + }) } func (t *UploadHandlerTest) TestMultipleBlockUpload() { @@ -270,3 +278,65 @@ func (t *UploadHandlerTest) TestMultipleBlockAwaitBlocksUpload() { assert.Equal(t.T(), 0, len(t.uh.uploadCh)) assertAllBlocksProcessed(t.T(), t.uh) } + +func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectRequestParametersForEmptyGCSObject() { + t.uh.obj = &gcs.Object{ + Name: t.uh.objectName, + ContentType: "image/png", + Size: 0, + ContentEncoding: "gzip", + Generation: 10, + MetaGeneration: 20, + Acl: nil, + } + + // CreateObjectChunkWriter -- should be called once with correct request parameters. + writer := &storagemock.Writer{} + mockObj := &gcs.Object{} + t.mockBucket.On("CreateObjectChunkWriter", + mock.Anything, + mock.MatchedBy(func(req *gcs.CreateObjectRequest) bool { + return req.Name == t.uh.objectName && + *req.GenerationPrecondition == t.uh.obj.Generation && + *req.MetaGenerationPrecondition == t.uh.obj.MetaGeneration && + req.ContentEncoding == t.uh.obj.ContentEncoding && + req.ContentType == t.uh.obj.ContentType && + req.ChunkTransferTimeoutSecs == chunkTransferTimeoutSecs + }), + mock.Anything, + mock.Anything).Return(writer, nil) + t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil) + + // Create a block. + b, err := t.blockPool.Get() + require.NoError(t.T(), err) + // Upload the block. + err = t.uh.Upload(b) + require.NoError(t.T(), err) +} + +func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectRequestParametersForLocalInode() { + assert.Nil(t.T(), t.uh.obj) + + // CreateObjectChunkWriter -- should be called once with correct request parameters. + writer := &storagemock.Writer{} + mockObj := &gcs.Object{} + t.mockBucket.On("CreateObjectChunkWriter", + mock.Anything, + mock.MatchedBy(func(req *gcs.CreateObjectRequest) bool { + return req.Name == t.uh.objectName && + *req.GenerationPrecondition == 0 && + req.MetaGenerationPrecondition == nil && + req.ChunkTransferTimeoutSecs == chunkTransferTimeoutSecs + }), + mock.Anything, + mock.Anything).Return(writer, nil) + t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil) + + // Create a block. + b, err := t.blockPool.Get() + require.NoError(t.T(), err) + // Upload the block. + err = t.uh.Upload(b) + require.NoError(t.T(), err) +} diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 92828588bf..922bb0c65e 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -48,7 +48,6 @@ import ( "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/fuse/fuseutil" "github.com/jacobsa/timeutil" - "golang.org/x/sync/semaphore" ) type ServerConfig struct { @@ -194,7 +193,6 @@ func NewFileSystem(ctx context.Context, serverCfg *ServerConfig) (fuseutil.FileS newConfig: serverCfg.NewConfig, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: serverCfg.NewConfig.FileCache.CacheFileForRangeRead, - globalMaxBlocksSem: semaphore.NewWeighted(serverCfg.NewConfig.Write.GlobalMaxBlocks), metricHandle: serverCfg.MetricHandle, } @@ -485,8 +483,6 @@ type fileSystem struct { // random file access. cacheFileForRangeRead bool - globalMaxBlocksSem *semaphore.Weighted - metricHandle common.MetricHandle } @@ -808,8 +804,7 @@ func (fs *fileSystem) mintInode(ic inode.Core) (in inode.Inode) { fs.contentCache, fs.mtimeClock, ic.Local, - &fs.newConfig.Write, - fs.globalMaxBlocksSem) + fs.newConfig) } // Place it in our map of IDs to inodes. @@ -1661,9 +1656,7 @@ func (fs *fileSystem) createFile( // LOCKS_EXCLUDED(fs.mu) // UNLOCK_FUNCTION(fs.mu) // LOCK_FUNCTION(child) -func (fs *fileSystem) createLocalFile( - parentID fuseops.InodeID, - name string) (child inode.Inode, err error) { +func (fs *fileSystem) createLocalFile(ctx context.Context, parentID fuseops.InodeID, name string) (child inode.Inode, err error) { // Find the parent. fs.mu.Lock() parent := fs.dirInodeOrDie(parentID) @@ -1699,7 +1692,7 @@ func (fs *fileSystem) createLocalFile( fs.localFileInodes[child.Name()] = child // Empty file is created to be able to set attributes on the file. fileInode := child.(*inode.FileInode) - if err := fileInode.CreateBufferedOrTempWriter(); err != nil { + if err := fileInode.CreateBufferedOrTempWriter(ctx); err != nil { return nil, err } fs.mu.Unlock() @@ -1729,7 +1722,7 @@ func (fs *fileSystem) CreateFile( if fs.newConfig.Write.CreateEmptyFile { child, err = fs.createFile(ctx, op.Parent, op.Name, op.Mode) } else { - child, err = fs.createLocalFile(op.Parent, op.Name) + child, err = fs.createLocalFile(ctx, op.Parent, op.Name) } if err != nil { diff --git a/internal/fs/handle/dir_handle_test.go b/internal/fs/handle/dir_handle_test.go index 448b46a479..e9d0b46bec 100644 --- a/internal/fs/handle/dir_handle_test.go +++ b/internal/fs/handle/dir_handle_test.go @@ -31,7 +31,6 @@ import ( "github.com/jacobsa/fuse/fuseutil" . "github.com/jacobsa/ogletest" "github.com/jacobsa/timeutil" - "golang.org/x/sync/semaphore" ) func TestDirHandle(t *testing.T) { RunTests(t) } @@ -106,8 +105,7 @@ func (t *DirHandleTest) createLocalFileInode(name string, id fuseops.InodeID) (i contentcache.New("", &t.clock), &t.clock, true, // localFile - &cfg.WriteConfig{}, - semaphore.NewWeighted(math.MaxInt64)) + &cfg.Config{Write: cfg.WriteConfig{GlobalMaxBlocks: math.MaxInt64}}) return } diff --git a/internal/fs/inode/dir_test.go b/internal/fs/inode/dir_test.go index 16b67e5652..6f966ed144 100644 --- a/internal/fs/inode/dir_test.go +++ b/internal/fs/inode/dir_test.go @@ -24,14 +24,12 @@ import ( "time" "github.com/googlecloudplatform/gcsfuse/v2/cfg" - "github.com/googlecloudplatform/gcsfuse/v2/internal/util" - "golang.org/x/sync/semaphore" - "github.com/googlecloudplatform/gcsfuse/v2/internal/cache/metadata" "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" + "github.com/googlecloudplatform/gcsfuse/v2/internal/util" "golang.org/x/net/context" "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" @@ -208,8 +206,7 @@ func (t *DirTest) createLocalFileInode(parent Name, name string, id fuseops.Inod contentcache.New("", &t.clock), &t.clock, true, //localFile - &cfg.WriteConfig{}, - semaphore.NewWeighted(math.MaxInt64)) + &cfg.Config{Write: cfg.WriteConfig{GlobalMaxBlocks: math.MaxInt64}}) return } diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index fe0a0394f8..71a219eb80 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -38,7 +38,7 @@ import ( // A GCS object metadata key for file mtimes. mtimes are UTC, and are stored in // the format defined by time.RFC3339Nano. -const FileMtimeMetadataKey = gcsx.MtimeMetadataKey +const FileMtimeMetadataKey = gcs.MtimeMetadataKey type FileInode struct { ///////////////////////// @@ -93,9 +93,8 @@ type FileInode struct { // Represents if local file has been unlinked. unlinked bool - bwh *bufferedwrites.BufferedWriteHandler - writeConfig *cfg.WriteConfig - globalMaxBlocksSem *semaphore.Weighted + bwh *bufferedwrites.BufferedWriteHandler + config *cfg.Config } var _ Inode = &FileInode{} @@ -118,26 +117,24 @@ func NewFileInode( contentCache *contentcache.ContentCache, mtimeClock timeutil.Clock, localFile bool, - writeConfig *cfg.WriteConfig, - globalMaxBlocksSem *semaphore.Weighted) (f *FileInode) { + cfg *cfg.Config) (f *FileInode) { // Set up the basic struct. var minObj gcs.MinObject if m != nil { minObj = *m } f = &FileInode{ - bucket: bucket, - mtimeClock: mtimeClock, - id: id, - name: name, - attrs: attrs, - localFileCache: localFileCache, - contentCache: contentCache, - src: minObj, - local: localFile, - unlinked: false, - writeConfig: writeConfig, - globalMaxBlocksSem: globalMaxBlocksSem, + bucket: bucket, + mtimeClock: mtimeClock, + id: id, + name: name, + attrs: attrs, + localFileCache: localFileCache, + contentCache: contentCache, + src: minObj, + local: localFile, + unlinked: false, + config: cfg, } f.lc.Init(id) @@ -505,8 +502,8 @@ func (f *FileInode) Write( data []byte, offset int64) (err error) { // For empty GCS files also we will trigger bufferedWrites flow. - if f.src.Size == 0 && f.writeConfig.ExperimentalEnableStreamingWrites { - err = f.ensureBufferedWriteHandler() + if f.src.Size == 0 && f.config.Write.ExperimentalEnableStreamingWrites { + err = f.ensureBufferedWriteHandler(ctx) if err != nil { return } @@ -613,20 +610,7 @@ func (f *FileInode) SetMtime( return } -// Sync writes out contents to GCS. If this fails due to the generation having been -// clobbered, failure is propagated back to the calling function as an error. -// -// After this method succeeds, SourceGeneration will return the new generation -// by which this inode should be known (which may be the same as before). If it -// fails, the generation will not change. -// -// LOCKS_REQUIRED(f.mu) -func (f *FileInode) Sync(ctx context.Context) (err error) { - // If we have not been dirtied, there is nothing to do. - if f.content == nil { - return - } - +func (f *FileInode) fetchLatestGcsObject(ctx context.Context) (*gcs.Object, error) { // When listObjects call is made, we fetch data with projection set as noAcl // which means acls and owner properties are not returned. So the f.src object // here will not have acl information even though there are acls present on @@ -636,13 +620,30 @@ func (f *FileInode) Sync(ctx context.Context) (err error) { // default sets the projection to full, which fetches all the object // properties. latestGcsObj, isClobbered, err := f.clobbered(ctx, true, true) - if isClobbered { err = &gcsfuse_errors.FileClobberedError{ Err: err, } } + return latestGcsObj, err +} + +// Sync writes out contents to GCS. If this fails due to the generation having been +// clobbered, failure is propagated back to the calling function as an error. +// +// After this method succeeds, SourceGeneration will return the new generation +// by which this inode should be known (which may be the same as before). If it +// fails, the generation will not change. +// +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) Sync(ctx context.Context) (err error) { + // If we have not been dirtied, there is nothing to do. + if f.content == nil { + return + } + + latestGcsObj, err := f.fetchLatestGcsObject(ctx) if err != nil { return } @@ -692,8 +693,8 @@ func (f *FileInode) Truncate( ctx context.Context, size int64) (err error) { // For empty GCS files also, we will trigger bufferedWrites flow. - if f.src.Size == 0 && f.writeConfig.ExperimentalEnableStreamingWrites { - err = f.ensureBufferedWriteHandler() + if f.src.Size == 0 && f.config.Write.ExperimentalEnableStreamingWrites { + err = f.ensureBufferedWriteHandler(ctx) if err != nil { return } @@ -725,10 +726,10 @@ func (f *FileInode) CacheEnsureContent(ctx context.Context) (err error) { return } -func (f *FileInode) CreateBufferedOrTempWriter() (err error) { +func (f *FileInode) CreateBufferedOrTempWriter(ctx context.Context) (err error) { // Skip creating empty file when streaming writes are enabled - if f.local && f.writeConfig.ExperimentalEnableStreamingWrites { - err = f.ensureBufferedWriteHandler() + if f.local && f.config.Write.ExperimentalEnableStreamingWrites { + err = f.ensureBufferedWriteHandler(ctx) if err != nil { return } @@ -743,10 +744,26 @@ func (f *FileInode) CreateBufferedOrTempWriter() (err error) { return } -func (f *FileInode) ensureBufferedWriteHandler() error { +func (f *FileInode) ensureBufferedWriteHandler(ctx context.Context) error { var err error + var latestGcsObj *gcs.Object + if !f.local { + latestGcsObj, err = f.fetchLatestGcsObject(ctx) + if err != nil { + return err + } + } + if f.bwh == nil { - f.bwh, err = bufferedwrites.NewBWHandler(f.name.GcsObjectName(), f.bucket, f.writeConfig.BlockSizeMb, f.writeConfig.MaxBlocksPerFile, f.globalMaxBlocksSem) + f.bwh, err = bufferedwrites.NewBWHandler(&bufferedwrites.CreateBWHandlerRequest{ + Object: latestGcsObj, + ObjectName: f.name.GcsObjectName(), + Bucket: f.bucket, + BlockSize: f.config.Write.BlockSizeMb, + MaxBlocksPerFile: f.config.Write.MaxBlocksPerFile, + GlobalMaxBlocksSem: semaphore.NewWeighted(f.config.Write.GlobalMaxBlocks), + ChunkTransferTimeoutSecs: f.config.GcsRetries.ChunkTransferTimeoutSecs, + }) if err != nil { return fmt.Errorf("failed to create bufferedWriteHandler: %w", err) } diff --git a/internal/fs/inode/file_test.go b/internal/fs/inode/file_test.go index 731b14e32b..3c7f69985c 100644 --- a/internal/fs/inode/file_test.go +++ b/internal/fs/inode/file_test.go @@ -26,20 +26,18 @@ import ( "time" "github.com/googlecloudplatform/gcsfuse/v2/cfg" + "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors" + "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/fake" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" + "github.com/jacobsa/fuse/fuseops" "github.com/jacobsa/syncutil" + "github.com/jacobsa/timeutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "golang.org/x/net/context" - "golang.org/x/sync/semaphore" - - "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" - "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" - "github.com/jacobsa/fuse/fuseops" - "github.com/jacobsa/timeutil" ) //////////////////////////////////////////////////////////////////////// @@ -145,8 +143,7 @@ func (t *FileTest) createInodeWithLocalParam(fileName string, local bool) { contentcache.New("", &t.clock), &t.clock, local, - &cfg.WriteConfig{}, - semaphore.NewWeighted(math.MaxInt64)) + &cfg.Config{Write: cfg.WriteConfig{GlobalMaxBlocks: math.MaxInt64}}) t.in.Lock() } @@ -415,7 +412,7 @@ func (t *FileTest) TestWriteToLocalFileThenSync() { // Create a local file inode. t.createInodeWithLocalParam("test", true) // Create a temp file for the local inode created above. - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Write some content to temp file. t.clock.AdvanceTime(time.Second) @@ -459,7 +456,7 @@ func (t *FileTest) TestSyncEmptyLocalFile() { t.createInodeWithLocalParam("test", true) creationTime := t.clock.Now() // Create a temp file for the local inode created above. - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Sync. @@ -629,7 +626,7 @@ func (t *FileTest) TestTruncateUpwardForLocalFileShouldUpdateLocalFileAttributes var attrs fuseops.InodeAttributes // Create a local file inode. t.createInodeWithLocalParam("test", true) - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Fetch the attributes and check if the file is empty. attrs, err = t.in.Attributes(t.ctx) @@ -655,7 +652,7 @@ func (t *FileTest) TestTruncateDownwardForLocalFileShouldUpdateLocalFileAttribut var attrs fuseops.InodeAttributes // Create a local file inode. t.createInodeWithLocalParam("test", true) - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Write some data to the local file. err = t.in.Write(t.ctx, []byte("burrito"), 0) @@ -697,8 +694,8 @@ func (t *FileTest) TestTruncateUpwardForLocalFileWhenStreamingWritesAreEnabled() t.Run(tc.name, func() { // Create a local file inode. t.createInodeWithLocalParam("test", true) - t.in.writeConfig = getWriteConfig() - err := t.in.CreateBufferedOrTempWriter() + t.in.config = &cfg.Config{Write: *getWriteConfig()} + err := t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) assert.NotNil(t.T(), t.in.bwh) @@ -750,7 +747,7 @@ func (t *FileTest) TestTruncateUpwardForEmptyGCSFileWhenStreamingWritesAreEnable for _, tc := range tbl { t.Run(tc.name, func() { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) // Fetch the attributes and check if the file is empty. attrs, err := t.in.Attributes(t.ctx) @@ -823,7 +820,7 @@ func (t *FileTest) TestTruncateDownwardWhenStreamingWritesAreEnabled() { if tc.fileType == EmptyGCSFile { t.createInodeWithEmptyObject() } - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) // Fetch the attributes and check if the file is empty. attrs, err := t.in.Attributes(t.ctx) @@ -1057,7 +1054,7 @@ func (t *FileTest) TestTestSetMtimeForLocalFileShouldUpdateLocalFileAttributes() // Create a local file inode. t.createInodeWithLocalParam("test", true) createTime := t.in.mtimeClock.Now() - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Validate the attributes on an empty file. attrs, err = t.in.Attributes(t.ctx) @@ -1087,8 +1084,8 @@ func (t *FileTest) TestSetMtimeForLocalFileWhenStreamingWritesAreEnabled() { var attrs fuseops.InodeAttributes // Create a local file inode. t.createInodeWithLocalParam("test", true) - t.in.writeConfig = getWriteConfig() - err = t.in.CreateBufferedOrTempWriter() + t.in.config = &cfg.Config{Write: *getWriteConfig()} + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Set mtime. @@ -1144,7 +1141,7 @@ func (t *FileTest) TestTestCheckInvariantsShouldNotThrowExceptionForLocalFiles() } func (t *FileTest) TestCreateBufferedOrTempWriterShouldCreateEmptyFile() { - err := t.in.CreateBufferedOrTempWriter() + err := t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) assert.NotNil(t.T(), t.in.content) @@ -1156,9 +1153,9 @@ func (t *FileTest) TestCreateBufferedOrTempWriterShouldCreateEmptyFile() { func (t *FileTest) TestCreateBufferedOrTempWriterShouldNotCreateFileWhenStreamingWritesAreEnabled() { t.createInodeWithLocalParam("test", true) - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} - err := t.in.CreateBufferedOrTempWriter() + err := t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) assert.Nil(t.T(), t.in.content) @@ -1167,9 +1164,9 @@ func (t *FileTest) TestCreateBufferedOrTempWriterShouldNotCreateFileWhenStreamin func (t *FileTest) TestCreateBufferedOrTempWriterShouldCreateFileForNonLocalFilesForStreamingWrites() { // Enabling buffered writes. - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} - err := t.in.CreateBufferedOrTempWriter() + err := t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) assert.NotNil(t.T(), t.in.content) @@ -1185,7 +1182,7 @@ func (t *FileTest) TestUnlinkLocalFile() { // Create a local file inode. t.createInodeWithLocalParam("test", true) // Create a temp file for the local inode created above. - err = t.in.CreateBufferedOrTempWriter() + err = t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) // Unlink. @@ -1227,15 +1224,15 @@ func (t *FileTest) TestReadFileWhenStreamingWritesAreEnabled() { if tc.fileType == LocalFile { // Create a local file inode. t.createInodeWithLocalParam("test", true) - t.in.writeConfig = getWriteConfig() - err := t.in.CreateBufferedOrTempWriter() + t.in.config = &cfg.Config{Write: *getWriteConfig()} + err := t.in.CreateBufferedOrTempWriter(t.ctx) assert.Nil(t.T(), err) assert.NotNil(t.T(), t.in.bwh) } if tc.fileType == EmptyGCSFile { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} } if tc.performWrite { @@ -1257,7 +1254,7 @@ func (t *FileTest) TestReadFileWhenStreamingWritesAreEnabled() { func (t *FileTest) TestReadEmptyGCSFileWhenStreamingWritesAreNotInProgress() { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} data := make([]byte, 10) n, err := t.in.Read(t.ctx, data, 0) @@ -1269,7 +1266,7 @@ func (t *FileTest) TestReadEmptyGCSFileWhenStreamingWritesAreNotInProgress() { func (t *FileTest) TestWriteToLocalFileWithInvalidConfigWhenStreamingWritesAreEnabled() { // Create a local file inode. t.createInodeWithLocalParam("test", true) - t.in.writeConfig.ExperimentalEnableStreamingWrites = true + t.in.config = &cfg.Config{Write: cfg.WriteConfig{ExperimentalEnableStreamingWrites: true}} assert.Nil(t.T(), t.in.bwh) err := t.in.Write(t.ctx, []byte("hi"), 0) @@ -1281,7 +1278,7 @@ func (t *FileTest) TestWriteToLocalFileWithInvalidConfigWhenStreamingWritesAreEn func (t *FileTest) TestWriteToLocalFileWhenStreamingWritesAreEnabled() { // Create a local file inode. t.createInodeWithLocalParam("test", true) - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) err := t.in.Write(t.ctx, []byte("hi"), 0) @@ -1296,7 +1293,7 @@ func (t *FileTest) TestMultipleWritesToLocalFileWhenStreamingWritesAreEnabled() // Create a local file inode. t.createInodeWithLocalParam("test", true) createTime := t.in.mtimeClock.Now() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) err := t.in.Write(t.ctx, []byte("hi"), 0) @@ -1316,7 +1313,7 @@ func (t *FileTest) TestMultipleWritesToLocalFileWhenStreamingWritesAreEnabled() func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} createTime := t.in.mtimeClock.Now() assert.Nil(t.T(), t.in.bwh) @@ -1335,7 +1332,7 @@ func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() { func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) // This test checks if the mtime is updated to GCS. Since test framework @@ -1347,7 +1344,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() { func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() - t.in.writeConfig = getWriteConfig() + t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) // Initiate write call. err := t.in.Write(t.ctx, []byte("hi"), 0) diff --git a/internal/fs/local_modifications_test.go b/internal/fs/local_modifications_test.go index 4de361ace7..a1a1fdf440 100644 --- a/internal/fs/local_modifications_test.go +++ b/internal/fs/local_modifications_test.go @@ -33,7 +33,6 @@ import ( "unicode" "unicode/utf8" - "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" "github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/operations" @@ -1538,8 +1537,8 @@ func validateObjectAttributes(extendedAttr1, extendedAttr2 *gcs.ExtendedObjectAt ExpectEq(FileContentsSize, minObject2.Size) ExpectNe(minObject1.Generation, minObject2.Generation) ExpectTrue(minObject1.Updated.Before(minObject2.Updated)) - attr1MTime, _ := time.Parse(time.RFC3339Nano, minObject1.Metadata[gcsx.MtimeMetadataKey]) - attr2MTime, _ := time.Parse(time.RFC3339Nano, minObject2.Metadata[gcsx.MtimeMetadataKey]) + attr1MTime, _ := time.Parse(time.RFC3339Nano, minObject1.Metadata[gcs.MtimeMetadataKey]) + attr2MTime, _ := time.Parse(time.RFC3339Nano, minObject2.Metadata[gcs.MtimeMetadataKey]) ExpectTrue(attr1MTime.Before(attr2MTime)) ExpectEq(minObject1.ContentEncoding, minObject2.ContentEncoding) ExpectNe(nil, minObject1.CRC32C) diff --git a/internal/gcsx/append_object_creator.go b/internal/gcsx/append_object_creator.go index 8aede97eda..ad2782d296 100644 --- a/internal/gcsx/append_object_creator.go +++ b/internal/gcsx/append_object_creator.go @@ -96,15 +96,9 @@ func (oc *appendObjectCreator) Create( } // Create a temporary object containing the additional contents. - var zero int64 - tmp, err := oc.bucket.CreateObject( - ctx, - &gcs.CreateObjectRequest{ - Name: tmpName, - GenerationPrecondition: &zero, - Contents: r, - ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, - }) + req := gcs.NewCreateObjectRequest(nil, tmpName, nil, chunkTransferTimeoutSecs) + req.Contents = r + tmp, err := oc.bucket.CreateObject(ctx, req) if err != nil { err = fmt.Errorf("CreateObject: %w", err) return @@ -132,7 +126,7 @@ func (oc *appendObjectCreator) Create( } if mtime != nil { - MetadataMap[MtimeMetadataKey] = mtime.UTC().Format(time.RFC3339Nano) + MetadataMap[gcs.MtimeMetadataKey] = mtime.UTC().Format(time.RFC3339Nano) } // Compose the old contents plus the new over the old. diff --git a/internal/gcsx/syncer.go b/internal/gcsx/syncer.go index aa32e05e24..3b37fee257 100644 --- a/internal/gcsx/syncer.go +++ b/internal/gcsx/syncer.go @@ -23,11 +23,6 @@ import ( "golang.org/x/net/context" ) -// MtimeMetadataKey objects are created by Syncer.SyncObject and contain a -// metadata field with this key and with a UTC mtime in the format defined -// by time.RFC3339Nano. -const MtimeMetadataKey = "gcsfuse_mtime" - // Syncer is safe for concurrent access. type Syncer interface { // Given an object record and content that was originally derived from that @@ -88,45 +83,8 @@ func (oc *fullObjectCreator) Create( mtime *time.Time, chunkTransferTimeoutSecs int64, r io.Reader) (o *gcs.Object, err error) { - metadataMap := make(map[string]string) - - var req *gcs.CreateObjectRequest - if srcObject == nil { - var precond int64 - req = &gcs.CreateObjectRequest{ - Name: objectName, - Contents: r, - GenerationPrecondition: &precond, - Metadata: metadataMap, - ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, - } - } else { - for key, value := range srcObject.Metadata { - metadataMap[key] = value - } - - req = &gcs.CreateObjectRequest{ - Name: srcObject.Name, - GenerationPrecondition: &srcObject.Generation, - MetaGenerationPrecondition: &srcObject.MetaGeneration, - Contents: r, - Metadata: metadataMap, - CacheControl: srcObject.CacheControl, - ContentDisposition: srcObject.ContentDisposition, - ContentEncoding: srcObject.ContentEncoding, - ContentType: srcObject.ContentType, - CustomTime: srcObject.CustomTime, - EventBasedHold: srcObject.EventBasedHold, - StorageClass: srcObject.StorageClass, - ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, - } - } - - // Any existing mtime value will be overwritten with new value. - if mtime != nil { - metadataMap[MtimeMetadataKey] = mtime.UTC().Format(time.RFC3339Nano) - } - + req := gcs.NewCreateObjectRequest(srcObject, objectName, mtime, chunkTransferTimeoutSecs) + req.Contents = r o, err = oc.bucket.CreateObject(ctx, req) if err != nil { err = fmt.Errorf("CreateObject: %w", err) diff --git a/internal/storage/gcs/request_helper.go b/internal/storage/gcs/request_helper.go new file mode 100644 index 0000000000..09c142c8a5 --- /dev/null +++ b/internal/storage/gcs/request_helper.go @@ -0,0 +1,64 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcs + +import ( + "time" +) + +// MtimeMetadataKey objects are created by Syncer.SyncObject and contain a +// metadata field with this key and with a UTC mtime in the format defined +// by time.RFC3339Nano. +const MtimeMetadataKey = "gcsfuse_mtime" + +func NewCreateObjectRequest(srcObject *Object, objectName string, mtime *time.Time, chunkTransferTimeoutSecs int64) *CreateObjectRequest { + metadataMap := make(map[string]string) + var req *CreateObjectRequest + if srcObject == nil { + var preCond int64 + req = &CreateObjectRequest{ + Name: objectName, + GenerationPrecondition: &preCond, + Metadata: metadataMap, + ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, + } + } else { + for key, value := range srcObject.Metadata { + metadataMap[key] = value + } + + req = &CreateObjectRequest{ + Name: srcObject.Name, + GenerationPrecondition: &srcObject.Generation, + MetaGenerationPrecondition: &srcObject.MetaGeneration, + Metadata: metadataMap, + CacheControl: srcObject.CacheControl, + ContentDisposition: srcObject.ContentDisposition, + ContentEncoding: srcObject.ContentEncoding, + ContentType: srcObject.ContentType, + CustomTime: srcObject.CustomTime, + EventBasedHold: srcObject.EventBasedHold, + StorageClass: srcObject.StorageClass, + ChunkTransferTimeoutSecs: chunkTransferTimeoutSecs, + } + } + + // Any existing mtime value will be overwritten with new value. + if mtime != nil { + metadataMap[MtimeMetadataKey] = mtime.UTC().Format(time.RFC3339Nano) + } + + return req +} diff --git a/internal/storage/gcs/request_helper_test.go b/internal/storage/gcs/request_helper_test.go new file mode 100644 index 0000000000..8e938ce52c --- /dev/null +++ b/internal/storage/gcs/request_helper_test.go @@ -0,0 +1,105 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcs + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestCreateObjectRequest(t *testing.T) { + now := time.Now() + + tests := []struct { + name string + srcObject *Object + objectName string + mtime *time.Time + chunkTransferTimeoutSecs int64 + expectedRequest *CreateObjectRequest + }{ + { + name: "nil_srcObject", + objectName: "new-object.txt", + mtime: &now, + chunkTransferTimeoutSecs: 30, + expectedRequest: &CreateObjectRequest{ + Name: "new-object.txt", + GenerationPrecondition: &[]int64{0}[0], // Default precondition + Metadata: map[string]string{ + MtimeMetadataKey: now.UTC().Format(time.RFC3339Nano), + }, + ChunkTransferTimeoutSecs: 30, + }, + }, + { + name: "existing_srcObject", + srcObject: &Object{ + Name: "existing-object.txt", + Generation: 12345, + MetaGeneration: 67890, + Metadata: map[string]string{"key1": "value1", "key2": "value2"}, + CacheControl: "public, max-age=3600", + ContentDisposition: "attachment; filename=\"myfile.txt\"", + ContentEncoding: "gzip", + ContentType: "text/plain", + CustomTime: now.Add(-24 * time.Hour).String(), + EventBasedHold: true, + StorageClass: "STANDARD", + }, + mtime: &now, + chunkTransferTimeoutSecs: 60, + expectedRequest: &CreateObjectRequest{ + Name: "existing-object.txt", + GenerationPrecondition: &[]int64{12345}[0], + MetaGenerationPrecondition: &[]int64{67890}[0], + Metadata: map[string]string{ + "key1": "value1", + "key2": "value2", + MtimeMetadataKey: now.UTC().Format(time.RFC3339Nano), + }, + CacheControl: "public, max-age=3600", + ContentDisposition: "attachment; filename=\"myfile.txt\"", + ContentEncoding: "gzip", + ContentType: "text/plain", + CustomTime: now.Add(-24 * time.Hour).String(), + EventBasedHold: true, + StorageClass: "STANDARD", + ChunkTransferTimeoutSecs: 60, + }, + }, + { + name: "nil_mtime_nil_srcObject", + objectName: "no-mtime.txt", + chunkTransferTimeoutSecs: 30, + expectedRequest: &CreateObjectRequest{ + Name: "no-mtime.txt", + GenerationPrecondition: &[]int64{0}[0], + Metadata: map[string]string{}, + ChunkTransferTimeoutSecs: 30, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := NewCreateObjectRequest(tt.srcObject, tt.objectName, tt.mtime, tt.chunkTransferTimeoutSecs) + + assert.Equal(t, tt.expectedRequest, req) + }) + } +} diff --git a/tools/integration_tests/operations/write_test.go b/tools/integration_tests/operations/write_test.go index 84a9f39e16..2e72ba3e65 100644 --- a/tools/integration_tests/operations/write_test.go +++ b/tools/integration_tests/operations/write_test.go @@ -25,7 +25,6 @@ import ( "time" "cloud.google.com/go/storage" - "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" "github.com/googlecloudplatform/gcsfuse/v2/tools/integration_tests/util/client" @@ -104,8 +103,8 @@ func validateObjectAttributes(attr1, attr2 *storage.ObjectAttrs, t *testing.T) { if attr1.StorageClass != storageClass || attr2.StorageClass != storageClass { t.Errorf("Expected storage class ") } - attr1MTime, _ := time.Parse(time.RFC3339Nano, attr1.Metadata[gcsx.MtimeMetadataKey]) - attr2MTime, _ := time.Parse(time.RFC3339Nano, attr2.Metadata[gcsx.MtimeMetadataKey]) + attr1MTime, _ := time.Parse(time.RFC3339Nano, attr1.Metadata[gcs.MtimeMetadataKey]) + attr2MTime, _ := time.Parse(time.RFC3339Nano, attr2.Metadata[gcs.MtimeMetadataKey]) if attr2MTime.Before(attr1MTime) { t.Errorf("Unexpected MTime received. After operation1: %v, After operation2: %v", attr1MTime, attr2MTime) }