Skip to content

Commit

Permalink
python: Implemented run_speed_tests() and related update_speed_test_r…
Browse files Browse the repository at this point in the history
…esults().
  • Loading branch information
levy committed Nov 14, 2024
1 parent 19d9c2c commit aabb3b2
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 24 deletions.
24 changes: 0 additions & 24 deletions python/inet/test/speed.py

This file was deleted.

11 changes: 11 additions & 0 deletions python/inet/test/speed/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
"""
This package supports automated speed testing.
The main function is :py:func:`run_speed_tests <inet.test.speed.task.run_speed_tests>`. It allows running multiple speed tests matching
the provided filter criteria.
"""

from inet.test.speed.store import *
from inet.test.speed.task import *

__all__ = [k for k,v in locals().items() if k[0] != "_" and v.__class__.__name__ != "module"]
120 changes: 120 additions & 0 deletions python/inet/test/speed/store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import json
import logging
import os
import subprocess
import time

from inet.common import *
from inet.simulation.project import *

__sphinx_mock__ = True # ignore this module in documentation

_logger = logging.getLogger(__name__)

_speed_measurement_stores = dict()

class SpeedMeasurementStore:
def __init__(self, simulation_project, file_name):
self.simulation_project = simulation_project
self.file_name = file_name
self.entries = None

def read(self):
_logger.info(f"Reading speed measurements from {self.file_name}")
file = open(self.file_name, "r")
self.entries = json.load(file)
file.close()

def write(self):
self.get_entries().sort(key=lambda element: (element["working_directory"], element["ini_file"], element["config"], element["run_number"], element["sim_time_limit"]))
_logger.info(f"Writing speed measurements to {self.file_name}")
file = open(self.file_name, "w")
json.dump(self.entries, file, indent=True)
file.close()

def ensure(self):
if os.path.exists(self.file_name):
self.read()
else:
self.entries = []
self.write()

def clear(self):
self.entries = []

def reset(self):
self.entries = None

def get_entries(self):
if self.entries is None:
self.ensure()
return self.entries

def set_entries(self, entries):
self.entries = entries

def find_entry(self, **kwargs):
result = self.filter_entries(**kwargs)
return result[0] if len(result) == 1 else None

def get_entry(self, **kwargs):
result = self.find_entry(**kwargs)
if result is not None:
return result
else:
print(kwargs)
raise Exception("Entry not found")

def remove_entry(self, entry):
self.get_entries().remove(entry)

def filter_entries(self, test_result=None, working_directory=os.getcwd(), ini_file="omnetpp.ini", config="General", run_number=0, sim_time_limit=None, itervars=None):
def f(entry):
return (working_directory is None or entry["working_directory"] == working_directory) and \
(ini_file is None or entry["ini_file"] == ini_file) and \
(config is None or entry["config"] == config) and \
(run_number is None or entry["run_number"] == run_number) and \
(sim_time_limit is None or entry["sim_time_limit"] == sim_time_limit) and \
(itervars is None or entry["itervars"] == itervars) and \
(test_result is None or entry["test_result"] == test_result)
return list(filter(f, self.get_entries()))

def find_elapsed_wall_time(self, **kwargs):
entry = self.find_entry(**kwargs)
if entry is not None:
return entry["elapsed_wall_time"]
else:
return None

def get_elapsed_wall_time(self, **kwargs):
return self.get_entry(**kwargs)["elapsed_wall_time"]

def set_elapsed_wall_time(self, elapsed_wall_time, **kwargs):
self.get_entry(**kwargs)["elapsed_wall_time"] = elapsed_wall_time

def insert_elapsed_wall_time(self, elapsed_wall_time, test_result=None, working_directory=os.getcwd(), ini_file="omnetpp.ini", config="General", run_number=0, sim_time_limit=None, itervars="$repetition==0"):
# assert test_result == "ERROR" or sim_time_limit is not None
self.get_entries().append({"working_directory": working_directory,
"ini_file": ini_file,
"config": config,
"run_number": run_number,
"sim_time_limit": sim_time_limit,
"test_result": test_result,
"elapsed_wall_time": elapsed_wall_time,
"timestamp": time.time(),
"itervars": itervars})

