Skip to content

Commit 65d8b1f

Browse files
authored
Merge pull request #1364 from carterkozak/ckozak/UNDERTOW-2142-batch-chunk-proposal
[UNDERTOW-2638] Proposal for a multi-buffer ChunkedStreamSinkConduit.write implementation
2 parents b05669b + dae2a96 commit 65d8b1f

File tree

1 file changed

+106
-9
lines changed

1 file changed

+106
-9
lines changed

core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java

Lines changed: 106 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import io.undertow.util.HeaderValues;
3636
import io.undertow.util.Headers;
3737
import io.undertow.util.ImmediatePooledByteBuffer;
38+
import org.xnio.Buffers;
3839
import org.xnio.IoUtils;
3940
import io.undertow.connector.ByteBufferPool;
4041
import io.undertow.connector.PooledByteBuffer;
@@ -128,6 +129,109 @@ public int write(final ByteBuffer src) throws IOException {
128129
return doWrite(src);
129130
}
130131

132+
long doWrite(final ByteBuffer[] srcs, int offset, int length) throws IOException {
133+
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
134+
throw new ClosedChannelException();
135+
}
136+
// Write as many buffers as possible without a chunk-size overflowing an integer.
137+
long totalRemaining = 0;
138+
for (int i = 0; i < length; i++) {
139+
ByteBuffer buf = srcs[i + offset];
140+
int remaining = buf.remaining();
141+
if (totalRemaining + remaining > Integer.MAX_VALUE) {
142+
// Avoid producing chunks too large for clients by reducing the number of buffers
143+
// until total remaining fits within a 32-bit signed integer value. This is safe
144+
// because a single java ByteBuffer has a capacity represented by an integer.
145+
length = i;
146+
break;
147+
}
148+
totalRemaining += remaining;
149+
}
150+
if(totalRemaining == 0) {
151+
return 0;
152+
}
153+
int remaining = (int) totalRemaining;
154+
this.state |= FLAG_FIRST_DATA_WRITTEN;
155+
int oldLimit = srcs[length - 1].limit();
156+
boolean dataRemaining = false; //set to true if there is data in src that still needs to be written out
157+
if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) {
158+
chunkingBuffer.clear();
159+
putIntAsHexString(chunkingBuffer, remaining);
160+
chunkingBuffer.put(CRLF);
161+
chunkingBuffer.flip();
162+
chunkingSepBuffer.clear();
163+
chunkingSepBuffer.put(CRLF);
164+
chunkingSepBuffer.flip();
165+
state |= FLAG_WRITTEN_FIRST_CHUNK;
166+
chunkleft = remaining;
167+
} else {
168+
int maxRemaining = chunkleft;
169+
for (int i = 0; i < length; i++) {
170+
ByteBuffer buf = srcs[offset + i];
171+
int bufRemaining = buf.remaining();
172+
if (bufRemaining >= maxRemaining) {
173+
length = i + 1;
174+
oldLimit = buf.limit();
175+
dataRemaining = true;
176+
buf.limit(buf.position() + maxRemaining);
177+
break;
178+
}
179+
maxRemaining -= bufRemaining;
180+
}
181+
}
182+
try {
183+
int chunkingSize = chunkingBuffer.remaining();
184+
int chunkingSepSize = chunkingSepBuffer.remaining();
185+
if (chunkingSize > 0 || chunkingSepSize > 0 || lastChunkBuffer != null) {
186+
int originalRemaining = (int) Buffers.remaining(srcs, offset, length);
187+
long result;
188+
if (lastChunkBuffer == null || dataRemaining) {
189+
// chunkingBuffer
190+
// srcs (taking into account offset+length)
191+
// chunkingSepBuffer
192+
final ByteBuffer[] buf = new ByteBuffer[2 + length];
193+
buf[0] = chunkingBuffer;
194+
System.arraycopy(srcs, offset , buf, 1, length);
195+
buf[length + 1] = chunkingSepBuffer;
196+
result = next.write(buf, 0, buf.length);
197+
} else {
198+
// chunkingBuffer
199+
// srcs (taking into account offset+length)
200+
// lastChunkBuffer
201+
final ByteBuffer[] buf = new ByteBuffer[2 + length];
202+
buf[0] = chunkingBuffer;
203+
System.arraycopy(srcs, offset , buf, 1, length);
204+
buf[length + 1] = lastChunkBuffer.getBuffer();
205+
if (anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
206+
result = next.writeFinal(buf, 0, buf.length);
207+
} else {
208+
result = next.write(buf, 0, buf.length);
209+
}
210+
if (Buffers.remaining(srcs, offset, length) == 0) {
211+
state |= FLAG_WRITES_SHUTDOWN;
212+
}
213+
if (!lastChunkBuffer.getBuffer().hasRemaining()) {
214+
state |= FLAG_NEXT_SHUTDOWN;
215+
lastChunkBuffer.close();
216+
}
217+
}
218+
int srcWritten = originalRemaining - (int) Buffers.remaining(srcs, offset, length);
219+
chunkleft -= srcWritten;
220+
if (result < chunkingSize) {
221+
return 0;
222+
} else {
223+
return srcWritten;
224+
}
225+
} else {
226+
long result = next.write(srcs, offset, length);
227+
chunkleft -= result;
228+
return result;
229+
230+
}
231+
} finally {
232+
srcs[length - 1].limit(oldLimit);
233+
}
234+
}
131235

132236
int doWrite(final ByteBuffer src) throws IOException {
133237
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
@@ -195,7 +299,6 @@ int doWrite(final ByteBuffer src) throws IOException {
195299
} finally {
196300
src.limit(oldLimit);
197301
}
198-
199302
}
200303

201304
@Override
@@ -217,13 +320,7 @@ public void truncateWrites() throws IOException {
217320

218321
@Override
219322
public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
220-
for (int i = 0; i < length; i++) {
221-
ByteBuffer srcBuffer = srcs[offset + i];
222-
if (srcBuffer.hasRemaining()) {
223-
return write(srcBuffer);
224-
}
225-
}
226-
return 0;
323+
return doWrite(srcs, offset, length);
227324
}
228325

229326
@Override
@@ -382,7 +479,7 @@ private void createLastChunk(final boolean writeFinal) throws UnsupportedEncodin
382479
lastChunkBuffer.put(CRLF);
383480
}
384481
//horrible hack
385-
//there is a situation where we can get a buffer leak here if the connection is terminated abnormaly
482+
//there is a situation where we can get a buffer leak here if the connection is terminated abnormally
386483
//this should be fixed once this channel has its lifecycle tied to the connection, same as fixed length
387484
lastChunkBuffer.flip();
388485
ByteBuffer data = ByteBuffer.allocate(lastChunkBuffer.remaining());

0 commit comments

Comments
 (0)