diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 922bb0c65e..c1cafc1524 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -1737,7 +1737,8 @@ func (fs *fileSystem) CreateFile( handleID := fs.nextHandleID fs.nextHandleID++ - fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle) + // Creating new file is always a write operation, hence passing readOnly as false. + fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, false) op.Handle = handleID fs.mu.Unlock() @@ -2372,16 +2373,24 @@ func (fs *fileSystem) OpenFile( ctx context.Context, op *fuseops.OpenFileOp) (err error) { fs.mu.Lock() - defer fs.mu.Unlock() // Find the inode. in := fs.fileInodeOrDie(op.Inode) + // Follow lock ordering rules to get inode lock. + // Inode lock is required to register fileHandle with the inode. + fs.mu.Unlock() + in.Lock() + defer in.Unlock() + + // Get the fs lock again. + fs.mu.Lock() + defer fs.mu.Unlock() // Allocate a handle. handleID := fs.nextHandleID fs.nextHandleID++ - fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle) + fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, op.OpenFlags.IsReadOnly()) op.Handle = handleID // When we observe object generations that we didn't create, we assign them diff --git a/internal/fs/handle/file.go b/internal/fs/handle/file.go index 0d5c77962c..d2d417ce92 100644 --- a/internal/fs/handle/file.go +++ b/internal/fs/handle/file.go @@ -47,16 +47,23 @@ type FileHandle struct { // will be downloaded for random reads as well too. cacheFileForRangeRead bool metricHandle common.MetricHandle + // For now, we will consider the files which are open in append mode also as write, + // as we are not doing anything special for append. When required we will + // define an enum instead of boolean to hold the type of open. + readOnly bool } -func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle) (fh *FileHandle) { +// LOCKS_REQUIRED(fh.inode.mu) +func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle, readOnly bool) (fh *FileHandle) { fh = &FileHandle{ inode: inode, fileCacheHandler: fileCacheHandler, cacheFileForRangeRead: cacheFileForRangeRead, metricHandle: metricHandle, + readOnly: readOnly, } + fh.inode.RegisterFileHandle(fh.readOnly) fh.mu = syncutil.NewInvariantMutex(fh.checkInvariants) return @@ -65,6 +72,8 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, // Destroy any resources associated with the handle, which must not be used // again. func (fh *FileHandle) Destroy() { + // Deregister the fileHandle with the inode. + fh.inode.DeRegisterFileHandle(fh.readOnly) if fh.reader != nil { fh.reader.Destroy() } diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index 9c60fbf4fd..78aa009c16 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -27,6 +27,7 @@ import ( "github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache" "github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors" "github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx" + "github.com/googlecloudplatform/gcsfuse/v2/internal/logger" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs" "github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil" "github.com/jacobsa/fuse/fuseops" @@ -95,6 +96,15 @@ type FileInode struct { bwh *bufferedwrites.BufferedWriteHandler config *cfg.Config + + // Once write is started on the file i.e, bwh is initialized, any fileHandles + // opened in write mode before or after this and not yet closed are considered + // as writing to the file even though they are not writing. + // In case of successful flush, we will set bwh to nil. But in case of error, + // we will keep returning that error to all the fileHandles open during that time + // and set bwh to nil after all fileHandlers are closed. + // writeHandleCount tracks the count of open fileHandles in write mode. + writeHandleCount int32 } var _ Inode = &FileInode{} @@ -369,6 +379,31 @@ func (f *FileInode) DecrementLookupCount(n uint64) (destroy bool) { return } +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) RegisterFileHandle(readOnly bool) { + if !readOnly { + f.writeHandleCount++ + } +} + +// LOCKS_REQUIRED(f.mu) +func (f *FileInode) DeRegisterFileHandle(readOnly bool) { + if readOnly { + return + } + + if f.writeHandleCount <= 0 { + logger.Errorf("Mismatch in number of write file handles for inode :%d", f.id) + } + + f.writeHandleCount-- + + // All write fileHandles associated with bwh are closed. So safe to set bwh to nil. + if f.writeHandleCount == 0 { + f.bwh = nil + } +} + // LOCKS_REQUIRED(f.mu) func (f *FileInode) Destroy() (err error) { f.destroyed = true @@ -802,6 +837,11 @@ func (f *FileInode) CreateBufferedOrTempWriter(ctx context.Context) (err error) } func (f *FileInode) ensureBufferedWriteHandler(ctx context.Context) error { + // bwh already initialized, do nothing. + if f.bwh != nil { + return nil + } + var err error var latestGcsObj *gcs.Object if !f.local { diff --git a/internal/fs/inode/file_test.go b/internal/fs/inode/file_test.go index 3c7f69985c..0666f8846e 100644 --- a/internal/fs/inode/file_test.go +++ b/internal/fs/inode/file_test.go @@ -36,6 +36,7 @@ import ( "github.com/jacobsa/syncutil" "github.com/jacobsa/timeutil" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "golang.org/x/net/context" ) @@ -1311,7 +1312,7 @@ func (t *FileTest) TestMultipleWritesToLocalFileWhenStreamingWritesAreEnabled() assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta) } -func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() { +func (t *FileTest) TestWriteToEmptyGCSFileWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() t.in.config = &cfg.Config{Write: *getWriteConfig()} createTime := t.in.mtimeClock.Now() @@ -1326,11 +1327,11 @@ func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() { // The inode should agree about the new mtime. attrs, err := t.in.Attributes(t.ctx) assert.Nil(t.T(), err) - assert.Equal(t.T(), int64(2), attrs.Size) + assert.Equal(t.T(), uint64(2), attrs.Size) assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta) } -func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() { +func (t *FileTest) TestSetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) @@ -1342,7 +1343,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() { assert.Nil(t.T(), t.in.bwh) } -func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() { +func (t *FileTest) TestSetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() { t.createInodeWithEmptyObject() t.in.config = &cfg.Config{Write: *getWriteConfig()} assert.Nil(t.T(), t.in.bwh) @@ -1351,7 +1352,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable assert.Nil(t.T(), err) assert.NotNil(t.T(), t.in.bwh) writeFileInfo := t.in.bwh.WriteFileInfo() - assert.Equal(t.T(), 2, writeFileInfo.TotalSize) + assert.Equal(t.T(), int64(2), writeFileInfo.TotalSize) // Set mtime. mtime := time.Now().UTC().Add(123 * time.Second) @@ -1366,6 +1367,92 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable assert.Equal(t.T(), attrs.Atime, mtime) } +func (t *FileTest) TestRegisterFileHandle() { + tbl := []struct { + name string + readonly bool + currentVal int32 + expectedVal int32 + }{ + { + name: "ReadOnlyHandle", + readonly: true, + currentVal: 0, + expectedVal: 0, + }, + { + name: "ZeroCurrentValueForWriteHandle", + readonly: false, + currentVal: 0, + expectedVal: 1, + }, + { + name: "NonZeroCurrentValueForWriteHandle", + readonly: false, + currentVal: 5, + expectedVal: 6, + }, + } + for _, tc := range tbl { + t.Run(tc.name, func() { + t.in.writeHandleCount = tc.currentVal + + t.in.RegisterFileHandle(tc.readonly) + + assert.Equal(t.T(), tc.expectedVal, t.in.writeHandleCount) + }) + } +} + +func (t *FileTest) TestDeRegisterFileHandle() { + tbl := []struct { + name string + readonly bool + currentVal int32 + expectedVal int32 + isBwhNil bool + }{ + { + name: "ReadOnlyHandle", + readonly: true, + currentVal: 10, + expectedVal: 10, + isBwhNil: false, + }, + { + name: "NonZeroCurrentValueForWriteHandle", + readonly: false, + currentVal: 10, + expectedVal: 9, + isBwhNil: false, + }, + { + name: "LastWriteHandleToDeregister", + readonly: false, + currentVal: 1, + expectedVal: 0, + isBwhNil: true, + }, + } + for _, tc := range tbl { + t.Run(tc.name, func() { + t.in.config = &cfg.Config{Write: *getWriteConfig()} + t.in.writeHandleCount = tc.currentVal + err := t.in.ensureBufferedWriteHandler(t.ctx) + require.NoError(t.T(), err) + + t.in.DeRegisterFileHandle(tc.readonly) + + assert.Equal(t.T(), tc.expectedVal, t.in.writeHandleCount) + if tc.isBwhNil { + assert.Nil(t.T(), t.in.bwh) + } else { + assert.NotNil(t.T(), t.in.bwh) + } + }) + } +} + func getWriteConfig() *cfg.WriteConfig { return &cfg.WriteConfig{ MaxBlocksPerFile: 10,