Skip to content

Commit

Permalink
Refactor the cleaner implementation and add guardrails for cleaner delay
Browse files Browse the repository at this point in the history
  • Loading branch information
reta committed Sep 11, 2024
1 parent f22a3e7 commit 26eaf39
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 51 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/cxf/io/CachedConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public final class CachedConstants {

/**
* The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes
* is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner
* will be disabled.
* is specified by default, the minimum value is 2 seconds. If the value of the delay is set to
* 0 (or is negative), the cleaner will be deactivated.
*/
public static final String CLEANER_DELAY_BUS_PROP =
"bus.io.CachedOutputStreamCleaner.Delay";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,92 @@

public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener {
private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class);
private static final long MIN_DELAY = 2000; /* 2 seconds */
private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() {
// NOOP
};

private long delay; /* default is 30 minutes, in milliseconds */
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
private Timer timer;
private DelayedCleaner cleaner = NOOP_CLEANER;

private interface DelayedCleaner extends CachedOutputStreamCleaner, Closeable {
@Override
default void register(Closeable closeable) {
}

@Override
default void unregister(Closeable closeable) {
}

@Override
default void close() {
}

@Override
default void clean() {
}

default void forceClean() {
}
}

private static final class DelayedCleanerImpl implements DelayedCleaner {
private final long delay; /* default is 30 minutes, in milliseconds */
private final DelayQueue<DelayedCloseable> queue = new DelayQueue<>();
private final Timer timer;

DelayedCleanerImpl(final long delay) {
this.delay = delay;
this.timer = new Timer("DelayedCachedOutputStreamCleaner", true);
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
clean();
}
}, 0, Math.max(MIN_DELAY, delay >> 1));
}

@Override
public void register(Closeable closeable) {
queue.put(new DelayedCloseable(closeable, delay));
}

@Override
public void unregister(Closeable closeable) {
queue.remove(new DelayedCloseable(closeable, delay));
}

@Override
public void clean() {
final Collection<DelayedCloseable> closeables = new ArrayList<>();
queue.drainTo(closeables);
clean(closeables);
}

@Override
public void forceClean() {
clean(queue);
}

@Override
public void close() {
timer.cancel();
queue.clear();
}

private void clean(Collection<DelayedCloseable> closeables) {
final Iterator<DelayedCloseable> iterator = closeables.iterator();
while (iterator.hasNext()) {
final DelayedCloseable next = iterator.next();
try {
iterator.remove();
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
next.closeable.close();
} catch (final IOException | RuntimeException ex) {
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
}
}
}
}

private static final class DelayedCloseable implements Delayed {
private final Closeable closeable;
Expand Down Expand Up @@ -97,52 +179,45 @@ public void setBus(Bus bus) {
delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP);
busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class);
}


if (cleaner != null) {
cleaner.close();
}

if (delayValue == null) {
delay = TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES);
// Default delay is set to 30 mins
cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES));
} else {
final long value = delayValue.longValue();
if (value > 0) {
delay = value; /* already in milliseconds */
if (value > 0 && value >= MIN_DELAY) {
cleaner = new DelayedCleanerImpl(value); /* already in milliseconds */
} else {
cleaner = NOOP_CLEANER;
if (value < 0) {
LOG.warning("The value of " + CachedConstants.CLEANER_DELAY_BUS_PROP + " property is invalid: "
+ value + " (should be >= " + MIN_DELAY + ", 0 to deactivate)");
}
}
}

if (busLifeCycleManager != null) {
busLifeCycleManager.registerLifeCycleListener(this);
}

if (timer != null) {
timer.cancel();
timer = null;
}

if (delay > 0) {
timer = new Timer("DelayedCachedOutputStreamCleaner", true);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
clean();
}
}, 0, Math.max(1, delay >> 1));
}

}

@Override
public void register(Closeable closeable) {
queue.put(new DelayedCloseable(closeable, delay));
cleaner.register(closeable);
}

@Override
public void unregister(Closeable closeable) {
queue.remove(new DelayedCloseable(closeable, delay));
cleaner.unregister(closeable);
}

@Override
public void clean() {
final Collection<DelayedCloseable> closeables = new ArrayList<>();
queue.drainTo(closeables);
clean(closeables);
cleaner.clean();
}

@Override
Expand All @@ -155,26 +230,10 @@ public void postShutdown() {

@Override
public void preShutdown() {
if (timer != null) {
timer.cancel();
}
cleaner.close();
}

public void forceClean() {
clean(queue);
}

private void clean(Collection<DelayedCloseable> closeables) {
final Iterator<DelayedCloseable> iterator = closeables.iterator();
while (iterator.hasNext()) {
final DelayedCloseable next = iterator.next();
try {
iterator.remove();
LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable);
next.closeable.close();
} catch (final IOException | RuntimeException ex) {
LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage());
}
}
cleaner.forceClean();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public class DelayedCachedOutputStreamCleanerTest {

@After
public void tearDown() {
bus.shutdown(true);
bus = null;
if (bus != null) {
bus.shutdown(true);
bus = null;
}
}

@Test
Expand All @@ -56,6 +58,8 @@ public void testNoop() {

final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */

assertNoopCleaner(cleaner);
}

@Test
Expand Down Expand Up @@ -161,4 +165,37 @@ public void testBusLifecycle() throws InterruptedException {
await().during(3, TimeUnit.SECONDS).untilAtomic(latch, is(false));
}

@Test
public void testNegativeDelay() throws InterruptedException {
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1);
bus = new ExtensionManagerBus(new HashMap<>(), properties);

final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */

assertNoopCleaner(cleaner);
}

@Test
public void testTooSmallDelay() throws InterruptedException {
final Map<String, Object> properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500);
bus = new ExtensionManagerBus(new HashMap<>(), properties);

final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class);
assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */

assertNoopCleaner(cleaner);
}

private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) {
final AtomicBoolean latch = new AtomicBoolean(false);
final Closeable closeable = () -> latch.compareAndSet(false, true);
cleaner.register(closeable);

final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner;
delayedCleaner.forceClean();

// Noop, Closeable::close should not be called
assertThat(latch.get(), is(false));
}
}

0 comments on commit 26eaf39

Please sign in to comment.