Skip to content

Commit

Permalink
Added destroy method to free up the memory buffers (#2839)
Browse files Browse the repository at this point in the history
* added destroy method

* added destroy method

* fixes

* fixes

* fixing the lock and closing the channel

* Looping over uploadCh and closing it.

* removed unused method

* fixing comment.

* Addressed review comments
  • Loading branch information
vadlakondaswetha authored Dec 27, 2024
1 parent 3946230 commit a0dc44e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 27 deletions.
12 changes: 12 additions & 0 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,13 @@ func (wh *BufferedWriteHandler) appendBuffer(data []byte) (err error) {

// Sync uploads all the pending full buffers to GCS.
func (wh *BufferedWriteHandler) Sync() (err error) {
// Upload all the pending buffers and release the buffers.
wh.uploadHandler.AwaitBlocksUpload()
err = wh.blockPool.ClearFreeBlockChannel()
if err != nil {
// Only logging an error in case of resource leak as upload succeeded.
logger.Errorf("blockPool.ClearFreeBlockChannel() failed during sync: %v", err)
}

select {
case <-wh.uploadHandler.SignalUploadFailure():
Expand Down Expand Up @@ -229,6 +235,12 @@ func (wh *BufferedWriteHandler) WriteFileInfo() WriteFileInfo {
}
}

// Destroy tears down the upload handler and then releases the buffers held
// by the block pool. The handler must not be used after this call.
func (wh *BufferedWriteHandler) Destroy() error {
	// Drain the upload handler first so that any blocks still queued for
	// upload are returned to the pool before the pool itself is cleared.
	wh.uploadHandler.Destroy()
	err := wh.blockPool.ClearFreeBlockChannel()
	return err
}

func (wh *BufferedWriteHandler) writeDataForTruncatedSize() error {
// If totalSize is greater than truncatedSize, that means user has
// written more data than they actually truncated in the beginning.
Expand Down
14 changes: 13 additions & 1 deletion internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (testSuite *BufferedWriteTest) TestSync5InProgressBlocks() {

assert.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), 0, len(testSuite.bwh.uploadHandler.uploadCh))
assert.Equal(testSuite.T(), 5, len(testSuite.bwh.blockPool.FreeBlocksChannel()))
assert.Equal(testSuite.T(), 0, len(testSuite.bwh.blockPool.FreeBlocksChannel()))
}

func (testSuite *BufferedWriteTest) TestSyncBlocksWithError() {
Expand Down Expand Up @@ -347,6 +347,18 @@ func (testSuite *BufferedWriteTest) TestWriteFileInfoWithTruncatedLengthGreaterT

assert.Equal(testSuite.T(), testSuite.bwh.truncatedSize, fileInfo.TotalSize)
}
// TestDestroyShouldClearFreeBlockChannel verifies that Destroy drains both
// the block pool's free-block channel and the upload channel.
func (testSuite *BufferedWriteTest) TestDestroyShouldClearFreeBlockChannel() {
	// Write enough data to fill 4 blocks.
	data := []byte(strings.Repeat("A", blockSize*4))
	require.Nil(testSuite.T(), testSuite.bwh.Write(data, 0))

	err := testSuite.bwh.Destroy()

	require.Nil(testSuite.T(), err)
	assert.Equal(testSuite.T(), 0, len(testSuite.bwh.blockPool.FreeBlocksChannel()))
	assert.Equal(testSuite.T(), 0, len(testSuite.bwh.uploadHandler.uploadCh))
}

func (testSuite *BufferedWriteTest) TestUnlinkBeforeWrite() {
testSuite.bwh.Unlink()
Expand Down
20 changes: 20 additions & 0 deletions internal/bufferedwrites/upload_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,23 @@ func (uh *UploadHandler) SignalUploadFailure() chan error {
// AwaitBlocksUpload blocks until every block handed to the upload handler
// has been accounted for (the WaitGroup counter drops to zero).
func (uh *UploadHandler) AwaitBlocksUpload() {
	uh.wg.Wait()
}

// Destroy drains uploadCh, returning every pending (never-uploaded) block to
// freeBlocksCh, and closes uploadCh if it is not already closed. For each
// drained block wg.Done is called so that AwaitBlocksUpload waiters are
// released even though those blocks were never uploaded.
//
// NOTE(review): this assumes no concurrent Upload() send is in flight — a
// send racing with the close below would panic on a closed channel. Confirm
// callers serialize Destroy with Upload.
func (uh *UploadHandler) Destroy() {
	// Move all pending blocks to freeBlockCh and close the channel if not done.
	for {
		select {
		case currBlock, ok := <-uh.uploadCh:
			// Not ok means channel closed. Return.
			if !ok {
				return
			}
			// Return the undrained block to the free pool for reuse/teardown.
			uh.freeBlocksCh <- currBlock
			// Marking as wg.Done to ensure any waiters are unblocked.
			uh.wg.Done()
		default:
			// This will get executed when there are no blocks pending in
			// uploadCh and it's not closed yet: close it so future receives
			// return immediately instead of blocking.
			close(uh.uploadCh)
			return
		}
	}
}
80 changes: 60 additions & 20 deletions internal/bufferedwrites/upload_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,19 @@ func (t *UploadHandlerTest) SetupTest() {
})
}

// SetupSubTest re-runs the full suite setup before each t.Run subtest so
// every subtest starts with a fresh upload handler, block pool, and mocks.
func (t *UploadHandlerTest) SetupSubTest() {
	t.SetupTest()
}

func (t *UploadHandlerTest) TestMultipleBlockUpload() {
// Create some blocks.
var blocks []block.Block
for i := 0; i < 5; i++ {
b, err := t.blockPool.Get()
require.NoError(t.T(), err)
blocks = append(blocks, b)
}
// CreateObjectChunkWriter -- should be called once.
writer := &storagemock.Writer{}
mockObj := &gcs.MinObject{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
t.mockBucket.On("FinalizeUpload", mock.Anything, writer).Return(mockObj, nil)

// Upload the blocks.
blocks := t.createBlocks(5)
for _, b := range blocks {
err := t.uh.Upload(b)
require.NoError(t.T(), err)
Expand Down Expand Up @@ -189,13 +187,10 @@ func (t *UploadHandlerTest) TestUploadSingleBlockThrowsErrorInCopy() {

func (t *UploadHandlerTest) TestUploadMultipleBlocksThrowsErrorInCopy() {
// Create some blocks.
var blocks []block.Block
blocks := t.createBlocks(4)
for i := 0; i < 4; i++ {
b, err := t.blockPool.Get()
err := blocks[i].Write([]byte("testdata" + strconv.Itoa(i) + " "))
require.NoError(t.T(), err)
err = b.Write([]byte("testdata" + strconv.Itoa(i) + " "))
require.NoError(t.T(), err)
blocks = append(blocks, b)
}
// CreateObjectChunkWriter -- should be called once.
writer := &storagemock.Writer{}
Expand Down Expand Up @@ -255,18 +250,11 @@ func TestSignalUploadFailure(t *testing.T) {
}

func (t *UploadHandlerTest) TestMultipleBlockAwaitBlocksUpload() {
// Create some blocks.
var blocks []block.Block
for i := 0; i < 5; i++ {
b, err := t.blockPool.Get()
require.NoError(t.T(), err)
blocks = append(blocks, b)
}
// CreateObjectChunkWriter -- should be called once.
writer := &storagemock.Writer{}
t.mockBucket.On("CreateObjectChunkWriter", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(writer, nil)
// Upload the blocks.
for _, b := range blocks {
for _, b := range t.createBlocks(5) {
err := t.uh.Upload(b)
require.NoError(t.T(), err)
}
Expand Down Expand Up @@ -349,3 +337,55 @@ func (t *UploadHandlerTest) TestCreateObjectChunkWriterIsCalledWithCorrectReques
err = t.uh.Upload(b)
require.NoError(t.T(), err)
}

// TestDestroy checks that Destroy drains all queued blocks into the free
// pool and leaves uploadCh closed, whether or not it was already closed.
func (t *UploadHandlerTest) TestDestroy() {
	cases := []struct {
		name          string
		closeUploadCh bool
	}{
		{name: "UploadChNotClosed", closeUploadCh: false},
		{name: "UploadChClosed", closeUploadCh: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func() {
			// Queue 5 blocks for upload without starting any workers.
			for _, b := range t.createBlocks(5) {
				t.uh.uploadCh <- b
				t.uh.wg.Add(1)
			}
			if tc.closeUploadCh {
				close(t.uh.uploadCh)
			}

			t.uh.Destroy()

			assertAllBlocksProcessed(t.T(), t.uh)
			assert.Equal(t.T(), 5, len(t.uh.freeBlocksCh))
			assert.Equal(t.T(), 0, len(t.uh.uploadCh))
			// A receive must not block: a closed channel yields immediately.
			select {
			case <-t.uh.uploadCh:
			default:
				assert.Fail(t.T(), "uploadCh not closed")
			}
		})
	}
}

// createBlocks fetches count blocks from the test's block pool, failing the
// test immediately if any Get call returns an error.
func (t *UploadHandlerTest) createBlocks(count int) []block.Block {
	blocks := make([]block.Block, 0, count)
	for n := 0; n < count; n++ {
		blk, err := t.blockPool.Get()
		require.NoError(t.T(), err)
		blocks = append(blocks, blk)
	}

	return blocks
}
14 changes: 9 additions & 5 deletions internal/fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2558,13 +2558,17 @@ func (fs *fileSystem) ReleaseFileHandle(
ctx context.Context,
op *fuseops.ReleaseFileHandleOp) (err error) {
fs.mu.Lock()
defer fs.mu.Unlock()

// Destroy the handle.
fs.handles[op.Handle].(*handle.FileHandle).Destroy()

// Update the map.
fileHandle := fs.handles[op.Handle].(*handle.FileHandle)
// Update the map. We are okay updating the map before destroy is called
// since destroy is doing only internal cleanup.
delete(fs.handles, op.Handle)
fs.mu.Unlock()

// Destroy the handle.
fileHandle.Lock()
defer fileHandle.Unlock()
fileHandle.Destroy()

return
}
Expand Down
5 changes: 5 additions & 0 deletions internal/fs/handle/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,14 @@ func NewFileHandle(inode *inode.FileInode, fileCacheHandler *file.CacheHandler,

// Destroy any resources associated with the handle, which must not be used
// again.
// LOCKS_REQUIRED(fh.mu)
// LOCK_FUNCTION(fh.inode.mu)
// UNLOCK_FUNCTION(fh.inode.mu)
func (fh *FileHandle) Destroy() {
// Deregister the fileHandle with the inode.
fh.inode.Lock()
fh.inode.DeRegisterFileHandle(fh.readOnly)
fh.inode.Unlock()
if fh.reader != nil {
fh.reader.Destroy()
}
Expand Down
6 changes: 5 additions & 1 deletion internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,11 @@ func (f *FileInode) DeRegisterFileHandle(readOnly bool) {
f.writeHandleCount--

// All write fileHandles associated with bwh are closed. So safe to set bwh to nil.
if f.writeHandleCount == 0 {
if f.writeHandleCount == 0 && f.bwh != nil {
err := f.bwh.Destroy()
if err != nil {
logger.Warnf("Error while destroying the bufferedWritesHandler: %v", err)
}
f.bwh = nil
}
}
Expand Down

0 comments on commit a0dc44e

Please sign in to comment.