diff --git a/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java
new file mode 100644
index 000000000..59ae5e351
--- /dev/null
+++ b/s3stream/src/main/java/com/automq/stream/DirectByteBufSeqAlloc.java
@@ -0,0 +1,82 @@
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+package com.automq.stream;
+import com.automq.stream.s3.DirectByteBufAlloc;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.util.concurrent.atomic.AtomicReference;
+public class DirectByteBufSeqAlloc {
+    public static final int HUGE_BUF_SIZE = 8 * 1024 * 1024;
+    // why not use ThreadLocal? the partition open has too much threads
+    final AtomicReference<HugeBuf>[] hugeBufArray = new AtomicReference[8];
+    private final int allocType;
+    public DirectByteBufSeqAlloc(int allocType) {
+        this.allocType = allocType;
+        for (int i = 0; i < hugeBufArray.length; i++) {
+            hugeBufArray[i] = new AtomicReference<>(new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType)));
+        }
+    }
+    public ByteBuf byteBuffer(int capacity) {
+        if (capacity >= HUGE_BUF_SIZE) {
+            // if the request capacity is larger than HUGE_BUF_SIZE, just allocate a new ByteBuf
+            return DirectByteBufAlloc.byteBuffer(capacity, allocType);
+        }
+        int bufIndex = Math.abs(Thread.currentThread().hashCode() % hugeBufArray.length);
+        AtomicReference<HugeBuf> bufRef = hugeBufArray[bufIndex];
+        //noinspection SynchronizationOnLocalVariableOrMethodParameter
+        synchronized (bufRef) {
+            HugeBuf hugeBuf = bufRef.get();
+            if (hugeBuf.nextIndex + capacity <= hugeBuf.buf.capacity()) {
+                // if the request capacity can be satisfied by the current hugeBuf, return a slice of it
+                int nextIndex = hugeBuf.nextIndex;
+                hugeBuf.nextIndex += capacity;
+                ByteBuf slice = hugeBuf.buf.retainedSlice(nextIndex, capacity);
+                return slice.writerIndex(slice.readerIndex());
+            }
+            // if the request capacity cannot be satisfied by the current hugeBuf
+            // 1. slice the remaining of the current hugeBuf and release the hugeBuf
+            // 2. create a new hugeBuf and slice the remaining of the required capacity
+            // 3. return the composite ByteBuf of the two slices
+            CompositeByteBuf cbf = DirectByteBufAlloc.compositeByteBuffer();
+            int readLength = hugeBuf.buf.capacity() - hugeBuf.nextIndex;
+            cbf.addComponent(false, hugeBuf.buf.retainedSlice(hugeBuf.nextIndex, readLength));
+            capacity -= readLength;
+            hugeBuf.buf.release();
+            HugeBuf newHugeBuf = new HugeBuf(DirectByteBufAlloc.byteBuffer(HUGE_BUF_SIZE, allocType));
+            bufRef.set(newHugeBuf);
+            cbf.addComponent(false, newHugeBuf.buf.retainedSlice(0, capacity));
+            newHugeBuf.nextIndex = capacity;
+            return cbf;
+        }
+    }
+    static class HugeBuf {
+        final ByteBuf buf;
+        int nextIndex;
+        HugeBuf(ByteBuf buf) {
+            this.buf = buf;
+            this.nextIndex = 0;
+        }
+    }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
index 0a54dedea..8df5e422c 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/StreamRecordBatchCodec.java
@@ -11,6 +11,7 @@
 package com.automq.stream.s3;
+import com.automq.stream.DirectByteBufSeqAlloc;
 import com.automq.stream.s3.model.StreamRecordBatch;
 import io.netty.buffer.ByteBuf;
@@ -25,10 +26,12 @@ public class StreamRecordBatchCodec {
             + 8 // baseOffset
             + 4 // lastOffsetDelta
             + 4; // payload length
+    private static final DirectByteBufSeqAlloc ENCODE_ALLOC = new DirectByteBufSeqAlloc(ENCODE_RECORD);
     public static ByteBuf encode(StreamRecordBatch streamRecord) {
         int totalLength = HEADER_SIZE + streamRecord.size(); // payload
-        ByteBuf buf = DirectByteBufAlloc.byteBuffer(totalLength, ENCODE_RECORD);
+        // use sequential allocator to avoid memory fragmentation
+        ByteBuf buf = ENCODE_ALLOC.byteBuffer(totalLength);
diff --git a/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java b/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java
new file mode 100644
index 000000000..5764e9304
--- /dev/null
+++ b/s3stream/src/test/java/com/automq/stream/DirectByteBufSeqAllocTest.java
@@ -0,0 +1,72 @@
+ * Copyright 2024, AutoMQ CO.,LTD.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+package com.automq.stream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
+import java.util.concurrent.atomic.AtomicReference;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+public class DirectByteBufSeqAllocTest {
+    @Test
+    public void testAlloc() {
+        DirectByteBufSeqAlloc alloc = new DirectByteBufSeqAlloc(0);
+        AtomicReference<DirectByteBufSeqAlloc.HugeBuf> bufRef = alloc.hugeBufArray[Math.abs(Thread.currentThread().hashCode() % alloc.hugeBufArray.length)];
+        ByteBuf buf1 = alloc.byteBuffer(12);
+        buf1.writeLong(1);
+        buf1.writeInt(2);
+        ByteBuf buf2 = alloc.byteBuffer(20);
+        buf2.writeLong(3);
+        buf2.writeInt(4);
+        buf2.writeLong(5);
+        ByteBuf buf3 = alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12 - 20 - 4);
+        ByteBuf oldHugeBuf = bufRef.get().buf;
+        ByteBuf buf4 = alloc.byteBuffer(16);
+        buf4.writeLong(6);
+        buf4.writeLong(7);
+        assertTrue(oldHugeBuf != bufRef.get().buf);
+        assertEquals(1, buf1.readLong());
+        assertEquals(2, buf1.readInt());
+        assertEquals(3, buf2.readLong());
+        assertEquals(4, buf2.readInt());
+        assertEquals(5, buf2.readLong());
+        assertInstanceOf(CompositeByteBuf.class, buf4);
+        assertEquals(6, buf4.readLong());
+        assertEquals(7, buf4.readLong());
+        buf1.release();
+        buf2.release();
+        buf3.release();
+        buf4.release();
+        assertEquals(0, oldHugeBuf.refCnt());
+        assertEquals(1, bufRef.get().buf.refCnt());
+        ByteBuf oldHugeBuf2 = bufRef.get().buf;
+        alloc.byteBuffer(DirectByteBufSeqAlloc.HUGE_BUF_SIZE - 12).release();
+        alloc.byteBuffer(12).release();
+        assertEquals(0, oldHugeBuf2.refCnt());
+    }