diff --git a/internal/bufferedwrites/buffered_write_handler.go b/internal/bufferedwrites/buffered_write_handler.go index 6ad09ba515..28a9b8e851 100644 --- a/internal/bufferedwrites/buffered_write_handler.go +++ b/internal/bufferedwrites/buffered_write_handler.go @@ -158,7 +158,13 @@ func (wh *BufferedWriteHandler) appendBuffer(data []byte) (err error) { // Sync uploads all the pending full buffers to GCS. func (wh *BufferedWriteHandler) Sync() (err error) { + // Upload all the pending buffers and release the buffers. wh.uploadHandler.AwaitBlocksUpload() + err = wh.blockPool.ClearFreeBlockChannel() + if err != nil { + // Only logging an error in case of resource leak as upload succeeded. + logger.Errorf("blockPool.ClearFreeBlockChannel() failed during sync: %v", err) + } select { case <-wh.uploadHandler.SignalUploadFailure(): @@ -229,6 +235,12 @@ func (wh *BufferedWriteHandler) WriteFileInfo() WriteFileInfo { } } +func (wh *BufferedWriteHandler) Destroy() error { + // Destroy the upload handler and then free up the buffers. + wh.uploadHandler.Destroy() + return wh.blockPool.ClearFreeBlockChannel() +} + func (wh *BufferedWriteHandler) writeDataForTruncatedSize() error { // If totalSize is greater than truncatedSize, that means user has // written more data than they actually truncated in the beginning. diff --git a/internal/bufferedwrites/buffered_write_handler_test.go b/internal/bufferedwrites/buffered_write_handler_test.go index 15fe9e9707..996eef99be 100644 --- a/internal/bufferedwrites/buffered_write_handler_test.go +++ b/internal/bufferedwrites/buffered_write_handler_test.go @@ -272,7 +272,7 @@ func (testSuite *BufferedWriteTest) TestSync5InProgressBlocks() { assert.NoError(testSuite.T(), err) assert.Equal(testSuite.T(), 0, len(testSuite.bwh.uploadHandler.uploadCh)) - assert.Equal(testSuite.T(), 5, len(testSuite.bwh.blockPool.FreeBlocksChannel())) + assert.Equal(testSuite.T(), 0, len(testSuite.bwh.blockPool.FreeBlocksChannel())) } func (testSuite *BufferedWriteTest) TestSyncBlocksWithError() { @@ -347,6 +347,18 @@ func (testSuite *BufferedWriteTest) TestWriteFileInfoWithTruncatedLengthGreaterT assert.Equal(testSuite.T(), testSuite.bwh.truncatedSize, fileInfo.TotalSize) } +func (testSuite *BufferedWriteTest) TestDestroyShouldClearFreeBlockChannel() { + // Try to write 4 blocks of data. + contents := strings.Repeat("A", blockSize*4) + err := testSuite.bwh.Write([]byte(contents), 0) + require.Nil(testSuite.T(), err) + + err = testSuite.bwh.Destroy() + + require.Nil(testSuite.T(), err) + assert.Equal(testSuite.T(), 0, len(testSuite.bwh.blockPool.FreeBlocksChannel())) + assert.Equal(testSuite.T(), 0, len(testSuite.bwh.uploadHandler.uploadCh)) +} func (testSuite *BufferedWriteTest) TestUnlinkBeforeWrite() { testSuite.bwh.Unlink() diff --git a/internal/bufferedwrites/upload_handler.go b/internal/bufferedwrites/upload_handler.go index b9cd3fdb33..8a72faa27d 100644 --- a/internal/bufferedwrites/upload_handler.go +++ b/internal/bufferedwrites/upload_handler.go @@ -168,3 +168,23 @@ func (uh *UploadHandler) SignalUploadFailure() chan error { func (uh *UploadHandler) AwaitBlocksUpload() { uh.wg.Wait() } + +func (uh *UploadHandler) Destroy() { + // Move all pending blocks to freeBlockCh and close the channel if not done. + for { + select { + case currBlock, ok := <-uh.uploadCh: + // Not ok means channel closed. Return. + if !ok { + return + } + uh.freeBlocksCh <- currBlock + // Marking as wg.Done to ensure any waiters are unblocked. + uh.wg.Done() + default: + // This will get executed when there are no blocks pending in uploadCh and its not closed. + close(uh.uploadCh) + return + } + } +} diff --git a/internal/bufferedwrites/upload_handler_test.go b/internal/bufferedwrites/upload_handler_test.go index 9908a4ebf6..55c9b70f18 100644 --- a/internal/bufferedwrites/upload_handler_test.go +++ b/internal/bufferedwrites/upload_handler_test.go @@ -63,14 +63,11 @@ func (t *UploadHandlerTest) SetupTest() { }) } +func (t *UploadHandlerTest) SetupSubTest() { + t.SetupTest() +} + func (t *UploadHandlerTest) TestMultipleBlockUpload() { - // Create some blocks. - var blocks []block.Block - for i := 0; i < 5; i++ { - b, err := t.blockPool.Get() - require.NoError(t.T(), err) - blocks = append(blocks, b) - } // CreateObjectChunkWriter -- should be called once. writer := &storagemock.Writer{} mockObj := &gcs.MinObject{} @@ -78,6 +75,7 @@ func (t *UploadHandlerTest) TestMultipleBlockUpload() { t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil) // Upload the blocks. + blocks := t.createBlocks(5) for _, b := range blocks { err := t.uh.Upload(b) require.NoError(t.T(), err) @@ -189,13 +187,10 @@ func (t *UploadHandlerTest) TestUploadSingleBlockThrowsErrorInCopy() { func (t *UploadHandlerTest) TestUploadMultipleBlocksThrowsErrorInCopy() { // Create some blocks. - var blocks []block.Block + blocks := t.createBlocks(4) for i := 0; i < 4; i++ { - b, err := t.blockPool.Get() + err := blocks[i].Write([]byte("testdata" + strconv.Itoa(i) + " ")) require.NoError(t.T(), err) - err = b.Write([]byte("testdata" + strconv.Itoa(i) + " ")) - require.NoError(t.T(), err) - blocks = append(blocks, b) } // CreateObjectChunkWriter -- should be called once. writer := &storagemock.Writer{} @@ -255,18 +250,11 @@ func TestSignalUploadFailure(t *testing.T) { } func (t *UploadHandlerTest) TestMultipleBlockAwaitBlocksUpload() { - // Create some blocks. - var blocks []block.Block - for i := 0; i < 5; i++ { - b, err := t.blockPool.Get() - require.NoError(t.T(), err) - blocks = append(blocks, b) - } // CreateObjectChunkWriter -- should be called once. writer := &storagemock.Writer{} t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil) // Upload the blocks. - for _, b := range blocks { + for _, b := range t.createBlocks(5) { err := t.uh.Upload(b) require.NoError(t.T(), err) } @@ -349,3 +337,55 @@ func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectReques err = t.uh.Upload(b) require.NoError(t.T(), err) } + +func (t *UploadHandlerTest) TestDestroy() { + testCases := []struct { + name string + uploadChClosed bool + }{ + { + name: "UploadChNotClosed", + uploadChClosed: false, + }, + { + name: "UploadChClosed", + uploadChClosed: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func() { + // Add blocks to uploadCh. + for _, b := range t.createBlocks(5) { + t.uh.uploadCh <- b + t.uh.wg.Add(1) + } + if tc.uploadChClosed { + close(t.uh.uploadCh) + } + + t.uh.Destroy() + + assertAllBlocksProcessed(t.T(), t.uh) + assert.Equal(t.T(), 5, len(t.uh.freeBlocksCh)) + assert.Equal(t.T(), 0, len(t.uh.uploadCh)) + // Check if uploadCh is closed. + select { + case <-t.uh.uploadCh: + default: + assert.Fail(t.T(), "uploadCh not closed") + } + }) + } +} + +func (t *UploadHandlerTest) createBlocks(count int) []block.Block { + var blocks []block.Block + for i := 0; i < count; i++ { + b, err := t.blockPool.Get() + require.NoError(t.T(), err) + blocks = append(blocks, b) + } + + return blocks +} diff --git a/internal/fs/fs.go b/internal/fs/fs.go index 243e30cc02..b2c4a7c5b0 100644 --- a/internal/fs/fs.go +++ b/internal/fs/fs.go @@ -2558,13 +2558,17 @@ func (fs *fileSystem) ReleaseFileHandle( ctx context.Context, op *fuseops.ReleaseFileHandleOp) (err error) { fs.mu.Lock() - defer fs.mu.Unlock() - - // Destroy the handle. - fs.handles[op.Handle].(*handle.FileHandle).Destroy() - // Update the map. + fileHandle := fs.handles[op.Handle].(*handle.FileHandle) + // Update the map. We are okay updating the map before destroy is called + // since destroy is doing only internal cleanup. delete(fs.handles, op.Handle) + fs.mu.Unlock() + + // Destroy the handle. + fileHandle.Lock() + defer fileHandle.Unlock() + fileHandle.Destroy() return } diff --git a/internal/fs/handle/file.go b/internal/fs/handle/file.go index d2d417ce92..88be496cf2 100644 --- a/internal/fs/handle/file.go +++ b/internal/fs/handle/file.go @@ -71,9 +71,14 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler, // Destroy any resources associated with the handle, which must not be used // again. +// LOCKS_REQUIRED(fh.mu) +// LOCK_FUNCTION(fh.inode.mu) +// UNLOCK_FUNCTION(fh.inode.mu) func (fh *FileHandle) Destroy() { // Deregister the fileHandle with the inode. + fh.inode.Lock() fh.inode.DeRegisterFileHandle(fh.readOnly) + fh.inode.Unlock() if fh.reader != nil { fh.reader.Destroy() } diff --git a/internal/fs/inode/file.go b/internal/fs/inode/file.go index e3f7632c3c..1b415b74ba 100644 --- a/internal/fs/inode/file.go +++ b/internal/fs/inode/file.go @@ -403,7 +403,11 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) { f.writeHandleCount-- // All write fileHandles associated with bwh are closed. So safe to set bwh to nil. - if f.writeHandleCount == 0 { + if f.writeHandleCount == 0 && f.bwh != nil { + err := f.bwh.Destroy() + if err != nil { + logger.Warnf("Error while destroying the bufferedWritesHandler: %v", err) + } f.bwh = nil } }