Skip to content

Commit

Permalink
CXF-7396: CachedOutputStream doesn't delete temp files
Browse files Browse the repository at this point in the history
  • Loading branch information
reta committed Sep 8, 2024
1 parent e094bb4 commit 80ca5ba
Show file tree
Hide file tree
Showing 9 changed files with 477 additions and 3 deletions.
5 changes: 5 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@
<artifactId>saaj-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.cxf.configuration.NullConfigurer;
import org.apache.cxf.feature.Feature;
import org.apache.cxf.interceptor.AbstractBasicInterceptorProvider;
import org.apache.cxf.io.CachedOutputStreamCleaner;
import org.apache.cxf.io.DelayedCachedOutputStreamCleaner;
import org.apache.cxf.resource.DefaultResourceManager;
import org.apache.cxf.resource.ObjectTypeResolver;
import org.apache.cxf.resource.PropertiesResolver;
Expand Down Expand Up @@ -141,6 +143,11 @@ public InputStream getAsStream(String name) {
if (null == this.getExtension(BindingFactoryManager.class)) {
new BindingFactoryManagerImpl(this);
}

if (null == this.getExtension(CachedOutputStreamCleaner.class)) {
this.extensions.put(CachedOutputStreamCleaner.class, DelayedCachedOutputStreamCleaner.create(this));
}

extensionManager.load(new String[] {ExtensionManagerImpl.BUS_EXTENSION_RESOURCE});
extensionManager.activateAllByType(ResourceResolver.class);

Expand Down
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/cxf/io/CachedConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ public final class CachedConstants {
public static final String CIPHER_TRANSFORMATION_BUS_PROP =
"bus.io.CachedOutputStream.CipherTransformation";

/**
* The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes
* is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner
* will be disabled.
*/
public static final String CLEANER_DELAY_BUS_PROP =
"bus.io.CachedOutputStreamCleaner.Delay";

private CachedConstants() {
// complete
}
Expand Down
22 changes: 21 additions & 1 deletion core/src/main/java/org/apache/cxf/io/CachedOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -93,6 +94,7 @@ public class CachedOutputStream extends OutputStream {
private List<CachedOutputStreamCallback> callbacks;

private List<Object> streamList = new ArrayList<>();
private CachedOutputStreamCleaner cachedOutputStreamCleaner;

public CachedOutputStream() {
this(defaultThreshold);
Expand Down Expand Up @@ -127,6 +129,8 @@ private void readBusProperties() {
outputDir = f;
}
}

cachedOutputStreamCleaner = b.getExtension(CachedOutputStreamCleaner.class);
}
}

Expand Down Expand Up @@ -279,6 +283,9 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio
}
} finally {
streamList.remove(currentStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(currentStream);
}
deleteTempFile();
inmem = true;
}
Expand Down Expand Up @@ -481,6 +488,9 @@ private void createFileOutputStream() throws IOException {
bout.writeTo(currentStream);
inmem = false;
streamList.add(currentStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.register(this);
}
} catch (Exception ex) {
//Could be IOException or SecurityException or other issues.
//Don't care what, just keep it in memory.
Expand Down Expand Up @@ -512,6 +522,10 @@ public InputStream getInputStream() throws IOException {
try {
InputStream fileInputStream = new TransferableFileInputStream(tempFile);
streamList.add(fileInputStream);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.register(fileInputStream);
}

if (cipherTransformation != null) {
fileInputStream = new CipherInputStream(fileInputStream, ciphers.getDecryptor()) {
boolean closed;
Expand All @@ -537,7 +551,7 @@ private synchronized void deleteTempFile() {
FileUtils.delete(file);
}
}
private boolean maybeDeleteTempFile(Object stream) {
private boolean maybeDeleteTempFile(Closeable stream) {
boolean postClosedInvoked = false;
streamList.remove(stream);
if (!inmem && tempFile != null && streamList.isEmpty() && allowDeleteOfFile) {
Expand All @@ -549,6 +563,9 @@ private boolean maybeDeleteTempFile(Object stream) {
//ignore
}
postClosedInvoked = true;
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(this);
}
}
deleteTempFile();
currentStream = new LoadingByteArrayOutputStream(1024);
Expand Down Expand Up @@ -665,6 +682,9 @@ public void close() throws IOException {
if (!closed) {
super.close();
maybeDeleteTempFile(this);
if (cachedOutputStreamCleaner != null) {
cachedOutputStreamCleaner.unregister(this);
}
}
closed = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.io;

import java.io.Closeable;

/**
* The {@link Bus} extension to clean up unclosed {@link CachedOutputStream} instances (and alike) backed by
* temporary files (leading to disk fill, see https://issues.apache.org/jira/browse/CXF-7396.
*/
public interface CachedOutputStreamCleaner {
/**
* Run the clean up
*/
void clean();

/**
* Register the stream instance for the clean up
*/
void unregister(Closeable closeable);

/**
* Unregister the stream instance from the clean up (closed properly)
*/
void register(Closeable closeable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cxf.io;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import org.apache.cxf.Bus;
import org.apache.cxf.buslifecycle.BusLifeCycleListener;
import org.apache.cxf.buslifecycle.BusLifeCycleManager;
import org.apache.cxf.common.logging.LogUtils;

public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);

private final long delay; /* default is 30 minutes */
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
private final Timer timer;

private static final class DelayedCloseable implements Delayed {
private final Closeable closeable;
private final long expiredAt;

DelayedCloseable(final Closeable closeable, final long delay) {
this.closeable = closeable;
this.expiredAt = System.nanoTime() + delay;
}

@Override
public int compareTo(Delayed o) {
return Long.compare(getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expiredAt - System.nanoTime(), TimeUnit.NANOSECONDS);
}

@Override
public int hashCode() {
return Objects.hash(closeable);
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}

if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

final DelayedCloseable other = (DelayedCloseable) obj;
return Objects.equals(closeable, other.closeable);
}
}

protected DelayedCachedOutputStreamCleaner(long delay, TimeUnit unit) {
this.delay = TimeUnit.NANOSECONDS.convert(delay, unit);
this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
clean();
}
}, 0, TimeUnit.MILLISECONDS.convert(Math.max(1, delay >> 1), unit));
}

@Override
public void register(Closeable closeable) {
queue.put(new DelayedCloseable(closeable, delay));
}

@Override
public void unregister(Closeable closeable) {
queue.remove(new DelayedCloseable(closeable, delay));
}

@Override
public void clean() {
final Collection<DelayedCloseable> closeables = new ArrayList<>();
queue.drainTo(closeables);
clean(closeables);
}

@Override
public void initComplete() {
}

@Override
public void postShutdown() {
}

@Override
public void preShutdown() {
timer.cancel();
}

public void forceClean() {
clean(queue);
}

private void clean(Collection<DelayedCloseable> closeables) {
final Iterator<DelayedCloseable> iterator = closeables.iterator();
while (iterator.hasNext()) {
final DelayedCloseable next = iterator.next();
try {
iterator.remove();
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
next.closeable.close();
} catch (final IOException | RuntimeException ex) {
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
}
}
}

public static CachedOutputStreamCleaner create(Bus bus) {
Number delayValue = null;
BusLifeCycleManager busLifeCycleManager = null;

if (bus != null) {
delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
}

if (delayValue == null) {
final DelayedCachedOutputStreamCleaner cleaner =
new DelayedCachedOutputStreamCleaner(30, TimeUnit.MINUTES);
if (busLifeCycleManager != null) {
busLifeCycleManager.registerLifeCycleListener(cleaner);
}
return cleaner;
} else {
final long delay = delayValue.longValue();
if (delay > 0) {
final DelayedCachedOutputStreamCleaner cleaner =
new DelayedCachedOutputStreamCleaner(delay, TimeUnit.MILLISECONDS);
if (busLifeCycleManager != null) {
busLifeCycleManager.registerLifeCycleListener(cleaner);
}
return cleaner;
} else {
return new CachedOutputStreamCleaner() {
@Override
public void unregister(Closeable closeable) {
}

@Override
public void register(Closeable closeable) {
}

@Override
public void clean() {
}
}; /* noop */
}
}
}
}
Loading

0 comments on commit 80ca5ba

Please sign in to comment.