From 9465f5cb514d651d441c96318d85eefbf1f6d0ae Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 19 Aug 2024 17:04:21 -0400 Subject: [PATCH] CXF-8931: HttpClientHTTPConduit can't disable the http chunk mode --- .../transport/http/HttpClientHTTPConduit.java | 216 +++++++++++++++--- .../apache/cxf/systest/http/FileStore.java | 95 ++++++++ .../cxf/systest/http/FileStoreServer.java | 47 ++++ .../systest/http/HTTPConduitChunkingTest.java | 87 +++++++ 4 files changed, 417 insertions(+), 28 deletions(-) create mode 100644 systests/transports/src/test/java/org/apache/cxf/systest/http/FileStore.java create mode 100644 systests/transports/src/test/java/org/apache/cxf/systest/http/FileStoreServer.java create mode 100644 systests/transports/src/test/java/org/apache/cxf/systest/http/HTTPConduitChunkingTest.java diff --git a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java index 0c7be9e1904..284458e60fb 100644 --- a/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java +++ b/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HttpClientHTTPConduit.java @@ -18,6 +18,9 @@ */ package org.apache.cxf.transport.http; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; @@ -507,11 +510,11 @@ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { static class HttpClientPipedOutputStream extends PipedOutputStream { HttpClientWrappedOutputStream stream; HTTPClientPolicy csPolicy; - HttpClientBodyPublisher publisher; + CloseableBodyPublisher publisher; HttpClientPipedOutputStream(HttpClientWrappedOutputStream s, PipedInputStream pin, HTTPClientPolicy cp, - HttpClientBodyPublisher bp) throws IOException { + CloseableBodyPublisher bp) throws IOException { super(pin); stream = s; csPolicy = cp; @@ -582,32 +585,33 @@ public void close() throws IOException { } } } - private static final class InputStreamSupplier implements Supplier { - final InputStream in; - InputStreamSupplier(InputStream i) { - in = i; - } - - public InputStream get() { - return in; - } + + /** + * The interface for {@link BodyPublisher}s that implement {@link Closeable} as well. + */ + private interface CloseableBodyPublisher extends BodyPublisher, Closeable { } - private static final class HttpClientBodyPublisher implements BodyPublisher { - PipedInputStream pin; - HttpClientWrappedOutputStream stream; - long contentLen; + + /** + * The {@link BodyPublisher} that wraps around the output stream. + */ + private static final class HttpClientBodyPublisher implements CloseableBodyPublisher { + private Supplier pin; + private HttpClientWrappedOutputStream stream; + private long contentLen; - private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, PipedInputStream pin) { + private HttpClientBodyPublisher(HttpClientWrappedOutputStream s, Supplier pin) { this.stream = s; this.pin = pin; } - synchronized void close() { + + public synchronized void close() { if (stream != null) { contentLen = stream.contentLen; stream = null; } } - + @Override public synchronized void subscribe(Subscriber subscriber) { if (stream != null) { @@ -615,12 +619,12 @@ public synchronized void subscribe(Subscriber subscriber) { contentLen = stream.contentLen; if (stream.pout != null) { synchronized (stream.pout) { - stream.pout.notifyAll(); + stream.pout.notifyAll(); } if (stream != null) { contentLen = stream.contentLen; } - BodyPublishers.ofInputStream(new InputStreamSupplier(pin)).subscribe(subscriber); + BodyPublishers.ofInputStream(pin).subscribe(subscriber); stream = null; pin = null; return; @@ -637,6 +641,154 @@ public long contentLength() { return contentLen; } } + + /** + * The {@link BodyPublisher} that awaits for the output stream to be fully flushed (closed) + * so the content length becomes known (sized). It is used when the chunked transfer is not allowed + * but the content length is not specified up-front. + */ + private static final class HttpClientSizedBodyPublisher implements CloseableBodyPublisher { + private HTTPClientPolicy csPolicy; + private Supplier pin; + private HttpClientWrappedOutputStream stream; + private long contentLen; + + private HttpClientSizedBodyPublisher(HttpClientWrappedOutputStream s, HTTPClientPolicy cs, + Supplier pin) { + this.stream = s; + this.csPolicy = cs; + this.pin = pin; + } + + public synchronized void close() { + if (stream != null) { + contentLen = stream.contentLen; + stream = null; + } + } + + @Override + public synchronized void subscribe(Subscriber subscriber) { + if (stream != null) { + stream.connectionComplete = true; + if (stream.pout != null) { + synchronized (stream.pout) { + stream.pout.notifyAll(); + } + + BodyPublishers.ofInputStream(pin).subscribe(subscriber); + stream = null; + pin = null; + return; + } + } + BodyPublishers.noBody().subscribe(subscriber); + } + + @Override + public long contentLength() { + if (stream != null && stream.pout != null) { + final CloseableByteArrayOutputStream baos = (CloseableByteArrayOutputStream) stream.pout; + + try { + synchronized (baos) { + if (!baos.closed) { + baos.wait(csPolicy.getConnectionTimeout()); + } + } + contentLen = (int) baos.size(); + } catch (InterruptedException e) { + //ignore + } + } + + return contentLen; + } + } + + /** + * The {@link ByteArrayOutputStream} implementation that tracks the closeability state. + */ + private static final class CloseableByteArrayOutputStream extends ByteArrayOutputStream { + private boolean closed; + + /** + * Creates a new output stream for user data + */ + CloseableByteArrayOutputStream() { + super(4096); + } + + /** + * Writes the specified byte to this output stream. + * + * @param b the byte to be written. + */ + public synchronized void write(int b) { + if (closed) { + return; + } + super.write(b); + } + + /** + * Writes len bytes from the specified byte array + * starting at offset off to this output stream. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + */ + public synchronized void write(byte[] b, int off, int len) { + if (closed) { + return; + } + super.write(b, off, len); + } + + /** + * Resets the count field of this output + * stream to zero, so that all currently accumulated output in the + * output stream is discarded. The output stream can be used again, + * reusing the already allocated buffer space. If the output stream + * has been closed, then this method has no effect. + * + * @see java.io.ByteArrayInputStream#count + */ + public synchronized void reset() { + if (closed) { + return; + } + super.reset(); + } + + /** + * After close() has been called, it is no longer possible to write + * to this stream. Further calls to write will have no effect. + */ + public synchronized void close() throws IOException { + closed = true; + super.close(); + notifyAll(); + } + + /** + * Returns new instance of the {@link ByteArrayInputStream} that uses the same underlying buffer as + * this stream. The steam must be closed in order to ensure no further modifications could happen. + * @return new instance of the {@link ByteArrayInputStream} + */ + public ByteArrayInputStream getInputStream() { + if (!closed) { + throw new IllegalStateException("The stream is not closed and underlying buffer " + + "could still be changed"); + } + + // Creates new ByteArrayInputStream instance that respects the current state of the buffer + // (since ByteArrayInputStream::toByteArray() does array copy). + return new ByteArrayInputStream(this.buf, 0, this.count); + } + } + class HttpClientWrappedOutputStream extends WrappedOutputStream { List> subscribers = new LinkedList<>(); @@ -645,8 +797,8 @@ class HttpClientWrappedOutputStream extends WrappedOutputStream { int rtimeout; volatile Throwable exception; volatile boolean connectionComplete; - PipedOutputStream pout; - HttpClientBodyPublisher publisher; + OutputStream pout; + CloseableBodyPublisher publisher; HttpRequest request; @@ -791,12 +943,20 @@ protected void setProtocolHeaders() throws IOException { contentLen = 0; } - final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 - ? 4096 : csPolicy.getChunkLength()); - - this.publisher = new HttpClientBodyPublisher(this, pin); - if (contentLen != 0) { - pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher); + if (csPolicy.isAllowChunking() || contentLen >= 0) { + final PipedInputStream pin = new PipedInputStream(csPolicy.getChunkLength() <= 0 + ? 4096 : csPolicy.getChunkLength()); + this.publisher = new HttpClientBodyPublisher(this, () -> pin); + if (contentLen != 0) { + pout = new HttpClientPipedOutputStream(this, pin, csPolicy, publisher); + } + } else if (contentLen != 0) { + // If chunking is not allowed but the contentLen is unknown (-1), we need to + // buffer the request body stream until it is fully flushed by the client and only + // than send the request. + final CloseableByteArrayOutputStream baos = new CloseableByteArrayOutputStream(); + this.publisher = new HttpClientSizedBodyPublisher(this, csPolicy, baos::getInputStream); + pout = baos; } HttpRequest.Builder rb = HttpRequest.newBuilder() diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStore.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStore.java new file mode 100644 index 00000000000..23bbb7c6651 --- /dev/null +++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStore.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http; + +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import jakarta.activation.DataHandler; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.Status; +import jakarta.ws.rs.core.UriInfo; +import org.apache.cxf.common.util.StringUtils; +import org.apache.cxf.helpers.IOUtils; +import org.apache.cxf.jaxrs.ext.multipart.Attachment; +import org.apache.cxf.jaxrs.ext.multipart.MultipartBody; + +@Path("/file-store") +public class FileStore { + private final ConcurrentMap store = new ConcurrentHashMap<>(); + @Context private HttpHeaders headers; + + @POST + @Consumes("multipart/form-data") + public void addBook(@QueryParam("chunked") boolean chunked, + @Suspended final AsyncResponse response, @Context final UriInfo uri, final MultipartBody body) { + + String transferEncoding = headers.getHeaderString("Transfer-Encoding"); + if (chunked != Objects.equals("chunked", transferEncoding)) { + response.resume(Response.status(Status.EXPECTATION_FAILED).build()); + return; + } + + for (final Attachment attachment: body.getAllAttachments()) { + final DataHandler handler = attachment.getDataHandler(); + + if (handler != null) { + final String source = handler.getName(); + if (StringUtils.isEmpty(source)) { + response.resume(Response.status(Status.BAD_REQUEST).build()); + return; + } + + try { + if (store.containsKey(source)) { + response.resume(Response.status(Status.CONFLICT).build()); + return; + } + + final byte[] content = IOUtils.readBytesFromStream(handler.getInputStream()); + if (store.putIfAbsent(source, content) != null) { + response.resume(Response.status(Status.CONFLICT).build()); + return; + } + } catch (final Exception ex) { + response.resume(Response.serverError().build()); + } + + if (response.isSuspended()) { + response.resume(Response.created(uri.getRequestUriBuilder().path(source).build()) + .build()); + } + } + } + + if (response.isSuspended()) { + response.resume(Response.status(Status.BAD_REQUEST).build()); + } + } +} diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStoreServer.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStoreServer.java new file mode 100644 index 00000000000..1dd6d398b97 --- /dev/null +++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/FileStoreServer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http; + +import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; +import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; + +class FileStoreServer extends AbstractBusTestServerBase { + private org.apache.cxf.endpoint.Server server; + private final String port; + + FileStoreServer(String port) { + this.port = port; + } + + protected void run() { + JAXRSServerFactoryBean sf = new JAXRSServerFactoryBean(); + sf.setResourceClasses(FileStore.class); + sf.setResourceProvider(FileStore.class, new SingletonResourceProvider(new FileStore(), true)); + sf.setAddress("http://localhost:" + port); + server = sf.create(); + } + + public void tearDown() throws Exception { + server.stop(); + server.destroy(); + server = null; + } +} \ No newline at end of file diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/http/HTTPConduitChunkingTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/http/HTTPConduitChunkingTest.java new file mode 100644 index 00000000000..8edff85a7d9 --- /dev/null +++ b/systests/transports/src/test/java/org/apache/cxf/systest/http/HTTPConduitChunkingTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.jaxrs.ext.multipart.Attachment; +import org.apache.cxf.jaxrs.ext.multipart.MultipartBody; +import org.apache.cxf.jaxrs.impl.MetadataMap; +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.provider.MultipartProvider; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized.Parameters; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +@RunWith(value = org.junit.runners.Parameterized.class) +public class HTTPConduitChunkingTest extends AbstractBusClientServerTestBase { + private static final String PORT = allocatePort(FileStoreServer.class); + private final Boolean chunked; + + public HTTPConduitChunkingTest(Boolean chunked) { + this.chunked = chunked; + } + + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", launchServer(new FileStoreServer(PORT))); + createStaticBus(); + } + + @Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Boolean[] {Boolean.FALSE, Boolean.TRUE}); + } + + @Test + public void testChunking() { + final String url = "http://localhost:" + PORT + "/file-store"; + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())).query("chunked", chunked); + WebClient.getConfig(webClient).getHttpConduit().getClient().setAllowChunking(chunked); + + try { + final String filename = "keymanagers.jks"; + final MultivaluedMap headers = new MetadataMap<>(); + headers.add("Content-ID", filename); + headers.add("Content-Type", "application/binary"); + headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + filename); + final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers); + final MultipartBody entity = new MultipartBody(att); + try (Response response = webClient.header("Content-Type", "multipart/form-data").post(entity)) { + assertThat(response.getStatus(), equalTo(201)); + } + } finally { + webClient.close(); + } + } +}