Skip to content

Commit

Permalink
CXF-8931: HttpClientHTTPConduit can't disable the http chunk mode
Browse files Browse the repository at this point in the history
  • Loading branch information
reta committed Aug 20, 2024
1 parent c874b22 commit bc2a3af
Show file tree
Hide file tree
Showing 4 changed files with 424 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.cxf.transport.http;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -507,11 +510,11 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
static class HttpClientPipedOutputStream extends PipedOutputStream {
HttpClientWrappedOutputStream stream;
HTTPClientPolicy csPolicy;
HttpClientBodyPublisher publisher;
CloseableBodyPublisher publisher;
HttpClientPipedOutputStream(HttpClientWrappedOutputStream s,
PipedInputStream pin,
HTTPClientPolicy cp,
HttpClientBodyPublisher bp) throws IOException {
CloseableBodyPublisher bp) throws IOException {
super(pin);
stream = s;
csPolicy = cp;
Expand Down Expand Up @@ -582,45 +585,46 @@ public void close() throws IOException {
}
}
}
private static final class InputStreamSupplier implements Supplier<InputStream> {
final InputStream in;
InputStreamSupplier(InputStream i) {
in = i;
}

public InputStream get() {
return in;
}

/**
* The interface for {@link BodyPublisher}s that implement {@link Closeable} as well.
*/
private interface CloseableBodyPublisher extends BodyPublisher, Closeable {
}
private static final class HttpClientBodyPublisher implements BodyPublisher {
PipedInputStream pin;
HttpClientWrappedOutputStream stream;
long contentLen;

/**
* The {@link BodyPublisher} that wraps around the output stream.
*/
private static final class HttpClientBodyPublisher implements CloseableBodyPublisher {
private Supplier<InputStream> pin;
private HttpClientWrappedOutputStream stream;
private long contentLen;

private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, PipedInputStream pin) {
private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, Supplier<InputStream> pin) {
this.stream = s;
this.pin = pin;
}
synchronized void close() {

public synchronized void close() {
if (stream != null) {
contentLen = stream.contentLen;
stream = null;
}
}

@Override
public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (stream != null) {
stream.connectionComplete = true;
contentLen = stream.contentLen;
if (stream.pout != null) {
synchronized (stream.pout) {
stream.pout.notifyAll();
stream.pout.notifyAll();
}
if (stream != null) {
contentLen = stream.contentLen;
}
BodyPublishers.ofInputStream(new InputStreamSupplier(pin)).subscribe(subscriber);
BodyPublishers.ofInputStream(pin).subscribe(subscriber);
stream = null;
pin = null;
return;
Expand All @@ -637,6 +641,154 @@ public long contentLength() {
return contentLen;
}
}

/**
* The {@link BodyPublisher} that awaits for the output stream to be fully flushed (closed)
* so the content length becomes known (sized). It is used when the chunked transfer is not allowed
* but the content length is not specified up-front.
*/
private static final class HttpClientSizedBodyPublisher implements CloseableBodyPublisher {
private HTTPClientPolicy csPolicy;
private Supplier<ByteArrayInputStream> pin;
private HttpClientWrappedOutputStream stream;
private long contentLen;

private HttpClientSizedBodyPublisher(HttpClientWrappedOutputStream s, HTTPClientPolicy cs,
Supplier<ByteArrayInputStream> pin) {
this.stream = s;
this.csPolicy = cs;
this.pin = pin;
}

public synchronized void close() {
if (stream != null) {
contentLen = stream.contentLen;
stream = null;
}
}

@Override
public synchronized void subscribe(Subscriber<? super ByteBuffer> subscriber) {
if (stream != null) {
stream.connectionComplete = true;
if (stream.pout != null) {
synchronized (stream.pout) {
stream.pout.notifyAll();
}

BodyPublishers.ofInputStream(pin).subscribe(subscriber);
stream = null;
pin = null;
return;
}
}
BodyPublishers.noBody().subscribe(subscriber);
}

@Override
public long contentLength() {
if (stream != null && stream.pout != null) {
final CloseableByteArrayOutputStream baos = (CloseableByteArrayOutputStream) stream.pout;

try {
synchronized (baos) {
if (!baos.closed) {
baos.wait(csPolicy.getConnectionTimeout());
}
}
contentLen = (int) baos.size();
} catch (InterruptedException e) {
//ignore
}
}

return contentLen;
}
}

/**
* The {@link ByteArrayOutputStream} implementation that tracks the closeability state.
*/
private static final class CloseableByteArrayOutputStream extends ByteArrayOutputStream {
private boolean closed;

/**
* Creates a new output stream for user data
*/
CloseableByteArrayOutputStream() {
super(4096);
}

/**
* Writes the specified byte to this output stream.
*
* @param b the byte to be written.
*/
public synchronized void write(int b) {
if (closed) {
return;
}
super.write(b);
}

/**
* Writes <code>len</code> bytes from the specified byte array
* starting at offset <code>off</code> to this output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
public synchronized void write(byte[] b, int off, int len) {
if (closed) {
return;
}
super.write(b, off, len);
}

/**
* Resets the <code>count</code> field of this output
* stream to zero, so that all currently accumulated output in the
* output stream is discarded. The output stream can be used again,
* reusing the already allocated buffer space. If the output stream
* has been closed, then this method has no effect.
*
* @see java.io.ByteArrayInputStream#count
*/
public synchronized void reset() {
if (closed) {
return;
}
super.reset();
}

/**
* After close() has been called, it is no longer possible to write
* to this stream. Further calls to write will have no effect.
*/
public synchronized void close() throws IOException {
closed = true;
super.close();
notifyAll();
}

/**
* Returns new instance of the {@link ByteArrayInputStream} that uses the same underlying buffer as
* this stream. The steam must be closed in order to ensure no further modifications could happen.
* @return new instance of the {@link ByteArrayInputStream}
*/
public ByteArrayInputStream getInputStream() {
if (!closed) {
throw new IllegalStateException("The stream is not closed and underlying buffer "
+ "could still be changed");
}

// Creates new ByteArrayInputStream instance that respects the current state of the buffer
// (since ByteArrayInputStream::toByteArray() does array copy).
return new ByteArrayInputStream(this.buf, 0, this.count);
}
}

class HttpClientWrappedOutputStream extends WrappedOutputStream {

List<Flow.Subscriber<? super ByteBuffer>> subscribers = new LinkedList<>();
Expand All @@ -645,8 +797,8 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream {
int rtimeout;
volatile Throwable exception;
volatile boolean connectionComplete;
PipedOutputStream pout;
HttpClientBodyPublisher publisher;
OutputStream pout;
CloseableBodyPublisher publisher;
HttpRequest request;


Expand Down Expand Up @@ -791,12 +943,20 @@ protected void setProtocolHeaders() throws IOException {
contentLen = 0;
}

final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());

this.publisher = new HttpClientBodyPublisher(this, pin);
if (contentLen != 0) {
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
if (csPolicy.isAllowChunking() || contentLen >= 0) {
final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0
? 4096 : csPolicy.getChunkLength());
this.publisher = new HttpClientBodyPublisher(this, () -> pin);
if (contentLen != 0) {
pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher);
}
} else if (contentLen != 0) {
// If chunking is not allowed but the contentLen is unknown (-1), we need to
// buffer the request body stream until it is fully flushed by the client and only
// than send the request.
final CloseableByteArrayOutputStream baos = new CloseableByteArrayOutputStream();
this.publisher = new HttpClientSizedBodyPublisher(this, csPolicy, baos::getInputStream);
pout = baos;
}

HttpRequest.Builder rb = HttpRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.systest.http;

import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import jakarta.activation.DataHandler;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.Suspended;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.Response.Status;
import jakarta.ws.rs.core.UriInfo;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.jaxrs.ext.multipart.Attachment;
import org.apache.cxf.jaxrs.ext.multipart.MultipartBody;

@Path("/file-store")
public class FileStore {
private final ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();
private final ExecutorService executor = Executors.newFixedThreadPool(2);
@Context private HttpHeaders headers;

@POST
@Consumes("multipart/form-data")
public void addBook(@QueryParam("chunked") boolean chunked,
@Suspended final AsyncResponse response, @Context final UriInfo uri, final MultipartBody body) {

String transferEncoding = headers.getHeaderString("Transfer-Encoding");
if (chunked != Objects.equals("chunked", transferEncoding)) {
response.resume(Response.status(Status.EXPECTATION_FAILED).build());
return;
}

executor.submit(new Runnable() {
public void run() {
for (final Attachment attachment: body.getAllAttachments()) {
final DataHandler handler = attachment.getDataHandler();

if (handler != null) {
final String source = handler.getName();
if (StringUtils.isEmpty(source)) {
response.resume(Response.status(Status.BAD_REQUEST).build());
return;
}

try {
if (store.containsKey(source)) {
response.resume(Response.status(Status.CONFLICT).build());
return;
}

final byte[] content = IOUtils.readBytesFromStream(handler.getInputStream());
if (store.putIfAbsent(source, content) != null) {
response.resume(Response.status(Status.CONFLICT).build());
return;
}
} catch (final Exception ex) {
response.resume(Response.serverError().build());
}

if (response.isSuspended()) {
response.resume(Response.created(uri.getRequestUriBuilder().path(source).build())
.build());
}
}
}

if (response.isSuspended()) {
response.resume(Response.status(Status.BAD_REQUEST).build());
}
}
});
}
}
Loading

0 comments on commit bc2a3af

Please sign in to comment.