def update_elapsed_wall_time(self, elapsed_wall_time, **kwargs):
entry = self.find_entry(**kwargs)
if entry:
entry["elapsed_wall_time"] = elapsed_wall_time
else:
self.insert_elapsed_wall_time(elapsed_wall_time, **kwargs)

def remove_elapsed_wall_times(self, **kwargs):
list(map(lambda element: self.entries.remove(element), self.filter_entries(**kwargs)))

def get_speed_measurement_store(simulation_project):
if not simulation_project in _speed_measurement_stores:
_speed_measurement_stores[simulation_project] = SpeedMeasurementStore(simulation_project, simulation_project.get_full_path(simulation_project.speed_store))
return _speed_measurement_stores[simulation_project]
134 changes: 134 additions & 0 deletions python/inet/test/speed/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import logging

from inet.simulation import *
from inet.test.simulation import *
from inet.test.task import *
from inet.test.speed.store import *

__sphinx_mock__ = True # ignore this module in documentation

_logger = logging.getLogger(__name__)

_speed_test_extra_args = ["--cmdenv-express-mode=true", "--cmdenv-performance-display=false", "--cmdenv-status-frequency=1000000s",
"--record-eventlog=false", "--vector-recording=false", "--scalar-recording=false", "--bin-recording=false", "--param-recording=false"]

class SpeedTestTask(SimulationTestTask):
def __init__(self, expected_wall_time=None, max_relative_error = 0.2, **kwargs):
super().__init__(**kwargs)
self.expected_wall_time = expected_wall_time
self.max_relative_error = max_relative_error

def check_simulation_task_result(self, simulation_task_result, **kwargs):
if (simulation_task_result.elapsed_wall_time - self.expected_wall_time) / self.expected_wall_time > self.max_relative_error:
return self.task_result_class(task=self, simulation_task_result=simulation_task_result, result="FAIL", expected_result="PASS", reason="Elapsed wall time is too large")
elif (self.expected_wall_time - simulation_task_result.elapsed_wall_time) / self.expected_wall_time > self.max_relative_error:
return self.task_result_class(task=self, simulation_task_result=simulation_task_result, result="FAIL", expected_result="PASS", reason="Elapsed wall time is too small")
else:
return super().check_simulation_task_result(simulation_task_result, **kwargs)

def run_protected(self, **kwargs):
return super().run_protected(nice=-10, extra_args=_speed_test_extra_args, **kwargs)

def get_speed_test_tasks(mode="release", run_number=0, working_directory_filter="showcases", **kwargs):
multiple_simulation_tasks = get_simulation_tasks(name="speed test", mode=mode, run_number=run_number, working_directory_filter=working_directory_filter, **kwargs)
simulation_project = multiple_simulation_tasks.simulation_project
speed_measurement_store = get_speed_measurement_store(simulation_project)
tasks = []
for simulation_task in multiple_simulation_tasks.tasks:
simulation_config = simulation_task.simulation_config
expected_wall_time = speed_measurement_store.get_elapsed_wall_time(working_directory=simulation_config.working_directory, ini_file=simulation_config.ini_file, config=simulation_config.config, run_number=simulation_task.run_number)
tasks.append(SpeedTestTask(simulation_task=simulation_task, expected_wall_time=expected_wall_time, **dict(kwargs, simulation_project=simulation_project)))
return MultipleSimulationTestTasks(tasks=tasks, **dict(kwargs, simulation_project=simulation_project, concurrent=False))

def run_speed_tests(**kwargs):
multiple_test_tasks = get_speed_test_tasks(**kwargs)
return multiple_test_tasks.run(**kwargs)

class SpeedUpdateTask(SimulationUpdateTask):
def __init__(self, action="Updating speed", **kwargs):
super().__init__(action=action, **kwargs)
self.locals = locals()
self.locals.pop("self")
self.kwargs = kwargs

