From 26eaf390e4f962697074ecb693f324936d070dbb Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 11 Sep 2024 08:45:42 -0400 Subject: [PATCH] Refactor the cleaner implementation and add guardrails for cleaner delay --- .../org/apache/cxf/io/CachedConstants.java | 4 +- .../io/DelayedCachedOutputStreamCleaner.java | 153 ++++++++++++------ .../DelayedCachedOutputStreamCleanerTest.java | 41 ++++- 3 files changed, 147 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/cxf/io/CachedConstants.java b/core/src/main/java/org/apache/cxf/io/CachedConstants.java index ad18d2f9149..1b23f66e995 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedConstants.java +++ b/core/src/main/java/org/apache/cxf/io/CachedConstants.java @@ -73,8 +73,8 @@ public final class CachedConstants { /** * The delay (in ms) for cleaning up unclosed {@code CachedOutputStream} instances. 30 minutes - * is specified by default. If the value of the delay is set to 0 (or is negative), the cleaner - * will be disabled. + * is specified by default, the minimum value is 2 seconds. If the value of the delay is set to + * 0 (or is negative), the cleaner will be deactivated. */ public static final String CLEANER_DELAY_BUS_PROP = "bus.io.CachedOutputStreamCleaner.Delay"; diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java index 7a790a3fa68..3cccf15d8aa 100644 --- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java @@ -40,10 +40,92 @@ public final class DelayedCachedOutputStreamCleaner implements CachedOutputStreamCleaner, BusLifeCycleListener { private static final Logger LOG = LogUtils.getL7dLogger(DelayedCachedOutputStreamCleaner.class); + private static final long MIN_DELAY = 2000; /* 2 seconds */ + private static final DelayedCleaner NOOP_CLEANER = new DelayedCleaner() { + // NOOP + }; - private long delay; /* default is 30 minutes, in milliseconds */ - private final DelayQueue queue = new DelayQueue<>(); - private Timer timer; + private DelayedCleaner cleaner = NOOP_CLEANER; + + private interface DelayedCleaner extends CachedOutputStreamCleaner, Closeable { + @Override + default void register(Closeable closeable) { + } + + @Override + default void unregister(Closeable closeable) { + } + + @Override + default void close() { + } + + @Override + default void clean() { + } + + default void forceClean() { + } + } + + private static final class DelayedCleanerImpl implements DelayedCleaner { + private final long delay; /* default is 30 minutes, in milliseconds */ + private final DelayQueue queue = new DelayQueue<>(); + private final Timer timer; + + DelayedCleanerImpl(final long delay) { + this.delay = delay; + this.timer = new Timer("DelayedCachedOutputStreamCleaner", true); + this.timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + clean(); + } + }, 0, Math.max(MIN_DELAY, delay >> 1)); + } + + @Override + public void register(Closeable closeable) { + queue.put(new DelayedCloseable(closeable, delay)); + } + + @Override + public void unregister(Closeable closeable) { + queue.remove(new DelayedCloseable(closeable, delay)); + } + + @Override + public void clean() { + final Collection closeables = new ArrayList<>(); + queue.drainTo(closeables); + clean(closeables); + } + + @Override + public void forceClean() { + clean(queue); + } + + @Override + public void close() { + timer.cancel(); + queue.clear(); + } + + private void clean(Collection closeables) { + final Iterator iterator = closeables.iterator(); + while (iterator.hasNext()) { + final DelayedCloseable next = iterator.next(); + try { + iterator.remove(); + LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable); + next.closeable.close(); + } catch (final IOException | RuntimeException ex) { + LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage()); + } + } + } + } private static final class DelayedCloseable implements Delayed { private final Closeable closeable; @@ -97,52 +179,45 @@ public void setBus(Bus bus) { delayValue = (Number) bus.getProperty(CachedConstants.CLEANER_DELAY_BUS_PROP); busLifeCycleManager = bus.getExtension(BusLifeCycleManager.class); } - + + if (cleaner != null) { + cleaner.close(); + } + if (delayValue == null) { - delay = TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); + // Default delay is set to 30 mins + cleaner = new DelayedCleanerImpl(TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES)); } else { final long value = delayValue.longValue(); - if (value > 0) { - delay = value; /* already in milliseconds */ + if (value > 0 && value >= MIN_DELAY) { + cleaner = new DelayedCleanerImpl(value); /* already in milliseconds */ + } else { + cleaner = NOOP_CLEANER; + if (value < 0) { + LOG.warning("The value of " + CachedConstants.CLEANER_DELAY_BUS_PROP + " property is invalid: " + + value + " (should be >= " + MIN_DELAY + ", 0 to deactivate)"); + } } } if (busLifeCycleManager != null) { busLifeCycleManager.registerLifeCycleListener(this); } - - if (timer != null) { - timer.cancel(); - timer = null; - } - - if (delay > 0) { - timer = new Timer("DelayedCachedOutputStreamCleaner", true); - timer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - clean(); - } - }, 0, Math.max(1, delay >> 1)); - } - } - + @Override public void register(Closeable closeable) { - queue.put(new DelayedCloseable(closeable, delay)); + cleaner.register(closeable); } @Override public void unregister(Closeable closeable) { - queue.remove(new DelayedCloseable(closeable, delay)); + cleaner.unregister(closeable); } @Override public void clean() { - final Collection closeables = new ArrayList<>(); - queue.drainTo(closeables); - clean(closeables); + cleaner.clean(); } @Override @@ -155,26 +230,10 @@ public void postShutdown() { @Override public void preShutdown() { - if (timer != null) { - timer.cancel(); - } + cleaner.close(); } public void forceClean() { - clean(queue); - } - - private void clean(Collection closeables) { - final Iterator iterator = closeables.iterator(); - while (iterator.hasNext()) { - final DelayedCloseable next = iterator.next(); - try { - iterator.remove(); - LOG.warning("Unclosed (leaked?) stream detected: " + next.closeable); - next.closeable.close(); - } catch (final IOException | RuntimeException ex) { - LOG.warning("Unable to close (leaked?) stream: " + ex.getMessage()); - } - } + cleaner.forceClean(); } } diff --git a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java index a6c9993d08d..3f0603aa7f3 100644 --- a/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java +++ b/core/src/test/java/org/apache/cxf/io/DelayedCachedOutputStreamCleanerTest.java @@ -45,8 +45,10 @@ public class DelayedCachedOutputStreamCleanerTest { @After public void tearDown() { - bus.shutdown(true); - bus = null; + if (bus != null) { + bus.shutdown(true); + bus = null; + } } @Test @@ -56,6 +58,8 @@ public void testNoop() { final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ + + assertNoopCleaner(cleaner); } @Test @@ -161,4 +165,37 @@ public void testBusLifecycle() throws InterruptedException { await().during(3, TimeUnit.SECONDS).untilAtomic(latch, is(false)); } + @Test + public void testNegativeDelay() throws InterruptedException { + final Map properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, -1); + bus = new ExtensionManagerBus(new HashMap<>(), properties); + + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ + + assertNoopCleaner(cleaner); + } + + @Test + public void testTooSmallDelay() throws InterruptedException { + final Map properties = Collections.singletonMap(CachedConstants.CLEANER_DELAY_BUS_PROP, 1500); + bus = new ExtensionManagerBus(new HashMap<>(), properties); + + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + assertThat(cleaner, instanceOf(DelayedCachedOutputStreamCleaner.class)); /* noop */ + + assertNoopCleaner(cleaner); + } + + private void assertNoopCleaner(final CachedOutputStreamCleaner cleaner) { + final AtomicBoolean latch = new AtomicBoolean(false); + final Closeable closeable = () -> latch.compareAndSet(false, true); + cleaner.register(closeable); + + final DelayedCachedOutputStreamCleaner delayedCleaner = (DelayedCachedOutputStreamCleaner) cleaner; + delayedCleaner.forceClean(); + + // Noop, Closeable::close should not be called + assertThat(latch.get(), is(false)); + } }