Skip to content

Commit

Permalink
move logic from BoundedExecutor to KeySequentialBoundedExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jano committed Sep 20, 2020
1 parent 14fa049 commit aeb719b
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 333 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,6 @@ from multiple threads without synchronization.
<dependency>
<groupId>com.jano7</groupId>
<artifactId>executor</artifactId>
<version>2.0.1</version>
<version>2.0.2</version>
</dependency>
```
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
<connection>scm:git:git://github.com/jano7/executor.git</connection>
<developerConnection>scm:git:git@github.com:jano7/executor.git</developerConnection>
<url>https://github.com/jano7/executor</url>
<tag>executor-2.0.1</tag>
<tag>executor-2.0.2</tag>
</scm>

</project>
97 changes: 0 additions & 97 deletions src/main/java/com/jano7/executor/BoundedExecutor.java

This file was deleted.

61 changes: 56 additions & 5 deletions src/main/java/com/jano7/executor/KeySequentialBoundedExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,74 @@ of this software and associated documentation files (the "Software"), to deal
package com.jano7.executor;

import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static com.jano7.executor.BoundedStrategy.BLOCK;
import static com.jano7.executor.Util.checkNotNull;

public final class KeySequentialBoundedExecutor implements DrainableExecutor {

private final BoundedExecutor boundedExecutor;
private final int maxTasks;

private final Semaphore semaphore;

private final KeySequentialExecutor keySequentialExecutor;

private final Runnable acquire;

private boolean drained = false;

public KeySequentialBoundedExecutor(int maxTasks, BoundedStrategy onTasksExceeded, Executor underlyingExecutor) {
boundedExecutor = new BoundedExecutor(maxTasks, onTasksExceeded, new KeySequentialExecutor(underlyingExecutor));
this.maxTasks = maxTasks;
this.semaphore = new Semaphore(maxTasks);
this.keySequentialExecutor = new KeySequentialExecutor(underlyingExecutor);
this.acquire = onTasksExceeded == BLOCK ? this::blockOnTasksExceeded : this::rejectOnTasksExceeded;
}

private void blockOnTasksExceeded() {
semaphore.acquireUninterruptibly();
}

private void rejectOnTasksExceeded() {
if (!semaphore.tryAcquire()) {
throw new RejectedExecutionException("task limit of " + maxTasks + " exceeded");
}
}

@Override
public void execute(Runnable task) {
boundedExecutor.execute(task);
checkNotNull(task);
synchronized (this) {
if (drained) {
throw new RejectedExecutionException("executor drained");
} else {
acquire.run();
}
}
try {
keySequentialExecutor.execute(new KeyRunnable<>(
task,
() -> {
try {
task.run();
} finally {
semaphore.release();
}
})
);
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}

@Override
public boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
return boundedExecutor.drain(timeout, unit);
public synchronized boolean drain(long timeout, TimeUnit unit) throws InterruptedException {
if (!drained && semaphore.tryAcquire(maxTasks, timeout, unit)) {
drained = true;
}
return drained;
}
}
176 changes: 0 additions & 176 deletions src/test/java/com/jano7/executor/BoundedExecutorTest.java

This file was deleted.

Loading

0 comments on commit aeb719b

Please sign in to comment.