Skip to content

Commit

Permalink
CXF-8931: HttpClientHTTPConduit can't disable the http chunk mode
Browse files Browse the repository at this point in the history
  • Loading branch information
reta committed Aug 26, 2024
1 parent ad9ba43 commit 770b387
Show file tree
Hide file tree
Showing 4 changed files with 478 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.cxf.transport.http;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -507,11 +510,11 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
static class HttpClientPipedOutputStream extends PipedOutputStream {
HttpClientWrappedOutputStream stream;
HTTPClientPolicy csPolicy;
HttpClientBodyPublisher publisher;
CloseableBodyPublisher publisher;
HttpClientPipedOutputStream(HttpClientWrappedOutputStream s,
PipedInputStream pin,
HTTPClientPolicy cp,
HttpClientBodyPublisher bp) throws IOException {
CloseableBodyPublisher bp) throws IOException {
super(pin);
stream = s;
csPolicy = cp;
Expand Down Expand Up @@ -582,45 +585,46 @@ public void close() throws IOException {
}
}
}
private static final class InputStreamSupplier implements Supplier<InputStream> {
final InputStream in;
InputStreamSupplier(InputStream i) {
in = i;
}

public InputStream get() {
return in;
}

/**
* The interface for {@link BodyPublisher}s that implement {@link Closeable} as well.
*/
private interface CloseableBodyPublisher extends BodyPublisher, Closeable {
}
private static final class HttpClientBodyPublisher implements BodyPublisher {
PipedInputStream pin;
HttpClientWrappedOutputStream stream;
long contentLen;

/**
* The {@link BodyPublisher} that wraps around the output stream.
*/
private static final class HttpClientBodyPublisher implements CloseableBodyPublisher {
private Supplier<InputStream> pin;
private HttpClientWrappedOutputStream stream;
private long contentLen;

private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, PipedInputStream pin) {
private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, Supplier<InputStream> pin) {
this.stream = s;
this.pin = pin;
}
synchronized void close() {

public synchronized void close() {
if (stream != null) {
contentLen = stream.contentLen;
stream = null;
}
}

@Override
public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (stream != null) {
stream.connectionComplete = true;
contentLen = stream.contentLen;
if (stream.pout != null) {
synchronized (stream.pout) {
stream.pout.notifyAll();
stream.pout.notifyAll();
}
if (stream != null) {
contentLen = stream.contentLen;
}
BodyPublishers.ofInputStream(new InputStreamSupplier(pin)).subscribe(subscriber);
BodyPublishers.ofInputStream(pin).subscribe(subscriber);
stream = null;
pin = null;
return;
Expand All @@ -637,6 +641,154 @@ public long contentLength() {
return contentLen;
}
}

/**
* The {@link BodyPublisher} that awaits for the output stream to be fully flushed (closed)
* so the content length becomes known (sized). It is used when the chunked transfer is not allowed
* but the content length is not specified up-front.
*/
private static final class HttpClientSizedBodyPublisher implements CloseableBodyPublisher {
private HTTPClientPolicy csPolicy;
private Supplier<ByteArrayInputStream> pin;
private HttpClientWrappedOutputStream stream;
private long contentLen;

private HttpClientSizedBodyPublisher(HttpClientWrappedOutputStream s, HTTPClientPolicy cs,
Supplier<ByteArrayInputStream> pin) {
this.stream = s;
this.csPolicy = cs;
this.pin = pin;
}

public synchronized void close() {
if (stream != null) {
contentLen = stream.contentLen;
stream = null;
}
}

@Override
public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (stream != null) {
stream.connectionComplete = true;
if (stream.pout != null) {
synchronized (stream.pout) {
stream.pout.notifyAll();
}

BodyPublishers.ofInputStream(pin).subscribe(subscriber);
stream = null;
pin = null;
return;
}
}
BodyPublishers.noBody().subscribe(subscriber);
}

@Override
public long contentLength() {
if (stream != null && stream.pout != null) {
final CloseableByteArrayOutputStream baos = (CloseableByteArrayOutputStream) stream.pout;

try {
synchronized (baos) {
if (!baos.closed) {
baos.wait(csPolicy.getConnectionTimeout());
}
}
contentLen = (int) baos.size();
} catch (InterruptedException e) {
//ignore
}
}

return contentLen;
}
}

/**
* The {@link ByteArrayOutputStream} implementation that tracks the closeability state.
*/
private static final class CloseableByteArrayOutputStream extends ByteArrayOutputStream {
private boolean closed;

/**
* Creates a new output stream for user data
*/
CloseableByteArrayOutputStream() {
super(4096);
}

/**
* Writes the specified byte to this output stream.
*
* @param b the byte to be written.
*/
public synchronized void write(int b) {
if (closed) {
return;
}
super.write(b);
}

/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to this output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
public synchronized void write(byte[] b, int off, int len) {
if (closed) {
return;
}
super.write(b, off, len);
}

/**
* Resets the <code>count</code> field of this output
* stream to zero, so that all currently accumulated output in the
* output stream is discarded. The output stream can be used again,
* reusing the already allocated buffer space. If the output stream
* has been closed, then this method has no effect.
*
* @see java.io.ByteArrayInputStream#count
*/
public synchronized void reset() {
if (closed) {
return;
}
super.reset();
}

/**
* After close() has been called, it is no longer possible to write
* to this stream. Further calls to write will have no effect.
*/
public synchronized void close() throws IOException {
closed = true;
super.close();
notifyAll();
}

/**
* Returns new instance of the {@link ByteArrayInputStream} that uses the same underlying buffer as
* this stream. The steam must be closed in order to ensure no further modifications could happen.
* @return new instance of the {@link ByteArrayInputStream}
*/
public ByteArrayInputStream getInputStream() {
if (!closed) {
throw new IllegalStateException("The stream is not closed and underlying buffer "
+ "could still be changed");
}

// Creates new ByteArrayInputStream instance that respects the current state of the buffer
// (since ByteArrayInputStream::toByteArray() does array copy).
return new ByteArrayInputStream(this.buf, 0, this.count);
}
}

class HttpClientWrappedOutputStream extends WrappedOutputStream {

List<Flow.Subscriber<? super ByteBuffer>> subscribers = new LinkedList<>();
Expand All @@ -645,8 +797,8 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream {
int rtimeout;
volatile Throwable exception;
volatile boolean connectionComplete;
PipedOutputStream pout;
HttpClientBodyPublisher publisher;
OutputStream pout;
CloseableBodyPublisher publisher;
HttpRequest request;


Expand Down Expand Up @@ -791,12 +943,20 @@ protected void setProtocolHeaders() throws IOException {
contentLen = 0;
}

final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());

this.publisher = new HttpClientBodyPublisher(this, pin);
if (contentLen != 0) {
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
if (csPolicy.isAllowChunking() || contentLen >= 0) {
final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());
this.publisher = new HttpClientBodyPublisher(this, () -> pin);
if (contentLen != 0) {
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
}
} else if (contentLen != 0) {
// If chunking is not allowed but the contentLen is unknown (-1), we need to
// buffer the request body stream until it is fully flushed by the client and only
// than send the request.
final CloseableByteArrayOutputStream baos = new CloseableByteArrayOutputStream();
this.publisher = new HttpClientSizedBodyPublisher(this, csPolicy, baos::getInputStream);
pout = baos;
}

HttpRequest.Builder rb = HttpRequest.newBuilder()
Expand Down
Loading

0 comments on commit 770b387

Please sign in to comment.