Skip to content

Commit 10a890f

Browse files
authored
Merge pull request #2 from jano7/v2.0
V2.0
2 parents 0434b8c + b70a25e commit 10a890f

21 files changed

+951
-420
lines changed

README.md

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Key Sequential Executor
22
This small library provides an optimized solution to a problem where tasks for a particular key need to be processed
33
sequentially as they arrive. This kind of problem can be solved by a [`SingleThreadExecutor`](
4-
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor--), however it is
4+
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newSingleThreadExecutor--); however, it is
55
not efficient. The issue is that the tasks for unrelated keys are not being processed in parallel, instead they are put
66
into a queue common to all keys and wait for the single thread to execute them. This library allows them to be executed
77
concurrently. Moreover this library works well in a situation where all the possible keys and their number is **not**
@@ -10,7 +10,7 @@ known upfront.
1010
A typical scenario in order management or booking systems is that messages for a particular trade **A** must be
1111
processed sequentially in the same order as they are received (otherwise the state of the trade will be incorrect). The
1212
same is true for any other trade - for example messages for the trade **B** must be processed sequentially as well.
13-
However it is desirable that a message for the trade **A** does not block processing of a message for the trade **B**
13+
However, it is desirable that a message for the trade **A** does not block processing of a message for the trade **B**
1414
(and vice versa) if they happen to arrive at the same time.
1515
```java
1616
ExecutorService underlyingExecutor = Executors.newFixedThreadPool(10);
@@ -33,7 +33,7 @@ runner.run(tradeIdB, task); // execution is not blocked by the task for tradeIdA
3333

3434
runner.run(tradeIdA, task); // execution starts when the previous task for tradeIdA completes
3535
```
36-
In the example above the key is a Trade ID. Tasks for a particular Trade ID are executed sequentially but they do not
36+
In the example above the key is a Trade ID. Tasks for a particular Trade ID are executed sequentially, but they do not
3737
block tasks for other Trade IDs (unless the tasks are blocked by the underlying executor).
3838

3939
Please note the Key needs to correctly implement `hashCode` and `equals` methods as the implementation stores the tasks
@@ -47,32 +47,42 @@ key.
4747
```java
4848
Executor executor = new KeySequentialExecutor(underlyingExecutor);
4949

50-
Runnable runnable = new KeyRunnable<>(tradeIdA, task); // helper class delegating hashCode and equals to the key
50+
Runnable runnable =
51+
new KeyRunnable<>(tradeIdA, task); // helper class delegating 'hashCode' and 'equals' to the key
5152

5253
executor.execute(runnable);
53-
```
5454

55+
underlyingExecutor.shutdown();
56+
57+
// at this point, tasks for new keys will be rejected; however, tasks for keys being currently executed may
58+
// still be accepted (and executed)
59+
60+
underlyingExecutor.awaitTermination(timeout, TimeUnit.SECONDS);
61+
62+
// if the executor terminates before a timeout, then it is guaranteed all accepted tasks have been executed
63+
```
5564
The `KeySequentialExecutor` and `KeySequentialRunner` do not support back-pressure. It means that `execute` and `run`
5665
methods never block, instead the submitted tasks are put into a queue where they wait until executed by the underlying
57-
executor. In many cases this is not a problem, however in some situations it may cause an application to run out of
66+
executor. In many cases this is not a problem, but in some situations it may cause an application to run out of
5867
memory as the number of waiting tasks grows. If you want to restrict the number of queued tasks, consider use of a
59-
[`KeySequentialBoundedExecutor`](src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java) which blocks the
60-
task submission when the number of tasks, which haven't been executed yet, hits the limit.
68+
[`KeySequentialBoundedExecutor`](src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java) which can be
69+
configured to block the task submission when the number of tasks, which haven't been executed yet, reaches the limit.
6170
```java
6271
ExecutorService underlyingExecutor = Executors.newCachedThreadPool();
6372
int maxTasks = 10;
64-
KeySequentialBoundedExecutor boundedExecutor = new KeySequentialBoundedExecutor(maxTasks, underlyingExecutor);
73+
KeySequentialBoundedExecutor boundedExecutor =
74+
new KeySequentialBoundedExecutor(maxTasks, BoundedStrategy.BLOCK, underlyingExecutor);
6575

