From aeb719b10d81cf9f740cdbbb73beac3b4a2a9b2d Mon Sep 17 00:00:00 2001 From: Jano Date: Sun, 20 Sep 2020 19:13:18 +0100 Subject: [PATCH] move logic from BoundedExecutor to KeySequentialBoundedExecutor --- README.md | 2 +- pom.xml | 2 +- .../com/jano7/executor/BoundedExecutor.java | 97 ---------- .../KeySequentialBoundedExecutor.java | 61 +++++- .../jano7/executor/BoundedExecutorTest.java | 176 ------------------ .../KeySequentialBoundedExecutorTest.java | 170 ++++++++++++++++- .../executor/KeySequentialRunnerTest.java | 30 ++- .../com/jano7/executor/TaskQueueTest.java | 2 +- .../java/com/jano7/executor/TestUtil.java | 30 --- .../java/com/jano7/executor/TestUtils.java | 77 ++++++++ 10 files changed, 314 insertions(+), 333 deletions(-) delete mode 100644 src/main/java/com/jano7/executor/BoundedExecutor.java delete mode 100644 src/test/java/com/jano7/executor/BoundedExecutorTest.java delete mode 100644 src/test/java/com/jano7/executor/TestUtil.java create mode 100644 src/test/java/com/jano7/executor/TestUtils.java diff --git a/README.md b/README.md index 7fd8dbc..9086288 100644 --- a/README.md +++ b/README.md @@ -101,6 +101,6 @@ from multiple threads without synchronization. com.jano7 executor - 2.0.1 + 2.0.2 ``` diff --git a/pom.xml b/pom.xml index 020f777..9b8fa67 100644 --- a/pom.xml +++ b/pom.xml @@ -152,7 +152,7 @@ scm:git:git://github.com/jano7/executor.git scm:git:git@github.com:jano7/executor.git https://github.com/jano7/executor - executor-2.0.1 + executor-2.0.2 diff --git a/src/main/java/com/jano7/executor/BoundedExecutor.java b/src/main/java/com/jano7/executor/BoundedExecutor.java deleted file mode 100644 index 81b7fd0..0000000 --- a/src/main/java/com/jano7/executor/BoundedExecutor.java +++ /dev/null @@ -1,97 +0,0 @@ -/* -MIT License - -Copyright (c) 2020 Jan Gaspar - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -*/ -package com.jano7.executor; - -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import static com.jano7.executor.BoundedStrategy.BLOCK; -import static com.jano7.executor.Util.checkNotNull; - -public final class BoundedExecutor implements DrainableExecutor { - - private final int maxTasks; - - private final Semaphore semaphore; - - private final Executor underlyingExecutor; - - private final Runnable acquire; - - private boolean drained = false; - - public BoundedExecutor(int maxTasks, BoundedStrategy onTasksExceeded, Executor underlyingExecutor) { - this.maxTasks = maxTasks; - this.semaphore = new Semaphore(maxTasks); - this.underlyingExecutor = underlyingExecutor; - this.acquire = onTasksExceeded == BLOCK ? this::blockOnTasksExceeded : this::rejectOnTasksExceeded; - } - - private void blockOnTasksExceeded() { - semaphore.acquireUninterruptibly(); - } - - private void rejectOnTasksExceeded() { - if (!semaphore.tryAcquire()) { - throw new RejectedExecutionException("task limit of " + maxTasks + " exceeded"); - } - } - - @Override - public void execute(Runnable task) { - checkNotNull(task); - synchronized (this) { - if (drained) { - throw new RejectedExecutionException("executor drained"); - } else { - acquire.run(); - } - } - try { - underlyingExecutor.execute(new KeyRunnable<>( - task, - () -> { - try { - task.run(); - } finally { - semaphore.release(); - } - }) - ); - } catch (RejectedExecutionException e) { - semaphore.release(); - throw e; - } - } - - @Override - public synchronized boolean drain(long timeout, TimeUnit unit) throws InterruptedException { - if (!drained && semaphore.tryAcquire(maxTasks, timeout, unit)) { - drained = true; - } - return drained; - } -} diff --git a/src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java b/src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java index e523183..bab980c 100644 --- a/src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java +++ b/src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java @@ -24,23 +24,74 @@ of this software and associated documentation files (the "Software"), to deal package com.jano7.executor; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import static com.jano7.executor.BoundedStrategy.BLOCK; +import static com.jano7.executor.Util.checkNotNull; + public final class KeySequentialBoundedExecutor implements DrainableExecutor { - private final BoundedExecutor boundedExecutor; + private final int maxTasks; + + private final Semaphore semaphore; + + private final KeySequentialExecutor keySequentialExecutor; + + private final Runnable acquire; + + private boolean drained = false; public KeySequentialBoundedExecutor(int maxTasks, BoundedStrategy onTasksExceeded, Executor underlyingExecutor) { - boundedExecutor = new BoundedExecutor(maxTasks, onTasksExceeded, new KeySequentialExecutor(underlyingExecutor)); + this.maxTasks = maxTasks; + this.semaphore = new Semaphore(maxTasks); + this.keySequentialExecutor = new KeySequentialExecutor(underlyingExecutor); + this.acquire = onTasksExceeded == BLOCK ? this::blockOnTasksExceeded : this::rejectOnTasksExceeded; + } + + private void blockOnTasksExceeded() { + semaphore.acquireUninterruptibly(); + } + + private void rejectOnTasksExceeded() { + if (!semaphore.tryAcquire()) { + throw new RejectedExecutionException("task limit of " + maxTasks + " exceeded"); + } } @Override public void execute(Runnable task) { - boundedExecutor.execute(task); + checkNotNull(task); + synchronized (this) { + if (drained) { + throw new RejectedExecutionException("executor drained"); + } else { + acquire.run(); + } + } + try { + keySequentialExecutor.execute(new KeyRunnable<>( + task, + () -> { + try { + task.run(); + } finally { + semaphore.release(); + } + }) + ); + } catch (RejectedExecutionException e) { + semaphore.release(); + throw e; + } } @Override - public boolean drain(long timeout, TimeUnit unit) throws InterruptedException { - return boundedExecutor.drain(timeout, unit); + public synchronized boolean drain(long timeout, TimeUnit unit) throws InterruptedException { + if (!drained && semaphore.tryAcquire(maxTasks, timeout, unit)) { + drained = true; + } + return drained; } } diff --git a/src/test/java/com/jano7/executor/BoundedExecutorTest.java b/src/test/java/com/jano7/executor/BoundedExecutorTest.java deleted file mode 100644 index 2b039c6..0000000 --- a/src/test/java/com/jano7/executor/BoundedExecutorTest.java +++ /dev/null @@ -1,176 +0,0 @@ -package com.jano7.executor; - -import org.junit.Test; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static com.jano7.executor.BoundedStrategy.BLOCK; -import static com.jano7.executor.BoundedStrategy.REJECT; -import static com.jano7.executor.TestUtil.doSomething; -import static org.junit.Assert.*; - -public class BoundedExecutorTest { - - @Test(timeout = 5000) - public void blockWhenLimitReached() throws InterruptedException { - ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10); - KeySequentialExecutor keySequentialExecutor = new KeySequentialExecutor(underlyingExecutor); - AtomicInteger completed = new AtomicInteger(0); - CountDownLatch block = new CountDownLatch(1); - CountDownLatch done = new CountDownLatch(1); - Runnable blockingTask = new KeyRunnable<>("key", () -> { - try { - block.await(); - } catch (InterruptedException ignored) { - } - completed.incrementAndGet(); - }); - Runnable simpleTask = new KeyRunnable<>("key", completed::incrementAndGet); - - BoundedExecutor bounded = new BoundedExecutor(5, BLOCK, keySequentialExecutor); - bounded.execute(blockingTask); - bounded.execute(simpleTask); - bounded.execute(simpleTask); - bounded.execute(simpleTask); - bounded.execute(simpleTask); - - Thread t = new Thread(() -> { - bounded.execute(simpleTask); - done.countDown(); - }); - t.start(); - - assertFalse(done.await(1, TimeUnit.SECONDS)); - - block.countDown(); - done.await(); - - underlyingExecutor.shutdown(); - underlyingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); - - assertEquals(6, completed.get()); - } - - @Test(timeout = 5000, expected = RejectedExecutionException.class) - public void rejectWhenLimitReached() { - ExecutorService underlyingExecutor = Executors.newSingleThreadExecutor(); - CountDownLatch block = new CountDownLatch(1); - Runnable blockingTask = () -> { - try { - block.await(); - } catch (InterruptedException ignored) { - } - }; - - BoundedExecutor bounded = new BoundedExecutor(2, REJECT, underlyingExecutor); - bounded.execute(blockingTask); - bounded.execute(blockingTask); - - try { - bounded.execute(blockingTask); - } finally { - underlyingExecutor.shutdownNow(); - } - } - - @Test(timeout = 10000) - public void drain() throws InterruptedException { - for (int i = 0; i < 1000; ++i) { - ExecutorService underlying = Executors.newFixedThreadPool(5); - BoundedExecutor bounded = new BoundedExecutor(20, BLOCK, underlying); - CountDownLatch latch = new CountDownLatch(1); - AtomicInteger completed = new AtomicInteger(0); - - for (int j = 0; j < 10; ++j) { - if (j == 5) { - bounded.execute(() -> { - try { - latch.await(); - completed.incrementAndGet(); - } catch (InterruptedException ignored) { - } - }); - } else { - bounded.execute(completed::incrementAndGet); - } - } - - assertFalse(bounded.drain(1, TimeUnit.MILLISECONDS)); - - latch.countDown(); - - assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); - assertEquals(10, completed.get()); - assertTrue(underlying.shutdownNow().isEmpty()); - } - } - - @Test(timeout = 5000, expected = RejectedExecutionException.class) - public void rejectTasksAfterDrain() throws InterruptedException { - ExecutorService underlying = Executors.newCachedThreadPool(); - BoundedExecutor bounded = new BoundedExecutor(10, BLOCK, underlying); - - bounded.execute(doSomething); - - bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS); - try { - bounded.execute(doSomething); - } finally { - underlying.shutdownNow(); - } - } - - @Test(timeout = 5000) - public void safeToCallDrainMultipleTime() throws InterruptedException { - ExecutorService underlying = Executors.newCachedThreadPool(); - BoundedExecutor bounded = new BoundedExecutor(10, BLOCK, underlying); - - bounded.execute(doSomething); - - assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); - assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); - - underlying.shutdownNow(); - } - - @Test(timeout = 5000) - public void releaseLockOnException() { - Executor underlying = new Executor() { - - private int count = 0; - - @Override - public void execute(Runnable command) { - if (count++ == 0) { - throw new RejectedExecutionException(); - } - command.run(); - } - }; - - Executor bounded = new BoundedExecutor(1, BLOCK, underlying); - - boolean thrown = false; - try { - bounded.execute(doSomething); - } catch (RejectedExecutionException e) { - thrown = true; - } - bounded.execute(doSomething); - - assertTrue(thrown); - } - - @Test(expected = NullPointerException.class) - public void throwExceptionWhenTaskIsNull() { - ExecutorService underlying = Executors.newCachedThreadPool(); - Executor bounded = new BoundedExecutor(10, BLOCK, underlying); - - try { - bounded.execute(null); - } finally { - underlying.shutdownNow(); - } - } -} diff --git a/src/test/java/com/jano7/executor/KeySequentialBoundedExecutorTest.java b/src/test/java/com/jano7/executor/KeySequentialBoundedExecutorTest.java index d2a6525..0900f9e 100644 --- a/src/test/java/com/jano7/executor/KeySequentialBoundedExecutorTest.java +++ b/src/test/java/com/jano7/executor/KeySequentialBoundedExecutorTest.java @@ -28,15 +28,177 @@ of this software and associated documentation files (the "Software"), to deal import java.util.Collections; import java.util.LinkedList; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import static com.jano7.executor.BoundedStrategy.BLOCK; -import static org.junit.Assert.assertEquals; +import static com.jano7.executor.BoundedStrategy.REJECT; +import static com.jano7.executor.TestUtils.doSomething; +import static org.junit.Assert.*; public class KeySequentialBoundedExecutorTest { + @Test(timeout = 5000) + public void blockWhenLimitReached() throws InterruptedException { + ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10); + AtomicInteger completed = new AtomicInteger(0); + CountDownLatch block = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(1); + Runnable blockingTask = new KeyRunnable<>("key", () -> { + try { + block.await(); + } catch (InterruptedException ignored) { + } + completed.incrementAndGet(); + }); + Runnable simpleTask = new KeyRunnable<>("key", completed::incrementAndGet); + + KeySequentialBoundedExecutor bounded = new KeySequentialBoundedExecutor(5, BLOCK, underlyingExecutor); + bounded.execute(blockingTask); + bounded.execute(simpleTask); + bounded.execute(simpleTask); + bounded.execute(simpleTask); + bounded.execute(simpleTask); + + Thread t = new Thread(() -> { + bounded.execute(simpleTask); + done.countDown(); + }); + t.start(); + + assertFalse(done.await(1, TimeUnit.SECONDS)); + + block.countDown(); + done.await(); + + underlyingExecutor.shutdown(); + underlyingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + assertEquals(6, completed.get()); + } + + @Test(timeout = 5000, expected = RejectedExecutionException.class) + public void rejectWhenLimitReached() { + ExecutorService underlyingExecutor = Executors.newSingleThreadExecutor(); + CountDownLatch block = new CountDownLatch(1); + Runnable blockingTask = () -> { + try { + block.await(); + } catch (InterruptedException ignored) { + } + }; + + KeySequentialBoundedExecutor bounded = new KeySequentialBoundedExecutor(2, REJECT, underlyingExecutor); + bounded.execute(blockingTask); + bounded.execute(blockingTask); + + try { + bounded.execute(blockingTask); + } finally { + underlyingExecutor.shutdownNow(); + } + } + + @Test(timeout = 10000) + public void drain() throws InterruptedException { + for (int i = 0; i < 1000; ++i) { + ExecutorService underlying = Executors.newFixedThreadPool(5); + KeySequentialBoundedExecutor bounded = new KeySequentialBoundedExecutor(20, BLOCK, underlying); + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger completed = new AtomicInteger(0); + + for (int j = 0; j < 10; ++j) { + if (j == 5) { + bounded.execute(() -> { + try { + latch.await(); + completed.incrementAndGet(); + } catch (InterruptedException ignored) { + } + }); + } else { + bounded.execute(completed::incrementAndGet); + } + } + + assertFalse(bounded.drain(1, TimeUnit.MILLISECONDS)); + + latch.countDown(); + + assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); + assertEquals(10, completed.get()); + assertTrue(underlying.shutdownNow().isEmpty()); + } + } + + @Test(timeout = 5000, expected = RejectedExecutionException.class) + public void rejectTasksAfterDrain() throws InterruptedException { + ExecutorService underlying = Executors.newCachedThreadPool(); + KeySequentialBoundedExecutor bounded = new KeySequentialBoundedExecutor(10, BLOCK, underlying); + + bounded.execute(doSomething); + + bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS); + try { + bounded.execute(doSomething); + } finally { + underlying.shutdownNow(); + } + } + + @Test(timeout = 5000) + public void safeToCallDrainMultipleTime() throws InterruptedException { + ExecutorService underlying = Executors.newCachedThreadPool(); + KeySequentialBoundedExecutor bounded = new KeySequentialBoundedExecutor(10, BLOCK, underlying); + + bounded.execute(doSomething); + + assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); + assertTrue(bounded.drain(Long.MAX_VALUE, TimeUnit.SECONDS)); + + underlying.shutdownNow(); + } + + @Test(timeout = 5000) + public void releaseLockOnException() { + Executor underlying = new Executor() { + + private int count = 0; + + @Override + public void execute(Runnable command) { + if (count++ == 0) { + throw new RejectedExecutionException(); + } + command.run(); + } + }; + + Executor bounded = new KeySequentialBoundedExecutor(1, BLOCK, underlying); + + boolean thrown = false; + try { + bounded.execute(doSomething); + } catch (RejectedExecutionException e) { + thrown = true; + } + bounded.execute(doSomething); + + assertTrue(thrown); + } + + @Test(expected = NullPointerException.class) + public void throwExceptionWhenTaskIsNull() { + ExecutorService underlying = Executors.newCachedThreadPool(); + Executor bounded = new KeySequentialBoundedExecutor(10, BLOCK, underlying); + + try { + bounded.execute(null); + } finally { + underlying.shutdownNow(); + } + } + @Test(timeout = 5000) public void underLoad() throws InterruptedException { ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10); diff --git a/src/test/java/com/jano7/executor/KeySequentialRunnerTest.java b/src/test/java/com/jano7/executor/KeySequentialRunnerTest.java index a4e68ef..88b21ed 100644 --- a/src/test/java/com/jano7/executor/KeySequentialRunnerTest.java +++ b/src/test/java/com/jano7/executor/KeySequentialRunnerTest.java @@ -33,8 +33,8 @@ of this software and associated documentation files (the "Software"), to deal import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import static com.jano7.executor.BoundedStrategy.BLOCK; -import static com.jano7.executor.TestUtil.doSomething; +import static com.jano7.executor.TestUtils.BoundedExecutor; +import static com.jano7.executor.TestUtils.doSomething; import static org.junit.Assert.*; public class KeySequentialRunnerTest { @@ -232,7 +232,7 @@ public void underLoad() throws InterruptedException { @Test(timeout = 5000) public void underLoadWithBoundedExecutor() throws InterruptedException { ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10); - BoundedExecutor boundedExecutor = new BoundedExecutor(1, BLOCK, underlyingExecutor); + BoundedExecutor boundedExecutor = new BoundedExecutor(1, underlyingExecutor); KeySequentialRunner runner = new KeySequentialRunner<>(boundedExecutor); List processed = Collections.synchronizedList(new LinkedList<>()); @@ -275,9 +275,17 @@ public void rejectHandling() throws InterruptedException { List completedTasks = Collections.synchronizedList(new LinkedList<>()); List submittingThreads = new LinkedList<>(); for (int key = 0; key < keys; ++key) { + final int taskKey = key; for (int i = 0; i < tasksPerKey; ++i) { + final int taskId = (taskKey * tasksPerKey) + i; submittingThreads.add( - submittingThread(key, (key * tasksPerKey) + i, submittedTasks, completedTasks, runner) + new Thread(() -> { + try { + runner.run(taskKey, () -> completedTasks.add(taskId)); + submittedTasks.add(taskId); + } catch (RejectedExecutionException ignored) { + } + }) ); } } @@ -293,18 +301,4 @@ public void rejectHandling() throws InterruptedException { assertTrue(submittedTasks.containsAll(completedTasks) && completedTasks.containsAll(submittedTasks)); } - - private Thread submittingThread(int key, - int taskId, - List submittedTasks, - List completedTasks, - KeySequentialRunner runner) { - return new Thread(() -> { - try { - runner.run(key, () -> completedTasks.add(taskId)); - submittedTasks.add(taskId); - } catch (RejectedExecutionException ignored) { - } - }); - } } diff --git a/src/test/java/com/jano7/executor/TaskQueueTest.java b/src/test/java/com/jano7/executor/TaskQueueTest.java index d993c08..76b81e0 100644 --- a/src/test/java/com/jano7/executor/TaskQueueTest.java +++ b/src/test/java/com/jano7/executor/TaskQueueTest.java @@ -28,7 +28,7 @@ of this software and associated documentation files (the "Software"), to deal import java.util.LinkedList; import java.util.List; -import static com.jano7.executor.TestUtil.doSomething; +import static com.jano7.executor.TestUtils.doSomething; import static org.junit.Assert.*; public class TaskQueueTest { diff --git a/src/test/java/com/jano7/executor/TestUtil.java b/src/test/java/com/jano7/executor/TestUtil.java deleted file mode 100644 index ede2ce4..0000000 --- a/src/test/java/com/jano7/executor/TestUtil.java +++ /dev/null @@ -1,30 +0,0 @@ -/* -MIT License - -Copyright (c) 2020 Jan Gaspar - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -*/ -package com.jano7.executor; - -public class TestUtil { - - public static final Runnable doSomething = () -> { - }; -} diff --git a/src/test/java/com/jano7/executor/TestUtils.java b/src/test/java/com/jano7/executor/TestUtils.java new file mode 100644 index 0000000..0620b12 --- /dev/null +++ b/src/test/java/com/jano7/executor/TestUtils.java @@ -0,0 +1,77 @@ +/* +MIT License + +Copyright (c) 2020 Jan Gaspar + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ +package com.jano7.executor; + +import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import static com.jano7.executor.Util.checkNotNull; + +public class TestUtils { + + public static final Runnable doSomething = () -> { + }; + + public static class BoundedExecutor implements DrainableExecutor { + + private final int maxTasks; + + private final Semaphore semaphore; + + private final Executor underlyingExecutor; + + public BoundedExecutor(int maxTasks, Executor underlyingExecutor) { + this.maxTasks = maxTasks; + this.semaphore = new Semaphore(maxTasks); + this.underlyingExecutor = underlyingExecutor; + } + + @Override + public void execute(Runnable task) { + checkNotNull(task); + semaphore.acquireUninterruptibly(); + try { + underlyingExecutor.execute( + () -> { + try { + task.run(); + } finally { + semaphore.release(); + } + } + ); + } catch (RejectedExecutionException e) { + semaphore.release(); + throw e; + } + } + + @Override + public boolean drain(long timeout, TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(maxTasks, timeout, unit); + } + } +}