Skip to content

Commit

Permalink
Handling truncate in buffered writes flow (#2775)
Browse files Browse the repository at this point in the history
* Truncate handling

* added comment

* fixing truncate

* PR review comments

* PR review comments
  • Loading branch information
vadlakondaswetha authored Dec 16, 2024
1 parent 840605f commit 6c3060a
Show file tree
Hide file tree
Showing 4 changed files with 332 additions and 5 deletions.
63 changes: 60 additions & 3 deletions internal/bufferedwrites/buffered_write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ type BufferedWriteHandler struct {
blockPool *block.BlockPool
uploadHandler *UploadHandler
// Total size of data buffered so far. Some part of buffered data might have
// been uploaded to GCS as well.
// been uploaded to GCS as well. Depending on the state we are in, it might or
// might not include truncatedSize.
totalSize int64
// Stores the mtime value updated by kernel as part of setInodeAttributes call.
mtime time.Time
// Stores the size to truncate. No action is made when truncate is called.
// Will be used as mentioned below:
// 1. During flush if totalSize != truncatedSize, additional dummy data is
// added before flush and uploaded.
// 2. If write is started after the truncate offset, dummy data is created
// as per the truncatedSize and then new data is appended to it.
truncatedSize int64
}

// WriteFileInfo is used as part of serving fileInode attributes (GetInodeAttributes call).
Expand Down Expand Up @@ -71,12 +79,20 @@ func NewBWHandler(objectName string, bucket gcs.Bucket, blockSize int64, maxBloc
// Write writes the given data to the buffer. It writes to an existing buffer if
// the capacity is available otherwise writes to a new buffer.
func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
if offset != wh.totalSize {
if offset != wh.totalSize && offset != wh.truncatedSize {
logger.Errorf("BufferedWriteHandler.OutOfOrderError for object: %s, expectedOffset: %d, actualOffset: %d",
wh.uploadHandler.objectName, wh.totalSize, offset)
return ErrOutOfOrderWrite
}

if offset == wh.truncatedSize {
// Check and update if any data filling has to be done.
err = wh.writeDataForTruncatedSize()
if err != nil {
return
}
}

// Fail early if the uploadHandler has failed.
select {
case <-wh.uploadHandler.SignalUploadFailure():
Expand All @@ -85,6 +101,10 @@ func (wh *BufferedWriteHandler) Write(data []byte, offset int64) (err error) {
break
}

return wh.appendBuffer(data)
}

func (wh *BufferedWriteHandler) appendBuffer(data []byte) (err error) {
dataWritten := 0
for dataWritten < len(data) {
if wh.current == nil {
Expand Down Expand Up @@ -131,6 +151,12 @@ func (wh *BufferedWriteHandler) Sync() (err error) {

// Flush finalizes the upload.
func (wh *BufferedWriteHandler) Flush() (*gcs.MinObject, error) {
// In case it is a truncated file, upload empty blocks as required.
err := wh.writeDataForTruncatedSize()
if err != nil {
return nil, err
}

if wh.current != nil {
err := wh.uploadHandler.Upload(wh.current)
if err != nil {
Expand Down Expand Up @@ -166,11 +192,42 @@ func (wh *BufferedWriteHandler) SetMtime(mtime time.Time) {
wh.mtime = mtime
}

func (wh *BufferedWriteHandler) Truncate(size int64) error {
if size < wh.totalSize {
return fmt.Errorf("cannot truncate to lesser size when upload is in progress")
}

wh.truncatedSize = size
return nil
}

// WriteFileInfo returns the file info i.e, how much data has been buffered so far
// and the mtime.
func (wh *BufferedWriteHandler) WriteFileInfo() WriteFileInfo {
return WriteFileInfo{
TotalSize: wh.totalSize,
TotalSize: int64(math.Max(float64(wh.totalSize), float64(wh.truncatedSize))),
Mtime: wh.mtime,
}
}

func (wh *BufferedWriteHandler) writeDataForTruncatedSize() error {
// If totalSize is greater than truncatedSize, that means user has
// written more data than they actually truncated in the beginning.
if wh.totalSize >= wh.truncatedSize {
return nil
}

// Otherwise append dummy data to match truncatedSize.
diff := wh.truncatedSize - wh.totalSize
// Create 1MB of data at a time to avoid OOM
chunkSize := 1024 * 1024
for i := 0; i < int(diff); i += chunkSize {
size := math.Min(float64(chunkSize), float64(int(diff)-i))
err := wh.appendBuffer(make([]byte, int(size)))
if err != nil {
return err
}
}

return nil
}
89 changes: 89 additions & 0 deletions internal/bufferedwrites/buffered_write_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,39 @@ func (testSuite *BufferedWriteTest) TestWriteWithSignalUploadFailureInBetween()
assert.Equal(testSuite.T(), err, ErrUploadFailure)
}

func (testSuite *BufferedWriteTest) TestWriteAtTruncatedOffset() {
// Truncate
err := testSuite.bwh.Truncate(2)
require.NoError(testSuite.T(), err)
require.Equal(testSuite.T(), int64(2), testSuite.bwh.truncatedSize)

// Write at offset = truncatedSize
err = testSuite.bwh.Write([]byte("hello"), 2)

require.Nil(testSuite.T(), err)
fileInfo := testSuite.bwh.WriteFileInfo()
assert.Equal(testSuite.T(), testSuite.bwh.mtime, fileInfo.Mtime)
assert.Equal(testSuite.T(), int64(7), fileInfo.TotalSize)
}

func (testSuite *BufferedWriteTest) TestWriteAfterTruncateAtCurrentSize() {
err := testSuite.bwh.Write([]byte("hello"), 0)
require.Nil(testSuite.T(), err)
require.Equal(testSuite.T(), int64(5), testSuite.bwh.totalSize)
// Truncate
err = testSuite.bwh.Truncate(20)
require.NoError(testSuite.T(), err)
require.Equal(testSuite.T(), int64(20), testSuite.bwh.truncatedSize)
require.Equal(testSuite.T(), int64(20), testSuite.bwh.WriteFileInfo().TotalSize)

// Write at offset=bwh.totalSize
err = testSuite.bwh.Write([]byte("abcde"), 5)

require.Nil(testSuite.T(), err)
assert.Equal(testSuite.T(), int64(10), testSuite.bwh.totalSize)
assert.Equal(testSuite.T(), int64(20), testSuite.bwh.WriteFileInfo().TotalSize)
}

func (testSuite *BufferedWriteTest) TestFlushWithNonNilCurrentBlock() {
err := testSuite.bwh.Write([]byte("hi"), 0)
require.Nil(testSuite.T(), err)
Expand Down Expand Up @@ -247,3 +280,59 @@ func (testSuite *BufferedWriteTest) TestSyncBlocksWithError() {
assert.Error(testSuite.T(), err)
assert.Equal(testSuite.T(), ErrUploadFailure, err)
}

func (testSuite *BufferedWriteTest) TestFlushWithNonZeroTruncatedLengthForEmptyObject() {
require.Nil(testSuite.T(), testSuite.bwh.current)
testSuite.bwh.truncatedSize = 10

_, err := testSuite.bwh.Flush()

assert.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), testSuite.bwh.truncatedSize, testSuite.bwh.totalSize)
}

func (testSuite *BufferedWriteTest) TestFlushWithTruncatedLengthGreaterThanObjectSize() {
err := testSuite.bwh.Write([]byte("hi"), 0)
require.Nil(testSuite.T(), err)
testSuite.bwh.truncatedSize = 10

_, err = testSuite.bwh.Flush()

assert.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), testSuite.bwh.truncatedSize, testSuite.bwh.totalSize)
}

func (testSuite *BufferedWriteTest) TestTruncateWithLesserSize() {
testSuite.bwh.totalSize = 10

err := testSuite.bwh.Truncate(2)

assert.Error(testSuite.T(), err)
}

func (testSuite *BufferedWriteTest) TestTruncateWithSizeGreaterThanCurrentObjectSize() {
testSuite.bwh.totalSize = 10

err := testSuite.bwh.Truncate(12)

assert.NoError(testSuite.T(), err)
assert.Equal(testSuite.T(), int64(12), testSuite.bwh.truncatedSize)
}

func (testSuite *BufferedWriteTest) TestWriteFileInfoWithTruncatedLengthLessThanTotalSize() {
testSuite.bwh.totalSize = 10
testSuite.bwh.truncatedSize = 5

fileInfo := testSuite.bwh.WriteFileInfo()

assert.Equal(testSuite.T(), testSuite.bwh.totalSize, fileInfo.TotalSize)
}

func (testSuite *BufferedWriteTest) TestWriteFileInfoWithTruncatedLengthGreaterThanTotalSize() {
testSuite.bwh.totalSize = 10
testSuite.bwh.truncatedSize = 20

fileInfo := testSuite.bwh.WriteFileInfo()

assert.Equal(testSuite.T(), testSuite.bwh.truncatedSize, fileInfo.TotalSize)
}
12 changes: 12 additions & 0 deletions internal/fs/inode/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,18 @@ func (f *FileInode) Sync(ctx context.Context) (err error) {
func (f *FileInode) Truncate(
ctx context.Context,
size int64) (err error) {
// For empty GCS files also, we will trigger bufferedWrites flow.
if f.src.Size == 0 && f.writeConfig.ExperimentalEnableStreamingWrites {
err = f.ensureBufferedWriteHandler()
if err != nil {
return
}
}

if f.bwh != nil {
return f.bwh.Truncate(size)
}

// Make sure f.content != nil.
err = f.ensureContent(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 6c3060a

Please sign in to comment.