Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,19 @@ default CompletableFuture<Void> addProblemChange(@NonNull ProblemChange<Solution
CompletableFuture<Void> addProblemChanges(@NonNull List<ProblemChange<Solution_>> problemChangeList);

/**
* Terminates the solver or cancels the solver job if it hasn't (re)started yet.
* <p>
* Terminates the solver if running.
* If the solver has not yet started, the job will be canceled and the solver will never start.
* Does nothing if the solver already terminated.
* <p>
* Waits for the termination or cancellation to complete before returning.
* During termination, a {@code bestSolutionConsumer} could still be called. When the solver terminates,
* the {@code finalBestSolutionConsumer} is executed with the latest best solution.
* These consumers run on a consumer thread independently of the termination and may still run even after
* this method returns.
* <p>
* It waits for no more than one minute, at which point it returns.
* In this case the solver might still be running in the background, but it will be terminated as soon as possible.
* Any best solutions from that point on will be ignored.
*/
void terminateEarly();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import ai.timefold.solver.core.api.solver.change.ProblemChange;
import ai.timefold.solver.core.api.solver.event.EventProducerId;

import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -149,7 +148,6 @@ void set(Solution_ bestSolution, EventProducerId producerId, BooleanSupplier isE
* @return CompletableFuture that will be completed after the best solution containing this change is passed to
* a user-defined Consumer.
*/
@NonNull
CompletableFuture<Void> addProblemChange(Solver<Solution_> solver, List<ProblemChange<Solution_>> problemChangeList) {
var futureProblemChange = new CompletableFuture<Void>();
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
Expand All @@ -14,6 +15,23 @@
import ai.timefold.solver.core.api.solver.event.NewBestSolutionEvent;
import ai.timefold.solver.core.api.solver.event.SolverJobStartedEvent;

import org.jspecify.annotations.NullMarked;
import org.jspecify.annotations.Nullable;

/**
* Support class for consuming solver events in a separate thread.
* It ensures that the events are consumed in the correct order and handles exceptions properly.
* <p>
* The public consume* methods in this class are called by the Solver thread,
* and the actual consumption of events is scheduled in the schedule* to a separate Consumer thread
* produced by the {@link #consumerExecutor}.
* The consumptions are protected by semaphores to ensure the correct order and to avoid concurrent consumptions,
* and it is the responsibility of the consume* methods to only run the schedule* methods when locked.
*
* @param <Solution_> the solution type
* @param <ProblemId_> the problem id type
*/
@NullMarked
final class ConsumerSupport<Solution_, ProblemId_> implements AutoCloseable {

private final ProblemId_ problemId;
Expand All @@ -27,8 +45,8 @@ final class ConsumerSupport<Solution_, ProblemId_> implements AutoCloseable {
private final Semaphore startSolverJobConsumption = new Semaphore(1);
private final BestSolutionHolder<Solution_> bestSolutionHolder;
private final ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
private Solution_ firstInitializedSolution;
private Solution_ initialSolution;
private final AtomicReference<Solution_> firstInitializedSolution = new AtomicReference<>();
private final AtomicReference<Solution_> initialSolution = new AtomicReference<>();

public ConsumerSupport(ProblemId_ problemId,
Consumer<NewBestSolutionEvent<Solution_>> bestSolutionConsumer,
Expand All @@ -47,164 +65,146 @@ public ConsumerSupport(ProblemId_ problemId,
this.solverJobStartedConsumer = solverJobStartedConsumer;
this.exceptionHandler = exceptionHandler;
this.bestSolutionHolder = bestSolutionHolder;
this.firstInitializedSolution = null;
this.initialSolution = null;
}

// Called on the Solver thread.
void consumeIntermediateBestSolution(Solution_ bestSolution, EventProducerId producerId,
void consumeIntermediateBestSolution(Solution_ solution, EventProducerId producerId,
BooleanSupplier isEveryProblemChangeProcessed) {
/*
* If the bestSolutionConsumer is not provided, the best solution is still set for the purpose of recording
* problem changes.
*/
bestSolutionHolder.set(bestSolution, producerId, isEveryProblemChangeProcessed);
bestSolutionHolder.set(solution, producerId, isEveryProblemChangeProcessed);
if (bestSolutionConsumer != null) {
tryConsumeWaitingIntermediateBestSolution();
}
}

// Called on the Solver thread.
void consumeFirstInitializedSolution(Solution_ firstInitializedSolution, EventProducerId producerId,
boolean isTerminatedEarly) {
try {
// Called on the solver thread
// During the solving process, this lock is called once, and it won't block the Solver thread
firstSolutionConsumption.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the first initialized solution consumption.");
}
// called on the Consumer thread
this.firstInitializedSolution = firstInitializedSolution;
scheduleFirstInitializedSolutionConsumption(
solution -> firstInitializedSolutionConsumer
.accept(new FirstInitializedSolutionEventImpl<>(solution, producerId, isTerminatedEarly)));
}

// Called on the consumer thread
void consumeStartSolverJob(Solution_ initialSolution) {
try {
// Called on the solver thread
// During the solving process, this lock is called once, and it won't block the Solver thread
startSolverJobConsumption.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the start solver job consumption.");
}
// called on the Consumer thread
this.initialSolution = initialSolution;
scheduleStartJobConsumption();
}

// Called on the Solver thread after Solver#solve() returns.
void consumeFinalBestSolution(Solution_ finalBestSolution) {
try {
acquireAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the final best solution consumption.");
}
// Make sure the final best solution is consumed by the intermediate best solution consumer first.
// Situation:
// The consumer is consuming the last but one best solution. The final best solution is waiting for the consumer.
if (bestSolutionConsumer != null) {
scheduleIntermediateBestSolutionConsumption();
}
consumerExecutor.submit(() -> {
try {
finalBestSolutionConsumer.accept(new FinalBestSolutionEventImpl<>(finalBestSolution));
} catch (Throwable throwable) {
exceptionHandler.accept(problemId, throwable);
} finally {
// If there is no intermediate best solution consumer, complete the problem changes now.
if (bestSolutionConsumer == null) {
var solutionHolder = bestSolutionHolder.take();
if (solutionHolder != null) {
solutionHolder.completeProblemChanges();
}
}
// Cancel problem changes that arrived after the solver terminated.
bestSolutionHolder.cancelPendingChanges();
releaseAll();
disposeConsumerThread();
}
});
}

// Called both on the Solver thread and the Consumer thread.
private void tryConsumeWaitingIntermediateBestSolution() {
if (bestSolutionHolder.isEmpty()) {
return; // There is no best solution to consume.
}
if (activeConsumption.tryAcquire()) {
scheduleIntermediateBestSolutionConsumption().thenRunAsync(this::tryConsumeWaitingIntermediateBestSolution,
consumerExecutor);
scheduleIntermediateBestSolutionConsumption()
.whenCompleteAsync((solution, throwable) -> {
activeConsumption.release();
tryConsumeWaitingIntermediateBestSolution();
}, consumerExecutor);
}
}

/**
* Called both on the Solver thread and the Consumer thread.
* Don't call without locking, otherwise multiple consumptions may be scheduled.
*/
private CompletableFuture<Void> scheduleIntermediateBestSolutionConsumption() {
return CompletableFuture.runAsync(() -> {
BestSolutionContainingProblemChanges<Solution_> bestSolutionContainingProblemChanges = bestSolutionHolder.take();
var bestSolutionContainingProblemChanges = bestSolutionHolder.take();
if (bestSolutionContainingProblemChanges != null) {
try {
bestSolutionConsumer
.accept(new NewBestSolutionEventImpl<>(bestSolutionContainingProblemChanges.getBestSolution(),
bestSolutionContainingProblemChanges.getProducerId()));
if (bestSolutionConsumer != null) {
var event = new NewBestSolutionEventImpl<>(bestSolutionContainingProblemChanges.getBestSolution(),
bestSolutionContainingProblemChanges.getProducerId());
bestSolutionConsumer.accept(event);
}
bestSolutionContainingProblemChanges.completeProblemChanges();
} catch (Throwable throwable) {
if (exceptionHandler != null) {
exceptionHandler.accept(problemId, throwable);
}
bestSolutionContainingProblemChanges.completeProblemChangesExceptionally(throwable);
} finally {
activeConsumption.release();
}
}
}, consumerExecutor);
}

/**
* Called on the Consumer thread.
* Don't call without locking firstSolutionConsumption,
* because the consumption may not be executed before the final best solution is executed.
*/
private void scheduleFirstInitializedSolutionConsumption(
Consumer<? super Solution_> solutionConsumer) {
scheduleConsumption(firstSolutionConsumption, solutionConsumer, firstInitializedSolution);
void consumeFirstInitializedSolution(Solution_ solution, EventProducerId producerId, boolean isTerminatedEarly) {
try { // During the solving process, this lock is called once, and it won't block the Solver thread
firstSolutionConsumption.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the first initialized solution consumption.");
}
this.firstInitializedSolution.getAndSet(solution); // Reachable more than once; problem change triggers restart.
scheduleFirstInitializedSolutionConsumption(s -> firstInitializedSolutionConsumer
.accept(new FirstInitializedSolutionEventImpl<>(s, producerId, isTerminatedEarly)))
.whenCompleteAsync((unused, throwable) -> firstSolutionConsumption.release(), consumerExecutor);
}

private CompletableFuture<Void> scheduleFirstInitializedSolutionConsumption(Consumer<? super Solution_> solutionConsumer) {
return scheduleConsumption(solutionConsumer, firstInitializedSolution.get());
}

/**
* Called on the Consumer thread.
* Don't call without locking startSolverJobConsumption,
* because the consumption may not be executed before the final best solution is executed.
* Assumes that it runs locked.
*
* @return future which completes when the consumption is done; can be unlocked then
*/
private void scheduleStartJobConsumption() {
scheduleConsumption(startSolverJobConsumption,
private CompletableFuture<Void> scheduleConsumption(@Nullable Consumer<? super Solution_> consumer,
@Nullable Solution_ solution) {
return CompletableFuture.runAsync(() -> {
try {
if (consumer != null && solution != null) {
consumer.accept(solution);
}
} catch (Throwable throwable) {
exceptionHandler.accept(problemId, throwable);
}
}, consumerExecutor);
}

void consumeStartSolverJob(Solution_ solution) {
try { // During the solving process, this lock is called once, and it won't block the Solver thread
startSolverJobConsumption.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the start solver job consumption.");
}
this.initialSolution.getAndSet(solution); // Reachable more than once; problem change triggers restart.
scheduleStartJobConsumption().whenCompleteAsync((unused, throwable) -> startSolverJobConsumption.release(),
consumerExecutor);
}

private CompletableFuture<Void> scheduleStartJobConsumption() {
return scheduleConsumption(
solverJobStartedConsumer == null ? null
: solution -> solverJobStartedConsumer.accept(new SolverJobStartedEventImpl<>(solution)),
initialSolution);
initialSolution.get());
}

private void scheduleConsumption(Semaphore semaphore, Consumer<? super Solution_> consumer,
Solution_ solution) {
CompletableFuture.runAsync(() -> {
void consumeFinalBestSolution(Solution_ solution) { // Called on the Solver thread, after solving is finished.
try {
acquireAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for the final best solution consumption.");
}
// Make sure the final best solution is consumed by the intermediate best solution consumer first.
// Situation:
// The consumer is consuming the last but one best solution. The final best solution is waiting for the consumer.
if (bestSolutionConsumer != null) {
scheduleIntermediateBestSolutionConsumption();
}
scheduleFinalBestSolutionConsumption(solution)
.whenComplete((unused, throwable) -> releaseAll());
}

private CompletableFuture<Void> scheduleFinalBestSolutionConsumption(Solution_ solution) {
return CompletableFuture.runAsync(() -> {
try {
if (consumer != null && solution != null) {
consumer.accept(solution);
}
finalBestSolutionConsumer.accept(new FinalBestSolutionEventImpl<>(solution));
} catch (Throwable throwable) {
if (exceptionHandler != null) {
exceptionHandler.accept(problemId, throwable);
}
} finally {
semaphore.release();
}
}, consumerExecutor);
}, consumerExecutor)
.whenComplete((unused, throwable) -> {
// If there is no intermediate best solution consumer, complete the problem changes now.
if (bestSolutionConsumer == null) {
var solutionHolder = bestSolutionHolder.take();
if (solutionHolder != null) {
solutionHolder.completeProblemChanges();
}
}
shutdownConsumerExecutor();
});
}

private void acquireAll() throws InterruptedException {
Expand All @@ -223,24 +223,28 @@ private void releaseAll() {
firstSolutionConsumption.release();
}

private void shutdownConsumerExecutor() {
// Cancel problem changes that arrived after the solver terminated.
bestSolutionHolder.cancelPendingChanges();
consumerExecutor.shutdownNow();
}

@Override
public void close() {
if (consumerExecutor.isShutdown()) {
return; // Already closed, do nothing.
}
try {
acquireAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted when waiting for closing the consumer.");
} finally {
disposeConsumerThread();
bestSolutionHolder.cancelPendingChanges();
shutdownConsumerExecutor();
releaseAll();
}
}

private void disposeConsumerThread() {
consumerExecutor.shutdownNow();
}

record NewBestSolutionEventImpl<Solution_>(Solution_ solution,
EventProducerId producerId) implements NewBestSolutionEvent<Solution_> {
}
Expand Down
Loading