diff --git a/core/pom.xml b/core/pom.xml index a88ab76ea42..210b4942b4e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -175,6 +175,11 @@ saaj-impl test + + org.awaitility + awaitility + test + diff --git a/core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java b/core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java index 4c12756d6ac..510a3019edc 100644 --- a/core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java +++ b/core/src/main/java/org/apache/cxf/bus/extension/ExtensionManagerBus.java @@ -40,6 +40,8 @@ import org.apache.cxf.configuration.NullConfigurer; import org.apache.cxf.feature.Feature; import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider; +import org.apache.cxf.io.CachedOutputStreamCleaner; +import org.apache.cxf.io.DelayedCachedOutputStreamCleaner; import org.apache.cxf.resource.DefaultResourceManager; import org.apache.cxf.resource.ObjectTypeResolver; import org.apache.cxf.resource.PropertiesResolver; @@ -141,6 +143,11 @@ public InputStream getAsStream(String name) { if (null == this.getExtension(BindingFactoryManager.class)) { new BindingFactoryManagerImpl(this); } + + if (null == this.getExtension(CachedOutputStreamCleaner.class)) { + this.extensions.put(CachedOutputStreamCleaner.class, DelayedCachedOutputStreamCleaner.create(this)); + } + extensionManager.load(new String[] {ExtensionManagerImpl.BUS_EXTENSION_RESOURCE}); extensionManager.activateAllByType(ResourceResolver.class); diff --git a/core/src/main/java/org/apache/cxf/io/CachedConstants.java b/core/src/main/java/org/apache/cxf/io/CachedConstants.java index 24ba8d8347b..ad18d2f9149 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedConstants.java +++ b/core/src/main/java/org/apache/cxf/io/CachedConstants.java @@ -71,6 +71,14 @@ public final class CachedConstants { public static final String CIPHER_TRANSFORMATION_BUS_PROP = "bus.io.CachedOutputStream.CipherTransformation"; + /** + * The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes + * is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner + * will be disabled. + */ + public static final String CLEANER_DELAY_BUS_PROP = + "bus.io.CachedOutputStreamCleaner.Delay"; + private CachedConstants() { // complete } diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java index ea8ce0d625d..3ba937d04b0 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java @@ -22,6 +22,7 @@ import java.io.BufferedOutputStream; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; @@ -93,6 +94,7 @@ public class CachedOutputStream extends OutputStream { private List callbacks; private List streamList = new ArrayList<>(); + private CachedOutputStreamCleaner cachedOutputStreamCleaner; public CachedOutputStream() { this(defaultThreshold); @@ -127,6 +129,8 @@ private void readBusProperties() { outputDir = f; } } + + cachedOutputStreamCleaner = b.getExtension(CachedOutputStreamCleaner.class); } } @@ -279,6 +283,9 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio } } finally { streamList.remove(currentStream); + if (cachedOutputStreamCleaner != null) { + cachedOutputStreamCleaner.unregister(currentStream); + } deleteTempFile(); inmem = true; } @@ -481,6 +488,9 @@ private void createFileOutputStream() throws IOException { bout.writeTo(currentStream); inmem = false; streamList.add(currentStream); + if (cachedOutputStreamCleaner != null) { + cachedOutputStreamCleaner.register(this); + } } catch (Exception ex) { //Could be IOException or SecurityException or other issues. //Don't care what, just keep it in memory. @@ -512,6 +522,10 @@ public InputStream getInputStream() throws IOException { try { InputStream fileInputStream = new TransferableFileInputStream(tempFile); streamList.add(fileInputStream); + if (cachedOutputStreamCleaner != null) { + cachedOutputStreamCleaner.register(fileInputStream); + } + if (cipherTransformation != null) { fileInputStream = new CipherInputStream(fileInputStream, ciphers.getDecryptor()) { boolean closed; @@ -537,7 +551,7 @@ private synchronized void deleteTempFile() { FileUtils.delete(file); } } - private boolean maybeDeleteTempFile(Object stream) { + private boolean maybeDeleteTempFile(Closeable stream) { boolean postClosedInvoked = false; streamList.remove(stream); if (!inmem && tempFile != null && streamList.isEmpty() && allowDeleteOfFile) { @@ -549,6 +563,9 @@ private boolean maybeDeleteTempFile(Object stream) { //ignore } postClosedInvoked = true; + if (cachedOutputStreamCleaner != null) { + cachedOutputStreamCleaner.unregister(this); + } } deleteTempFile(); currentStream = new LoadingByteArrayOutputStream(1024); @@ -665,6 +682,9 @@ public void close() throws IOException { if (!closed) { super.close(); maybeDeleteTempFile(this); + if (cachedOutputStreamCleaner != null) { + cachedOutputStreamCleaner.unregister(this); + } } closed = true; } diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java new file mode 100644 index 00000000000..3d6361a4c95 --- /dev/null +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.io; + +import java.io.Closeable; + +/** + * The {@link Bus} extension to clean up unclosed {@link CachedOutputStream} instances (and alike) backed by + * temporary files (leading to disk fill, see https://issues.apache.org/jira/browse/CXF-7396. + */ +public interface CachedOutputStreamCleaner { + /** + * Run the clean up + */ + void clean(); + + /** + * Register the stream instance for the clean up + */ + void unregister(Closeable closeable); + + /** + * Unregister the stream instance from the clean up (closed properly) + */ + void register(Closeable closeable); +} diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java new file mode 100644 index 00000000000..fed4ca79f1c --- /dev/null +++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java @@ -0,0 +1,191 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.io; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.Objects; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +import org.apache.cxf.Bus; +import org.apache.cxf.buslifecycle.BusLifeCycleListener; +import org.apache.cxf.buslifecycle.BusLifeCycleManager; +import org.apache.cxf.common.logging.LogUtils; + +public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener { + private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class); + + private final long delay; /* default is 30 minutes */ + private final DelayQueue queue = new DelayQueue<>(); + private final Timer timer; + + private static final class DelayedCloseable implements Delayed { + private final Closeable closeable; + private final long expiredAt; + + DelayedCloseable(final Closeable closeable, final long delay) { + this.closeable = closeable; + this.expiredAt = System.nanoTime() + delay; + } + + @Override + public int compareTo(Delayed o) { + return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS)); + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(expiredAt - System.nanoTime(), TimeUnit.NANOSECONDS); + } + + @Override + public int hashCode() { + return Objects.hash(closeable); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + final DelayedCloseable other = (DelayedCloseable) obj; + return Objects.equals(closeable, other.closeable); + } + } + + protected DelayedCachedOutputStreamCleaner(long delay, TimeUnit unit) { + this.delay = TimeUnit.NANOSECONDS.convert(delay, unit); + this.timer = new Timer("DelayedCachedOutputStreamCleaner", true); + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + clean(); + } + }, 0, TimeUnit.MILLISECONDS.convert(Math.max(1, delay >> 1), unit)); + } + + @Override + public void register(Closeable closeable) { + queue.put(new DelayedCloseable(closeable, delay)); + } + + @Override + public void unregister(Closeable closeable) { + queue.remove(new DelayedCloseable(closeable, delay)); + } + + @Override + public void clean() { + final Collection closeables = new ArrayList<>(); + queue.drainTo(closeables); + clean(closeables); + } + + @Override + public void initComplete() { + } + + @Override + public void postShutdown() { + } + + @Override + public void preShutdown() { + timer.cancel(); + } + + public void forceClean() { + clean(queue); + } + + private void clean(Collection closeables) { + final Iterator iterator = closeables.iterator(); + while (iterator.hasNext()) { + final DelayedCloseable next = iterator.next(); + try { + iterator.remove(); + LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable); + next.closeable.close(); + } catch (final IOException | RuntimeException ex) { + LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage()); + } + } + } + + public static CachedOutputStreamCleaner create(Bus bus) { + Number delayValue = null; + BusLifeCycleManager busLifeCycleManager = null; + + if (bus != null) { + delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP); + busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class); + } + + if (delayValue == null) { + final DelayedCachedOutputStreamCleaner cleaner = + new DelayedCachedOutputStreamCleaner(30, TimeUnit.MINUTES); + if (busLifeCycleManager != null) { + busLifeCycleManager.registerLifeCycleListener(cleaner); + } + return cleaner; + } else { + final long delay = delayValue.longValue(); + if (delay > 0) { + final DelayedCachedOutputStreamCleaner cleaner = + new DelayedCachedOutputStreamCleaner(delay, TimeUnit.MILLISECONDS); + if (busLifeCycleManager != null) { + busLifeCycleManager.registerLifeCycleListener(cleaner); + } + return cleaner; + } else { + return new CachedOutputStreamCleaner() { + @Override + public void unregister(Closeable closeable) { + } + + @Override + public void register(Closeable closeable) { + } + + @Override + public void clean() { + } + }; /* noop */ + } + } + } +} diff --git a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java new file mode 100644 index 00000000000..72401067c51 --- /dev/null +++ b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.io; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.cxf.Bus; +import org.apache.cxf.BusFactory; +import org.apache.cxf.bus.managers.CXFBusLifeCycleManager; +import org.apache.cxf.buslifecycle.BusLifeCycleManager; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; + +public class DelayedCachedOutputStreamCleanerTest { + private Bus bus; + + @Before + public void setUp() { + bus = BusFactory.getDefaultBus(true); + } + + @After + public void tearDown() { + bus.shutdown(true); + bus = null; + } + + @Test + public void testNoop() { + bus.setProperty(CachedConstants.CLEANER_DELAY_BUS_PROP, 0); + final CachedOutputStreamCleaner cleaner = DelayedCachedOutputStreamCleaner.create(bus); + assertThat(cleaner, not(instanceOf(DelayedCachedOutputStreamCleaner.class))); /* noop */ + } + + @Test + public void testForceClean() throws InterruptedException { + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); + + final AtomicBoolean latch = new AtomicBoolean(false); + final Closeable closeable = () -> latch.compareAndSet(false, true); + cleaner.register(closeable); + + final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner; + delayedCleaner.forceClean(); + + // Await for Closeable::close to be called + assertThat(latch.get(), is(true)); + } + + @Test + public void testClean() throws InterruptedException { + final AtomicInteger latch = new AtomicInteger(); + final Closeable closeable1 = () -> latch.incrementAndGet(); + final Closeable closeable2 = () -> latch.incrementAndGet(); + + bus.setProperty(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500); /* 2.5 seconds */ + final CachedOutputStreamCleaner cleaner = DelayedCachedOutputStreamCleaner.create(bus); + + cleaner.register(closeable1); + cleaner.register(closeable2); + + // Await for Closeable::close to be called on schedule + await().atMost(5, TimeUnit.SECONDS).untilAtomic(latch, equalTo(2)); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); + } + + @Test + public void testForceCleanForEmpty() throws InterruptedException { + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); + + final AtomicBoolean latch = new AtomicBoolean(false); + final Closeable closeable = () -> latch.compareAndSet(false, true); + + cleaner.register(closeable); + cleaner.unregister(closeable); + + final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner; + delayedCleaner.forceClean(); + + // Closeable::close should not be called + assertThat(latch.get(), is(false)); + } + + @Test + public void testForceCleanException() throws InterruptedException { + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); + + final AtomicInteger latch = new AtomicInteger(); + final Closeable closeable2 = () -> latch.incrementAndGet(); + final Closeable closeable1 = () -> { + latch.incrementAndGet(); + throw new IOException("Simulated"); + }; + cleaner.register(closeable1); + cleaner.register(closeable2); + + final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner; + delayedCleaner.forceClean(); + + // Try to call force clean one more time + delayedCleaner.forceClean(); + + // Await for Closeable::close to be called + assertThat(latch.get(), equalTo(2)); + } + + @Test + public void testBusLifecycle() throws InterruptedException { + @SuppressWarnings("unused") + final BusLifeCycleManager manager = new CXFBusLifeCycleManager(bus); + + final AtomicBoolean latch = new AtomicBoolean(); + final Closeable closeable = () -> latch.compareAndSet(false, true); + + bus.setProperty(CachedConstants.CLEANER_DELAY_BUS_PROP, 2500); /* 2.5 seconds */ + final CachedOutputStreamCleaner cleaner = DelayedCachedOutputStreamCleaner.create(bus); + cleaner.register(closeable); + + // Closes the bus, the cleaner should cancel the internal timer(s) + bus.shutdown(true); + + // The Closeable::close should not be called since timer(s) is cancelled + await().during(3, TimeUnit.SECONDS).untilAtomic(latch, is(false)); + } + +} diff --git a/parent/pom.xml b/parent/pom.xml index 0255e964599..1326263b5b9 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -275,7 +275,6 @@ 1.6.3_1 1.2_5 1.1.4c_6 - 4.2.2 diff --git a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java index 47dfd025f43..89e551e21a7 100644 --- a/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java +++ b/systests/jaxws/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java @@ -68,6 +68,8 @@ import org.apache.cxf.frontend.ClientProxy; import org.apache.cxf.helpers.FileUtils; import org.apache.cxf.io.CachedOutputStream; +import org.apache.cxf.io.CachedOutputStreamCleaner; +import org.apache.cxf.io.DelayedCachedOutputStreamCleaner; import org.apache.cxf.jaxws.DispatchImpl; import org.apache.cxf.message.Message; import org.apache.cxf.staxutils.StaxUtils; @@ -126,9 +128,10 @@ public class ClientServerTest extends AbstractBusClientServerTestBase { public static void startServers() throws Exception { // set up configuration to enable schema validation URL url = ClientServerTest.class.getResource("fault-stack-trace.xml"); + // Create bus first so it will be shared between the server and clients + createStaticBus(url.toString()); assertNotNull("cannot find test resource", url); assertTrue("server did not launch correctly", launchServer(Server.class, true)); - createStaticBus(url.toString()); } @Test @@ -1076,6 +1079,45 @@ public void testEchoProviderThresholdAsyncThrows() throws Exception { FileUtils.removeDir(f); } + @Test + public void testEchoProviderThresholdTimeout() throws Exception { + final File f = Files.createTempDir(); + LOG.info("Using temp folder: " + f.getAbsolutePath()); + + System.setProperty("org.apache.cxf.io.CachedOutputStream.OutputDirectory", f.getAbsolutePath()); + CachedOutputStream.setDefaultThreshold(5); + + String requestString = ""; + Service service = Service.create(serviceName); + service.addPort(fakePortName, jakarta.xml.ws.soap.SOAPBinding.SOAP11HTTP_BINDING, + "http://localhost:" + PORT + "/SoapContext/AsyncEchoProvider"); + Dispatch dispatcher = service.createDispatch(fakePortName, + StreamSource.class, + Service.Mode.PAYLOAD); + dispatcher.getRequestContext().put("jakarta.xml.ws.client.receiveTimeout", "1000"); + dispatcher.getRequestContext().put("jakarta.xml.ws.client.connectionTimeout", "1000"); + + StreamSource request = new StreamSource(new ByteArrayInputStream(requestString.getBytes())); + try { + // Expecting java.net.SocketTimeoutException: Read timed out + StreamSource response = dispatcher.invoke(request); + assertEquals(requestString, StaxUtils.toString(response)); + } catch (final WebServiceException ex) { + ((DispatchImpl)dispatcher).getClient().close(); + } + + //give the server side a little time to process it's part and close the files + if (f.list().length > 0) { + final CachedOutputStreamCleaner cleaner = getBus().getExtension(CachedOutputStreamCleaner.class); + if (cleaner instanceof DelayedCachedOutputStreamCleaner) { + ((DelayedCachedOutputStreamCleaner) cleaner).forceClean(); + } + } + + assertEquals("Expected no files but there is at list one", 0, f.list().length); + FileUtils.removeDir(f); + } + @Test public void testEchoProviderAsyncDecoupledEndpoints() throws Exception { String requestString = "";