Skip to content

Commit

Permalink
Setting bwh to nil after all fileHandles are closed (#2823)
Browse files Browse the repository at this point in the history
* fixing conflicts

* fixing nil assert

# Conflicts:
#	internal/fs/inode/file_test.go

* not nil fix

* storing fileHandle open semantics

* Added unit tests for fileInode

* Registering and deregistering.

* ADDED COMMENTS

* fixing panic

* fixing tests

* PR comment
  • Loading branch information
vadlakondaswetha authored Dec 24, 2024
1 parent 2eea4f0 commit 88ff04c
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 9 deletions.
15 changes: 12 additions & 3 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1737,7 +1737,8 @@ func (fs *fileSystem) CreateFile(
handleID := fs.nextHandleID
fs.nextHandleID++

fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle)
// Creating new file is always a write operation, hence passing readOnly as false.
fs.handles[handleID] = handle.NewFileHandle(child.(*inode.FileInode), fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, false)
op.Handle = handleID

fs.mu.Unlock()
Expand Down Expand Up @@ -2372,16 +2373,24 @@ func (fs *fileSystem) OpenFile(
ctx context.Context,
op *fuseops.OpenFileOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()

// Find the inode.
in := fs.fileInodeOrDie(op.Inode)
// Follow lock ordering rules to get inode lock.
// Inode lock is required to register fileHandle with the inode.
fs.mu.Unlock()
in.Lock()
defer in.Unlock()

// Get the fs lock again.
fs.mu.Lock()
defer fs.mu.Unlock()

// Allocate a handle.
handleID := fs.nextHandleID
fs.nextHandleID++

fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle)
fs.handles[handleID] = handle.NewFileHandle(in, fs.fileCacheHandler, fs.cacheFileForRangeRead, fs.metricHandle, op.OpenFlags.IsReadOnly())
op.Handle = handleID

// When we observe object generations that we didn't create, we assign them
Expand Down
11 changes: 10 additions & 1 deletion internal/fs/handle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,23 @@ type FileHandle struct {
// will be downloaded for random reads as well.
cacheFileForRangeRead bool
metricHandle common.MetricHandle
// For now, we will consider the files which are open in append mode also as write,
// as we are not doing anything special for append. When required we will
// define an enum instead of boolean to hold the type of open.
readOnly bool
}

func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle) (fh *FileHandle) {
// LOCKS_REQUIRED(fh.inode.mu)
func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, cacheFileForRangeRead bool, metricHandle common.MetricHandle, readOnly bool) (fh *FileHandle) {
fh = &FileHandle{
inode: inode,
fileCacheHandler: fileCacheHandler,
cacheFileForRangeRead: cacheFileForRangeRead,
metricHandle: metricHandle,
readOnly: readOnly,
}

fh.inode.RegisterFileHandle(fh.readOnly)
fh.mu = syncutil.NewInvariantMutex(fh.checkInvariants)

return
Expand All @@ -65,6 +72,8 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler,
// Destroy any resources associated with the handle, which must not be used
// again.
func (fh *FileHandle) Destroy() {
// Deregister the fileHandle with the inode.
fh.inode.DeRegisterFileHandle(fh.readOnly)
if fh.reader != nil {
fh.reader.Destroy()
}
Expand Down
40 changes: 40 additions & 0 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/googlecloudplatform/gcsfuse/v2/internal/contentcache"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/gcsfuse_errors"
"github.com/googlecloudplatform/gcsfuse/v2/internal/gcsx"
"github.com/googlecloudplatform/gcsfuse/v2/internal/logger"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/gcs"
"github.com/googlecloudplatform/gcsfuse/v2/internal/storage/storageutil"
"github.com/jacobsa/fuse/fuseops"
Expand Down Expand Up @@ -95,6 +96,15 @@ type FileInode struct {

bwh *bufferedwrites.BufferedWriteHandler
config *cfg.Config

// Once a write is started on the file, i.e., bwh is initialized, any fileHandles
// opened in write mode before or after this and not yet closed are considered
// as writing to the file even though they are not writing.
// In case of a successful flush, we will set bwh to nil. But in case of an error,
// we will keep returning that error to all the fileHandles open during that time
// and set bwh to nil after all fileHandles are closed.
// writeHandleCount tracks the count of open fileHandles in write mode.
writeHandleCount int32
}

var _ Inode = &FileInode{}
Expand Down Expand Up @@ -369,6 +379,31 @@ func (f *FileInode) DecrementLookupCount(n uint64) (destroy bool) {
return
}

// RegisterFileHandle records that a new file handle has been opened on this
// inode. Only handles opened for writing are counted; read-only handles leave
// writeHandleCount unchanged.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) RegisterFileHandle(readOnly bool) {
	// Read-only handles are not tracked.
	if readOnly {
		return
	}

	f.writeHandleCount++
}

// DeRegisterFileHandle records that a file handle on this inode has been
// closed. Read-only handles are not tracked and are ignored. For a write
// handle, writeHandleCount is decremented, and once it reaches zero bwh is
// set to nil.
//
// If no write handle is currently registered, the mismatch is logged and the
// count is left untouched instead of being driven negative, so that later
// register/deregister pairs still balance and can bring the count back to
// zero.
//
// LOCKS_REQUIRED(f.mu)
func (f *FileInode) DeRegisterFileHandle(readOnly bool) {
	if readOnly {
		return
	}

	if f.writeHandleCount <= 0 {
		logger.Errorf("Mismatch in number of write file handles for inode :%d", f.id)
		// Do not decrement below zero: a negative count would prevent the
		// "last write handle closed" condition below from ever being hit again.
		return
	}

	f.writeHandleCount--

	// All write fileHandles associated with bwh are closed. So safe to set bwh to nil.
	if f.writeHandleCount == 0 {
		f.bwh = nil
	}
}

// LOCKS_REQUIRED(f.mu)
func (f *FileInode) Destroy() (err error) {
f.destroyed = true
Expand Down Expand Up @@ -802,6 +837,11 @@ func (f *FileInode) CreateBufferedOrTempWriter(ctx context.Context) (err error)
}

func (f *FileInode) ensureBufferedWriteHandler(ctx context.Context) error {
// bwh already initialized, do nothing.
if f.bwh != nil {
return nil
}

var err error
var latestGcsObj *gcs.Object
if !f.local {
Expand Down
97 changes: 92 additions & 5 deletions internal/fs/inode/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/jacobsa/syncutil"
"github.com/jacobsa/timeutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -1311,7 +1312,7 @@ func (t *FileTest) TestMultipleWritesToLocalFileWhenStreamingWritesAreEnabled()
assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta)
}

func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestWriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
createTime := t.in.mtimeClock.Now()
Expand All @@ -1326,11 +1327,11 @@ func (t *FileTest) WriteToEmptyGCSFileWhenStreamingWritesAreEnabled() {
// The inode should agree about the new mtime.
attrs, err := t.in.Attributes(t.ctx)
assert.Nil(t.T(), err)
assert.Equal(t.T(), int64(2), attrs.Size)
assert.Equal(t.T(), uint64(2), attrs.Size)
assert.WithinDuration(t.T(), attrs.Mtime, createTime, Delta)
}

func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestSetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
assert.Nil(t.T(), t.in.bwh)
Expand All @@ -1342,7 +1343,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileWhenStreamingWritesAreEnabled() {
assert.Nil(t.T(), t.in.bwh)
}

func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() {
func (t *FileTest) TestSetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnabled() {
t.createInodeWithEmptyObject()
t.in.config = &cfg.Config{Write: *getWriteConfig()}
assert.Nil(t.T(), t.in.bwh)
Expand All @@ -1351,7 +1352,7 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable
assert.Nil(t.T(), err)
assert.NotNil(t.T(), t.in.bwh)
writeFileInfo := t.in.bwh.WriteFileInfo()
assert.Equal(t.T(), 2, writeFileInfo.TotalSize)
assert.Equal(t.T(), int64(2), writeFileInfo.TotalSize)

// Set mtime.
mtime := time.Now().UTC().Add(123 * time.Second)
Expand All @@ -1366,6 +1367,92 @@ func (t *FileTest) SetMtimeOnEmptyGCSFileAfterWritesWhenStreamingWritesAreEnable
assert.Equal(t.T(), attrs.Atime, mtime)
}

// TestRegisterFileHandle verifies that RegisterFileHandle increments
// writeHandleCount for write handles and leaves it unchanged for read-only
// handles.
func (t *FileTest) TestRegisterFileHandle() {
	type testCase struct {
		name     string
		readOnly bool
		initial  int32
		want     int32
	}
	cases := []testCase{
		{name: "ReadOnlyHandle", readOnly: true, initial: 0, want: 0},
		{name: "ZeroCurrentValueForWriteHandle", readOnly: false, initial: 0, want: 1},
		{name: "NonZeroCurrentValueForWriteHandle", readOnly: false, initial: 5, want: 6},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func() {
			// Seed the counter with the starting value for this case.
			t.in.writeHandleCount = c.initial

			t.in.RegisterFileHandle(c.readOnly)

			assert.Equal(t.T(), c.want, t.in.writeHandleCount)
		})
	}
}

// TestDeRegisterFileHandle verifies that DeRegisterFileHandle ignores
// read-only handles, decrements writeHandleCount for write handles, and sets
// bwh to nil only when the last write handle is deregistered.
func (t *FileTest) TestDeRegisterFileHandle() {
	type testCase struct {
		name       string
		readOnly   bool
		initial    int32
		want       int32
		wantNilBwh bool
	}
	cases := []testCase{
		{name: "ReadOnlyHandle", readOnly: true, initial: 10, want: 10, wantNilBwh: false},
		{name: "NonZeroCurrentValueForWriteHandle", readOnly: false, initial: 10, want: 9, wantNilBwh: false},
		{name: "LastWriteHandleToDeregister", readOnly: false, initial: 1, want: 0, wantNilBwh: true},
	}

	for i := range cases {
		c := cases[i]
		t.Run(c.name, func() {
			// Make sure a buffered write handler exists before deregistering.
			t.in.config = &cfg.Config{Write: *getWriteConfig()}
			t.in.writeHandleCount = c.initial
			require.NoError(t.T(), t.in.ensureBufferedWriteHandler(t.ctx))

			t.in.DeRegisterFileHandle(c.readOnly)

			assert.Equal(t.T(), c.want, t.in.writeHandleCount)
			if !c.wantNilBwh {
				assert.NotNil(t.T(), t.in.bwh)
				return
			}
			assert.Nil(t.T(), t.in.bwh)
		})
	}
}

func getWriteConfig() *cfg.WriteConfig {
return &cfg.WriteConfig{
MaxBlocksPerFile: 10,
Expand Down

0 comments on commit 88ff04c

Please sign in to comment.