def run_protected(self, **kwargs):
simulation_config = self.simulation_task.simulation_config
simulation_project = simulation_config.simulation_project
start_time = time.time()
simulation_task_result = self.simulation_task.run_protected(nice=-10, extra_args=_speed_test_extra_args, **kwargs)
end_time = time.time()
simulation_task_result.elapsed_wall_time = end_time - start_time
speed_measurement_store = get_speed_measurement_store(simulation_project)
expected_wall_time = speed_measurement_store.find_elapsed_wall_time(working_directory=simulation_config.working_directory, ini_file=simulation_config.ini_file, config=simulation_config.config, run_number=self.simulation_task.run_number)
return SpeedUpdateTaskResult(task=self, simulation_task_result=simulation_task_result, expected_wall_time=expected_wall_time, elapsed_wall_time=simulation_task_result.elapsed_wall_time)

class MultipleSpeedUpdateTasks(MultipleSimulationUpdateTasks):
def __init__(self, multiple_simulation_tasks=None, name="update speed", **kwargs):
super().__init__(name=name, concurrent=False, **kwargs)
self.locals = locals()
self.locals.pop("self")
self.kwargs = kwargs
self.multiple_simulation_tasks = multiple_simulation_tasks

def run(self, simulation_project=None, concurrent=None, build=True, **kwargs):
if concurrent is None:
concurrent = self.multiple_simulation_tasks.concurrent
simulation_project = simulation_project or self.multiple_simulation_tasks.simulation_project
if build:
build_project(simulation_project=simulation_project, **kwargs)
multiple_speed_update_results = super().run(**kwargs)
speed_measurement_store = get_speed_measurement_store(simulation_project)
for speed_update_result in multiple_speed_update_results.results:
speed_update_task = speed_update_result.task
simulation_task = speed_update_task.simulation_task
simulation_config = simulation_task.simulation_config
elapsed_wall_time = speed_update_result.elapsed_wall_time
if elapsed_wall_time is not None:
speed_measurement_store.update_elapsed_wall_time(elapsed_wall_time, test_result="PASS", sim_time_limit=simulation_task.sim_time_limit,
working_directory=simulation_config.working_directory, ini_file=simulation_config.ini_file, config=simulation_config.config, run_number=simulation_task.run_number)
speed_measurement_store.write()
return speed_measurement_store

class SpeedUpdateTaskResult(SimulationUpdateTaskResult):
def __init__(self, expected_wall_time=None, elapsed_wall_time=None, reason=None, max_relative_error=0.2, **kwargs):
super().__init__(**kwargs)
self.locals = locals()
self.locals.pop("self")
self.kwargs = kwargs
if expected_wall_time is None:
self.result = "INSERT"
self.reason = None
self.color = COLOR_YELLOW
elif abs(expected_wall_time - elapsed_wall_time) / expected_wall_time > max_relative_error:
self.result = "UPDATE"
self.reason = None
self.color = COLOR_YELLOW
else:
self.result = "KEEP"
self.reason = None
self.color = COLOR_GREEN
self.expected = self.result == self.expected_result

def get_description(self, complete_error_message=True, include_parameters=False, **kwargs):
return (self.task.simulation_task.get_parameters_string() + " " if include_parameters else "") + \
self.color + self.result + COLOR_RESET + \
((" " + self.simulation_task_result.get_error_message(complete_error_message=complete_error_message)) if self.simulation_task_result and self.simulation_task_result.result == "ERROR" else "") + \
(" (" + self.reason + ")" if self.reason else "")

def __repr__(self):
return "Speed update result: " + self.get_description(include_parameters=True)

def print_result(self, complete_error_message=True, output_stream=sys.stdout, **kwargs):
print(self.get_description(complete_error_message=complete_error_message), file=output_stream)

def get_update_speed_test_results_tasks(mode="release", run_number=0, working_directory_filter="showcases", **kwargs):
update_tasks = []
multiple_simulation_tasks = get_simulation_tasks(mode=mode, run_number=run_number, working_directory_filter=working_directory_filter, **kwargs)
for simulation_task in multiple_simulation_tasks.tasks:
update_task = SpeedUpdateTask(simulation_task=simulation_task, **kwargs)
update_tasks.append(update_task)
return MultipleSpeedUpdateTasks(multiple_simulation_tasks, tasks=update_tasks, **kwargs)

def update_speed_test_results(**kwargs):
multiple_speed_update_tasks = get_update_speed_test_results_tasks(**kwargs)
return multiple_speed_update_tasks.run(**kwargs)

0 comments on commit aabb3b2

Please sign in to comment.