Skip to content

Commit

Permalink
TFW: support JVM startup hooks when forking
Browse files Browse the repository at this point in the history
Test PartRunner function.
Rename TestPart to Part.
Allow Part chaining with andThen() and butFirst().
Distinguish join hooks (local to parent) from jvm hooks (local to child).
Add state machine to PartRunner.
  • Loading branch information
joe-chacko committed Jun 29, 2023
1 parent c4050a1 commit fb9ee69
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 31 deletions.
2 changes: 1 addition & 1 deletion testify/src/main/java/testify/parts/ForkedPart.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.function.Consumer;

/**
* Interface to customise the behaviour of a {@link PartRunner} when forking a {@link TestPart}.
* Interface to customise the behaviour of a {@link PartRunner} when forking a {@link Part}.
*/
public interface ForkedPart {

Expand Down
6 changes: 3 additions & 3 deletions testify/src/main/java/testify/parts/NamedPart.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedPart implements TestPart {
final class NamedPart implements Part {
private enum Event implements TypeSpec<Throwable> {STARTED, ENDED}
private static final ConcurrentMap<String, AtomicInteger> uids = new ConcurrentHashMap<>();
final String name;
private final TestPart part;
private final Part part;
private final String uid;

NamedPart(String name, TestPart part) {
NamedPart(String name, Part part) {
this.name = name;
this.part = part;
int instance = uids.computeIfAbsent(name, s -> new AtomicInteger()).incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import java.util.function.Consumer;

/**
* A test part is a piece of a test to be performed in another context,
* e.g. a separate thread or a new process.
* A piece of work to be performed in another context, such as a separate thread or a new process.
* <br>
* Logically, this is like a {@link Consumer} of a {@link Bus},
* but it enforces an additional requirement on the compiler:
Expand All @@ -35,6 +34,21 @@
* all its explicit and implicit references must be serializable.</em>
*/
@FunctionalInterface
public interface TestPart extends Serializable {
public interface Part extends Serializable {
void run(Bus bus) throws Throwable;

default Part andThen(Part that) {
if (that == NO_OP) return this;
if (this == NO_OP) return that;
return bus -> {
this.run(bus);
that.run(bus);
};
}

default Part butFirst(Part that) {
return that.andThen(this);
}

Part NO_OP = bus -> {};
}
33 changes: 29 additions & 4 deletions testify/src/main/java/testify/parts/PartRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,38 @@
import java.util.function.Consumer;

/**
* An object that can run a {@link TestPart} in another context.
* An object that can run a {@link Part} in another context.
* Methods are provided for configuring what that contet should be.
*/
@SuppressWarnings("UnusedReturnValue")
public interface PartRunner {
static PartRunner create() { return new PartRunnerImpl(); }

/**
* The possible states of a PartRunner.
*/
enum State {
/** Some config options on a PartRunner must be set before first use. */
CONFIGURING,
/** Once a PartRunner is IN_USE, some configuration options may no longer be changed */
IN_USE,
/** After join() has been called, a PartRunner cannot be used again */
COMPLETED;
}

/**
*
*/
State getState();

/**
* Add a piece of work to be performed when a new JVM is started.
*
* @param initializer - the work to be done on new JVM startup
* @throws IllegalStateException if the state of this PartRunner is not CONFIGURING
*/
void addJVMStartupHook(Part initializer) throws IllegalStateException;

/**
* Enable a range of log levels for the specified pattern and partnames.
* @param level the most detailed log level to include
Expand All @@ -49,12 +74,12 @@ public interface PartRunner {
PartRunner useNewJVMWhenForking(String...jvmArgs);
PartRunner useNewThreadWhenForking();

ForkedPart fork(String partName, TestPart part);
default ForkedPart fork(Enum<?> partName, TestPart part) { return fork(partName.toString(), part); }
ForkedPart fork(String partName, Part part);
default ForkedPart fork(Enum<?> partName, Part part) { return fork(partName.toString(), part); }

default ForkedPart forkMain(Class<?> mainClass, String...args) { return fork(mainClass.getName(), wrapMain(mainClass, args)); }

static TestPart wrapMain(Class<?> mainClass, String[] args) {
static Part wrapMain(Class<?> mainClass, String[] args) {
return bus -> {
try {
mainClass.getMethod("main", String[].class).invoke(null, new Object[]{args});
Expand Down
77 changes: 57 additions & 20 deletions testify/src/main/java/testify/parts/PartRunnerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,53 @@
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.EnumSet.complementOf;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;
import static testify.bus.TestLogLevel.DEBUG;
import static testify.bus.TestLogLevel.ERROR;
import static testify.bus.TestLogLevel.WARN;
import static testify.parts.PartRunnerImpl.HookType.JOIN;
import static testify.parts.PartRunnerImpl.HookType.POST_JOIN;
import static testify.parts.PartRunnerImpl.HookType.PRE_JOIN;
import static testify.parts.PartRunner.State.COMPLETED;
import static testify.parts.PartRunner.State.CONFIGURING;
import static testify.parts.PartRunnerImpl.JoinHookType.JOIN;
import static testify.parts.PartRunnerImpl.JoinHookType.POST_JOIN;
import static testify.parts.PartRunnerImpl.JoinHookType.PRE_JOIN;

class PartRunnerImpl implements PartRunner {
private static final EnumSet<TestLogLevel> URGENT_LEVELS = EnumSet.of(ERROR, WARN);
private final String label = ObjectUtil.getNextObjectLabel(PartRunnerImpl.class);
private final InterProcessBus centralBus = InterProcessBus.createParent();
private final Map<String, Bus> knownBuses = new ConcurrentHashMap<>();

enum HookType { PRE_JOIN, JOIN, POST_JOIN; }
private final EnumMap<HookType, Deque<EasyCloseable>> hooks = new EnumMap<>(HookType.class);
{ for (HookType ht: HookType.values()) hooks.put(ht, new ConcurrentLinkedDeque<>()); }
enum JoinHookType {
PRE_JOIN,
JOIN,
POST_JOIN;
}
private final EnumMap<JoinHookType, Deque<EasyCloseable>> hooks = new EnumMap<>(JoinHookType.class);
{ for (JoinHookType ht: JoinHookType.values()) hooks.put(ht, new ConcurrentLinkedDeque<>()); }
private final Map<EasyCloseable, String> hookNames = new HashMap<>();
private Function<String, Bus> busFactory = ((Function<String, Bus>)centralBus::forUser)
.andThen(b -> b.logToSysErr(URGENT_LEVELS))
.andThen(b -> b.logToSysOut(complementOf(URGENT_LEVELS)));
private boolean useProcesses = false;

private Part jvmInitHook = Part.NO_OP;
private String[] jvmArgs;
private State state = CONFIGURING;
@Override
public State getState() { return state; }

@Override
public void addJVMStartupHook(Part hook) throws IllegalStateException {
if (CONFIGURING != this.state) throw new IllegalStateException("PartRunner cannot be configured after first use");
jvmInitHook = jvmInitHook.andThen(hook);
}

@Override
public PartRunner enableTestLogging(TestLogLevel level, String pattern) {
Expand All @@ -80,25 +97,36 @@ public Bus bus(String partName) {

@Override
public PartRunner useNewJVMWhenForking(String... jvmArgs) {
if (state == COMPLETED) throw new IllegalStateException("Runner already completed");
this.useProcesses = true;
this.jvmArgs = jvmArgs;
return this;
}

@Override
public PartRunner useNewThreadWhenForking() {
if (state == COMPLETED) throw new IllegalStateException("Runner already completed");
this.useProcesses = false;
this.jvmArgs = null;
return this;
}

@Override
public ForkedPart fork(String partName, TestPart part) {
final Runner<?> runner = useProcesses ? new ProcessRunner(jvmArgs) : ThreadRunner.SINGLETON;
public ForkedPart fork(String partName, Part part) {
if (state == COMPLETED) throw new IllegalStateException("Runner already completed");
final Runner<?> runner;
if (useProcesses) {
runner = new ProcessRunner(jvmArgs);
part = part.butFirst(jvmInitHook);
} else {
runner = ThreadRunner.SINGLETON;
}

return fork(runner, partName, part);
}

private <J> ForkedPart fork(Runner<J> runner, String partName, TestPart part) {
private <J> ForkedPart fork(Runner<J> runner, String partName, Part part) {
state = State.IN_USE;
try {
final NamedPart namedPart = new NamedPart(partName, part);
J job = runner.fork(centralBus, namedPart);
Expand All @@ -111,12 +139,12 @@ private <J> ForkedPart fork(Runner<J> runner, String partName, TestPart part) {
}
}

private void addHook(HookType hookType, String partName, EasyCloseable hook) {
private void addHook(JoinHookType hookType, String partName, EasyCloseable hook) {
hookNames.put(hook, partName);
Deque<EasyCloseable> hookQueue = hooks.get(hookType);
// PRE_JOIN hooks get run in order of addition;
// PRE_JOIN hooks are run in order of addition;
// other hooks are run in reverse order of addition.
if (Objects.requireNonNull(hookType) == PRE_JOIN) hookQueue.add(hook);
if (requireNonNull(hookType) == PRE_JOIN) hookQueue.add(hook);
else hookQueue.addFirst(hook);
}

Expand All @@ -135,17 +163,20 @@ private Error fatalError(Throwable t) {
}
}

// recursively ensure close
private void close(HookType type) {
// recursively ensure close using try-with-resources and finally
private void close(JoinHookType type) {
Deque<EasyCloseable> closeables = this.hooks.get(type);
if (closeables.isEmpty()) return;
if (closeables.isEmpty()) return; // end recursion when all
String name = "unknown";
// Use try-with-resources to drive close on the first hook.
// Get hook using poll() so the queue shrinks.
try (EasyCloseable hook = closeables.poll()) {
final String partName = name = hookNames.get(hook);
privateBus().log(DEBUG, () -> "Running " + partName + " " + type + " hook.");
} finally {
final String partName = name;
privateBus().log(DEBUG, () -> "Stopped running " + partName + " " + type + " hook.");
// recurse in finally to close next hook
close(type);
}
}
Expand All @@ -154,14 +185,20 @@ private void close(HookType type) {
public void join() {
// close down the main bus
try (EasyCloseable close = centralBus) {
for (HookType type : HookType.values()) {
privateBus().log(() -> "Running " + type + " hooks: " + hooks.get(type).stream().map(hookNames::get).collect(Collectors.joining()));
close(type);
}
runHooks(PRE_JOIN);
runHooks(JOIN);
runHooks(POST_JOIN);
privateBus().log("Completed all join actions.");
} finally {
state = COMPLETED;
}
}

private void runHooks(JoinHookType type) {
privateBus().log(() -> "Running " + type + " hooks: " + hooks.get(type).stream().map(hookNames::get).collect(Collectors.joining()));
close(type);
}

private <J> void registerForJoin(Runner<J> runner, J job, String name) {
addHook(JOIN, name, () -> {
try {
Expand Down
Loading

0 comments on commit fb9ee69

Please sign in to comment.