66-
KeyRunnable<String> aTask = new KeyRunnable<>("my key", () -> {
76+
KeyRunnable<String> task = new KeyRunnable<>("my key", () -> {
6777
// do something
6878
});
6979

70-
boundedExecutor.execute(aTask);
80+
boundedExecutor.execute(task);
7181

7282
// execute more tasks ... at most 10 will be scheduled
7383

7484
// before shutting down you can call a 'drain' method which blocks until all submitted task have been executed
75-
boundedExecutor.drain(aTimeout, TimeUnit.SECONDS); // returns true if drained; false if the timeout elapses
85+
boundedExecutor.drain(timeout, TimeUnit.SECONDS); // returns true if drained; false if the timeout elapses
7686

7787
// newly submitted tasks will be rejected after calling 'drain'
7888

@@ -81,12 +91,12 @@ underlyingExecutor.shutdownNow(); // safe to call 'shutdownNow' if drained as th
8191
The source code of the examples can be found [here](src/test/java/com/jano7/executor/Examples.java).
8292

8393
A note on **thread-safety**: The library is thread-safe; i.e. methods `run`, `execute` or `drain` can be safely invoked
84-
from multiple threads without synchronization.
94+
from multiple threads without synchronization.
8595
## Maven Dependency
8696
```xml
8797
<dependency>
8898
<groupId>com.jano7</groupId>
8999
<artifactId>executor</artifactId>
90-
<version>1.0.6</version>
100+
<version>2.0.0</version>
91101
</dependency>
92102
```

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
<groupId>com.jano7</groupId>
55
<artifactId>executor</artifactId>
6-
<version>1.0.7-SNAPSHOT</version>
6+
<version>2.0.0-SNAPSHOT</version>
77

88
<name>Java Key Sequential Executor</name>
99
<description>This small library provides an optimized solution to a problem where tasks for a particular key need to
@@ -152,7 +152,7 @@
152152
<connection>scm:git:git://github.com/jano7/executor.git</connection>
153153
<developerConnection>scm:git:git@github.com:jano7/executor.git</developerConnection>
154154
<url>https://github.com/jano7/executor</url>
155-
<tag>executor-1.0.6</tag>
155+
<tag>executor-2.0.0</tag>
156156
</scm>
157157

158158
</project>
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
MIT License
3+
4+
Copyright (c) 2020 Jan Gaspar
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
SOFTWARE.
23+
*/
24+
package com.jano7.executor;
25+
26+
import java.util.concurrent.Executor;
27+
import java.util.concurrent.RejectedExecutionException;
28+
import java.util.concurrent.Semaphore;
29+
import java.util.concurrent.TimeUnit;
30+
31+
import static com.jano7.executor.BoundedStrategy.BLOCK;
32+
import static com.jano7.executor.Util.checkNotNull;
33+
34+
public final class BoundedExecutor implements DrainableExecutor {
35+
36+
private final int maxTasks;
37+
38+
private final Semaphore semaphore;
39+
40+
private final Executor underlyingExecutor;
41+
42+
private final Runnable acquire;
43+
44+
private boolean drained = false;
45+
46+
public BoundedExecutor(int maxTasks, BoundedStrategy onTasksExceeded, Executor underlyingExecutor) {
47+
this.maxTasks = maxTasks;
48+
this.semaphore = new Semaphore(maxTasks);
49+
this.underlyingExecutor = underlyingExecutor;
50+
this.acquire = onTasksExceeded == BLOCK ? this::blockOnTasksExceeded : this::rejectOnTasksExceeded;
51+
}
52+
53+
private void blockOnTasksExceeded() {
54+
semaphore.acquireUninterruptibly();
55+
}
56+
57+
private void rejectOnTasksExceeded() {
58+
if (!semaphore.tryAcquire()) {
59+
throw new RejectedExecutionException("task limit of " + maxTasks + " exceeded");
60+
}
61+
}
62+
63+
@Override
64+
public void execute(Runnable task) {
65+
checkNotNull(task);
66+
synchronized (this) {
67+
if (drained) {
68+
throw new RejectedExecutionException("executor drained");
69+
} else {
70+
acquire.run();
71+
}
72+
}
73+
try {
74+
underlyingExecutor.execute(new KeyRunnable<>(
75+
task,
76+
() -> {
77+
try {
78+
task.run();
79+
} finally {
80+
semaphore.release();
81+
}
82+
})
83+
);
84+
} catch (RejectedExecutionException e) {
85+
semaphore.release();
86+
throw e;
87+
}
88+
}
89+
90+
@Override
91+
public synchronized boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
92+
if (!drained && semaphore.tryAcquire(maxTasks, timeout, unit)) {
93+
drained = true;
94+
}
95+
return drained;
96+
}
97+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
MIT License
3+
4+
Copyright (c) 2020 Jan Gaspar
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
SOFTWARE.
23+
*/
24+
package com.jano7.executor;
25+
26+
public enum BoundedStrategy {
27+
REJECT, BLOCK
28+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
MIT License
3+
4+
Copyright (c) 2020 Jan Gaspar
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy
7+
of this software and associated documentation files (the "Software"), to deal
8+
in the Software without restriction, including without limitation the rights
9+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
copies of the Software, and to permit persons to whom the Software is
11+
furnished to do so, subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
SOFTWARE.
23+
*/
24+
package com.jano7.executor;
25+
26+
import java.util.concurrent.Executor;
27+
import java.util.concurrent.TimeUnit;
28+
29+
public interface DrainableExecutor extends Executor {
30+
31+
boolean drain(long timeout, TimeUnit unit) throws InterruptedException;
32+
}

src/main/java/com/jano7/executor/KeyRunnable.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,12 @@ public KeyRunnable(Key key, Runnable runnable) {
3838

3939
@Override
4040
public boolean equals(Object o) {
41-
if (this == o) return true;
42-
if (o == null || getClass() != o.getClass()) return false;
41+
if (this == o) {
42+
return true;
43+
}
44+
if (o == null || getClass() != o.getClass()) {
45+
return false;
46+
}
4347
KeyRunnable<?> that = (KeyRunnable<?>) o;
4448
return Objects.equals(key, that.key);
4549
}
@@ -49,6 +53,11 @@ public int hashCode() {
4953
return Objects.hashCode(key);
5054
}
5155

56+
@Override
57+
public String toString() {
58+
return Objects.toString(key);
59+
}
60+
5261
@Override
5362
public void run() {
5463
runnable.run();

src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -24,61 +24,23 @@ of this software and associated documentation files (the "Software"), to deal
2424
package com.jano7.executor;
2525

2626
import java.util.concurrent.Executor;
27-
import java.util.concurrent.RejectedExecutionException;
28-
import java.util.concurrent.Semaphore;
2927
import java.util.concurrent.TimeUnit;
3028

31-
public final class KeySequentialBoundedExecutor implements Executor {
29+
public final class KeySequentialBoundedExecutor implements DrainableExecutor {
3230

33-
private final int maxTasks;
31+
private final BoundedExecutor boundedExecutor;
3432

35-
private final Semaphore semaphore;
36-
37-
private final KeySequentialExecutor executor;
38-
39-
private boolean drained = false;
40-
41-
public KeySequentialBoundedExecutor(int maxTasks, KeySequentialExecutor executor) {
42-
this.maxTasks = maxTasks;
43-
this.semaphore = new Semaphore(maxTasks);
44-
this.executor = executor;
45-
}
46-
47-
public KeySequentialBoundedExecutor(int maxTasks, Executor underlyingExecutor) {
48-
this(maxTasks, new KeySequentialExecutor(underlyingExecutor));
33+
public KeySequentialBoundedExecutor(int maxTasks, BoundedStrategy onTasksExceeded, Executor underlyingExecutor) {
34+
boundedExecutor = new BoundedExecutor(maxTasks, onTasksExceeded, new KeySequentialExecutor(underlyingExecutor));
4935
}
5036

5137
@Override
5238
public void execute(Runnable task) {
53-
if (task == null) throw new NullPointerException("Task is null");
54-
synchronized (this) {
55-
if (drained) {
56-
throw new RejectedExecutionException(getClass().getSimpleName() + " drained");
57-
} else {
58-
semaphore.acquireUninterruptibly();
59-
}
60-
}
61-
try {
62-
executor.execute(new KeyRunnable<>(
63-
task,
64-
() -> {
65-
try {
66-
task.run();
67-
} finally {
68-
semaphore.release();
69-
}
70-
})
71-
);
72-
} catch (RejectedExecutionException e) {
73-
semaphore.release();
74-
throw e;
75-
}
39+
boundedExecutor.execute(task);
7640
}
7741

78-
public synchronized boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
79-
if (!drained && semaphore.tryAcquire(maxTasks, timeout, unit)) {
80-
drained = true;
81-
}
82-
return drained;
42+
@Override
43+
public boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
44+
return boundedExecutor.drain(timeout, unit);
8345
}
8446
}

src/main/java/com/jano7/executor/KeySequentialExecutor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,15 @@ public final class KeySequentialExecutor implements Executor {
3030
private final KeySequentialRunner<Runnable> runner;
3131

3232
public KeySequentialExecutor(Executor underlyingExecutor) {
33-
this.runner = new KeySequentialRunner<>(underlyingExecutor);
33+
runner = new KeySequentialRunner<>(underlyingExecutor);
3434
}
3535

36-
public KeySequentialExecutor(Executor underlyingExecutor, TaskExceptionHandler exceptionHandler) {
37-
this.runner = new KeySequentialRunner<>(underlyingExecutor, exceptionHandler);
36+
public KeySequentialExecutor(Executor underlyingExecutor, TaskExceptionHandler<Runnable> exceptionHandler) {
37+
runner = new KeySequentialRunner<>(underlyingExecutor, exceptionHandler);
3838
}
3939

4040
@Override
4141
public void execute(Runnable task) {
42-
if (task == null) throw new NullPointerException("Task is null");
4342
runner.run(task, task);
4443
}
4544
}

0 commit comments

Comments
 (0)