Skip to content

Commit

Permalink
Updated computeService to force the simulation to stop when all tasks…
Browse files Browse the repository at this point in the history
… are finished (#259)
  • Loading branch information
DanteNiewenhuis authored Nov 1, 2024
1 parent 7511fb7 commit 6fa203b
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opendc.compute.simulator.host.HostState;
import org.opendc.compute.simulator.host.SimHost;
import org.opendc.compute.simulator.scheduler.ComputeScheduler;
import org.opendc.compute.simulator.telemetry.ComputeMetricReader;
import org.opendc.compute.simulator.telemetry.SchedulerStats;
import org.opendc.simulator.compute.power.SimPowerSource;
import org.opendc.simulator.compute.workload.Workload;
Expand Down Expand Up @@ -141,6 +142,8 @@ public final class ComputeService implements AutoCloseable {

private final List<ServiceTask> tasksToRemove = new ArrayList<>();

private ComputeMetricReader metricReader;

/**
* A [HostListener] used to track the active tasks.
*/
Expand Down Expand Up @@ -201,7 +204,7 @@ public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @No
}

if (task.getState() == TaskState.COMPLETED || task.getState() == TaskState.TERMINATED) {
tasksToRemove.add(task);
setTaskToBeRemoved(task);
}

// Try to reschedule if needed
Expand All @@ -214,11 +217,12 @@ public void onStateChanged(@NotNull SimHost host, @NotNull ServiceTask task, @No
private long maxMemory = 0L;
private long attemptsSuccess = 0L;
private long attemptsFailure = 0L;
private int tasksTotal = 0;
private int tasksPending = 0;
private int tasksActive = 0;
private int tasksTerminated = 0;
private int tasksCompleted = 0;
private int tasksExpected = 0; // Number of tasks expected from the input trace
private int tasksTotal = 0; // Number of tasks seen by the service
private int tasksPending = 0; // Number of tasks waiting to be scheduled
private int tasksActive = 0; // Number of tasks that are currently running
private int tasksTerminated = 0; // Number of tasks that were terminated due to too much failures
private int tasksCompleted = 0; // Number of tasks completed successfully

/**
* Construct a {@link ComputeService} instance.
Expand Down Expand Up @@ -332,6 +336,21 @@ public Set<SimPowerSource> getPowerSources() {
return Collections.unmodifiableSet(this.powerSources);
}

public void setMetricReader(ComputeMetricReader metricReader) {
this.metricReader = metricReader;
}

public void setTasksExpected(int numberOfTasks) {
this.tasksExpected = numberOfTasks;
}

public void setTaskToBeRemoved(ServiceTask task) {
this.tasksToRemove.add(task);
if ((tasksTerminated + tasksCompleted) == tasksExpected) {
metricReader.loggState(); // Logg the state for the final time. This will also delete all remaining tasks.
}
}

/**
* Collect the statistics about the scheduler component of this service.
*/
Expand Down Expand Up @@ -426,7 +445,8 @@ private void doSchedule() {
tasksPending--;
tasksTerminated++;
task.setState(TaskState.TERMINATED);
tasksToRemove.add(task);

this.setTaskToBeRemoved(task);
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public class ComputeMonitorProvisioningStep(
startTime,
carbonTrace,
)
return AutoCloseable { metricReader.close() }
return metricReader
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package org.opendc.compute.simulator.provisioner

import org.opendc.common.Dispatcher
import org.opendc.compute.simulator.ServiceRegistry
import org.opendc.compute.simulator.telemetry.ComputeMetricReader
import java.util.ArrayDeque
import java.util.SplittableRandom

Expand Down Expand Up @@ -61,6 +62,16 @@ public class Provisioner(dispatcher: Dispatcher, seed: Long) : AutoCloseable {
public val registry: ServiceRegistry
get() = context.registry

public fun getMonitor(): ComputeMetricReader? {
for (element in stack) {
if (element is ComputeMetricReader) {
return element
}
}

return null
}

/**
* Run a single [ProvisioningStep] for this environment.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class ComputeMetricReader(
}
}

private fun loggState() {
public fun loggState() {
loggCounter++
try {
val now = this.clock.instant()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,11 @@ public fun runScenario(
val startTime = Duration.ofMillis(tasks.minOf { it.submissionTime }.toEpochMilli())
addExportModel(provisioner, serviceDomain, scenario, seed, startTime, carbonTrace, scenario.id)

val monitor = provisioner.getMonitor()

val service = provisioner.registry.resolve(serviceDomain, ComputeService::class.java)!!
service.setMetricReader(monitor)
service.setTasksExpected(tasks.size)
service.replay(
timeSource,
tasks,
Expand Down

0 comments on commit 6fa203b

Please sign in to comment.