/*
 * Copyright 2024, AutoMQ CO.,LTD.
 *
 * Use of this software is governed by the Business Source License
 * included in the file BSL.md
 *
 * As of the Change Date specified in that file, in accordance with
 * the Business Source License, use of this software will be governed
 * by the Apache License, Version 2.0
 */

package com.automq.stream;

import com.automq.stream.s3.DirectByteBufAlloc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Sequential direct-memory sub-allocator: carves small buffers out of pre-allocated
 * 8 MiB "huge" direct buffers instead of hitting the allocator per record, which
 * reduces allocation overhead and memory fragmentation on the record-write path.
 * <p>
 * Thread-safety: allocation is striped over {@link #hugeBufArray} by thread hash and
 * each stripe is guarded by {@code synchronized} on its {@link AtomicReference}.
 */
public class DirectByteBufSeqAlloc {
    public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024;
    // Why striping instead of ThreadLocal? Opening partitions runs on too many threads;
    // one 8 MiB huge buffer per thread would cost too much memory.
    // NOTE: generic array creation is not allowed in Java, hence the unchecked raw-typed `new`.
    @SuppressWarnings("unchecked")
    final AtomicReference<HugeBuf>[] hugeBufArray = new AtomicReference[8];
    private final int allocType;

    /**
     * @param allocType allocation type tag forwarded to {@link DirectByteBufAlloc#byteBuffer(int, int)}
     */
    public DirectByteBufSeqAlloc(int allocType) {
        this.allocType = allocType;
        for (int i = 0; i < hugeBufArray.length; i++) {
            hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
        }
    }

    /**
     * Allocates a writable {@link ByteBuf} of exactly {@code capacity} bytes, sliced out of
     * the caller's huge-buffer stripe. The returned buffer (plain slice or composite) owns
     * a retained reference and must be released by the caller.
     *
     * @param capacity requested capacity in bytes
     * @return a buffer with {@code writerIndex == readerIndex} and the requested capacity
     */
    public ByteBuf byteBuffer(int capacity) {
        if (capacity >= HUGE_BUF_SIZE) {
            // Request is too large to sub-allocate; fall back to a dedicated buffer.
            return DirectByteBufAlloc.byteBuffer(capacity, allocType);
        }
        // Same stripe-selection expression as the unit test; Math.abs is safe here because
        // Integer.MIN_VALUE % 8 == 0.
        int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);

        AtomicReference<HugeBuf> bufRef = hugeBufArray[bufIndex];
        //noinspection SynchronizationOnLocalVariableOrMethodParameter
        synchronized (bufRef) {
            HugeBuf hugeBuf = bufRef.get();

            if (hugeBuf.nextIndex + capacity <= hugeBuf.buf.capacity()) {
                // Fast path: the request fits in the current huge buffer; hand out a retained slice.
                int nextIndex = hugeBuf.nextIndex;
                hugeBuf.nextIndex += capacity;
                ByteBuf slice = hugeBuf.buf.retainedSlice(nextIndex, capacity);
                // Reset writerIndex so the caller sees an empty, writable buffer.
                return slice.writerIndex(slice.readerIndex());
            }

            // Slow path: the request does not fit in the current huge buffer.
            // 1. Slice the remaining tail of the current huge buffer, then release our reference
            //    (outstanding retained slices keep the underlying memory alive).
            // 2. Allocate a fresh huge buffer and slice the rest of the requested capacity from it.
            // 3. Return a composite of the two slices.
            CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
            int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
            cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
            capacity -= readLength;
            hugeBuf.buf.release();

            HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
            bufRef.set(newHugeBuf);

            cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
            newHugeBuf.nextIndex = capacity;

            return cbf;
        }
    }

    /**
     * A pre-allocated huge buffer plus the bump-pointer index of its next free byte.
     * Mutated only while holding the stripe lock.
     */
    static class HugeBuf {
        final ByteBuf buf;
        int nextIndex;

        HugeBuf(ByteBuf buf) {
            this.buf = buf;
            this.nextIndex = 0;
        }
    }

}
/*
 * Copyright 2024, AutoMQ CO.,LTD.
 *
 * Use of this software is governed by the Business Source License
 * included in the file BSL.md
 *
 * As of the Change Date specified in that file, in accordance with
 * the Business Source License, use of this software will be governed
 * by the Apache License, Version 2.0
 */
package com.automq.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotSame;

public class DirectByteBufSeqAllocTest {

    /**
     * Exercises the fast path (slice from the current huge buffer), the slow path
     * (composite buffer spanning two huge buffers on overflow), and the reference-count
     * bookkeeping of the retired huge buffer.
     */
    @Test
    public void testAlloc() {
        DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0);

        // Same stripe-selection expression as DirectByteBufSeqAlloc#byteBuffer.
        AtomicReference<DirectByteBufSeqAlloc.HugeBuf> bufRef =
            alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];

        ByteBuf buf1 = alloc.byteBuffer(12);
        buf1.writeLong(1);
        buf1.writeInt(2);

        ByteBuf buf2 = alloc.byteBuffer(20);
        buf2.writeLong(3);
        buf2.writeInt(4);
        buf2.writeLong(5);

        // Fill the huge buffer up to 4 bytes from the end, so the next 16-byte request overflows.
        ByteBuf buf3 = alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4);

        ByteBuf oldHugeBuf = bufRef.get().buf;

        ByteBuf buf4 = alloc.byteBuffer(16);
        buf4.writeLong(6);
        buf4.writeLong(7);

        // The overflow must have swapped in a fresh huge buffer.
        assertNotSame(oldHugeBuf, bufRef.get().buf);

        assertEquals(1, buf1.readLong());
        assertEquals(2, buf1.readInt());
        assertEquals(3, buf2.readLong());
        assertEquals(4, buf2.readInt());
        assertEquals(5, buf2.readLong());
        // The overflowing allocation is served as a composite spanning both huge buffers.
        assertInstanceOf(CompositeByteBuf.class, buf4);
        assertEquals(6, buf4.readLong());
        assertEquals(7, buf4.readLong());

        buf1.release();
        buf2.release();
        buf3.release();
        buf4.release();
        // All retained slices released and the allocator dropped its own reference on swap,
        // so the retired huge buffer must be fully deallocated.
        assertEquals(0, oldHugeBuf.refCnt());
        // The current huge buffer is still held (only) by the allocator.
        assertEquals(1, bufRef.get().buf.refCnt());

        ByteBuf oldHugeBuf2 = bufRef.get().buf;

        // Exactly exhaust the current huge buffer, then trigger a zero-remainder overflow;
        // the retired buffer must still end up fully released.
        alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release();
        alloc.byteBuffer(12).release();
        assertEquals(0, oldHugeBuf2.refCnt());
    }

}