Skip to content

Commit

Permalink
[CELEBORN-1705] Fix disk buffer size is negative issue
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Fix disk buffer size is negative issue.

Before, when writing for PartitionDataWriter with memory file storage
1. if `isMemoryShuffleFile` is true, increment the memory file storage counter
2. check if `evict` is needed, if that, flush the buffer and then set `isMemoryShuffleFile` to false
3. add data into flushBuffer
4. if memory file storage evicted, the data buffer would be released as disk buffer finally.

Then the disk buffer size would be negative finally, and memory file storage would be always positive.

In this PR, we update the counter after `evict` finished.

### Why are the changes needed?
After no active running application in the celeborn cluster, I found that, it is abnormal per the celeborn worker log.
```
24/11/09 23:30:50,474 INFO [worker-memory-manager-reporter] MemoryManager: Direct memory usage: 276.0 MiB/40.0 GiB, disk buffer size: -748726.0 B, sort memory size: 0.0 B, read buffer size: 0.0 B, memory file storage size : 731.2 KiB
```

```
disk buffer size: -748726.0 B
memory file storage size : 731.2 KiB
```

Both of them are expected to be 0.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?
UT and Integration testing.

<img width="1895" alt="image" src="https://github.com/user-attachments/assets/231dd0cd-e44d-49f7-b18d-2a3eb4f52c3b">

Closes #2916 from turboFei/memory_disk_size.

Authored-by: Wang, Fei <fwang12@ebay.com>
Signed-off-by: mingji <fengmingxiao.fmx@alibaba-inc.com>
  • Loading branch information
turboFei authored and FMX committed Nov 14, 2024
1 parent 81a0d51 commit b755765
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ public boolean memoryFileStorageAvailable() {
return memoryFileStorageCounter.sum() < memoryFileStorageThreshold;
}

public void increaseMemoryFileStorage(int bytes) {
public void incrementMemoryFileStorage(int bytes) {
memoryFileStorageCounter.add(bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,6 @@ public void write(ByteBuf data) throws IOException {
}

final int numBytes = data.readableBytes();
if (isMemoryShuffleFile.get()) {
MemoryManager.instance().increaseMemoryFileStorage(numBytes);
} else {
MemoryManager.instance().incrementDiskBuffer(numBytes);
if (userCongestionControlContext != null) {
userCongestionControlContext.updateProduceBytes(numBytes);
}
}

synchronized (flushLock) {
if (closed) {
Expand Down Expand Up @@ -351,6 +343,16 @@ public void write(ByteBuf data) throws IOException {
}
}

// update the disk buffer or memory file storage after evict
if (isMemoryShuffleFile.get()) {
MemoryManager.instance().incrementMemoryFileStorage(numBytes);
} else {
MemoryManager.instance().incrementDiskBuffer(numBytes);
if (userCongestionControlContext != null) {
userCongestionControlContext.updateProduceBytes(numBytes);
}
}

data.retain();
flushBuffer.addComponent(true, data);
if (isMemoryShuffleFile.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ public void testWriteAndChunkRead() throws Exception {
@Test
public void testEvictAndChunkRead() throws Exception {
final int threadsNum = 16;
final long memoryFileStorageBefore = MemoryManager.instance().getMemoryFileStorageCounter();
PartitionDataWriter partitionDataWriter =
new ReducePartitionDataWriter(
PartitionDataWriterSuiteUtils.prepareMemoryEvictEnvironment(
Expand Down Expand Up @@ -526,6 +527,9 @@ public void testEvictAndChunkRead() throws Exception {
result.releaseBuffers();

closeChunkServer();

assert storageManager.evictedFileCount().get() > 0;
assert MemoryManager.instance().getMemoryFileStorageCounter() == memoryFileStorageBefore;
}

@Test
Expand Down

0 comments on commit b755765

Please sign in to comment.