Skip to content

Commit

Permalink
Fix CI and add pre-commit
Browse files Browse the repository at this point in the history
  • Loading branch information
chraibi committed Oct 21, 2024
1 parent 48d6053 commit 6f614d6
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 91 deletions.
21 changes: 21 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.8
hooks:
# Run the linter.
- id: ruff
args: ['check', '--select', 'I', '--fix', '.']
# Run the formatter.
- id: ruff-format

# Mypy type checker
- repo: local
hooks:
- id: mypy
name: Analyze with Mypy
entry: mypy --config-file mypy.ini
language: python
types: ['python'] # Targets Python files
files: \.py$ # Only .py files
exclude: (tests/|scripts/|docs|helper)

51 changes: 33 additions & 18 deletions simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import logging
import pathlib
import time
from typing import Any, Dict, Iterator, List, Tuple, TypeAlias
from typing import Any, Dict, Iterator, List, Tuple, TypeAlias, cast

import jupedsim as jps
import typer
Expand All @@ -36,6 +36,7 @@
from src.logger_config import init_logger
from src.utilities import (
build_geometry,
calculate_centroid,
calculate_distance,
distribute_and_add_agents,
init_journey,
Expand Down Expand Up @@ -175,8 +176,11 @@ def init_simulation(


def adjust_parameter_linearly(
motivation_i, min_value=0.01, default_value=0.5, max_value=1.0
):
motivation_i: float,
min_value: float = 0.01,
default_value: float = 0.5,
max_value: float = 1.0,
) -> float:
"""
Adjust the a parameter based on agent's motivation level (0 < motivation_i < 1).
Expand All @@ -192,7 +196,7 @@ def adjust_parameter_linearly(

def process_agent(
agent: jps.Agent,
door: List[float],
door: Point,
simulation: jps.Simulation,
motivation_model: mm.MotivationModel,
a_ped_min: float,
Expand All @@ -203,7 +207,7 @@ def process_agent(
default_range: float,
file_handle: _io.TextIOWrapper,
frame_to_write: int,
):
) -> str:
"""Process an individual agent by calculating motivation and updating model parameters."""
position = agent.position
distance = calculate_distance(position, door)
Expand Down Expand Up @@ -243,7 +247,7 @@ def process_agent(

def run_simulation_loop(
simulation: jps.Simulation,
door: List[float],
door: Point,
motivation_model: mm.MotivationModel,
simulation_time: float,
a_ped_min: float,
Expand All @@ -259,7 +263,7 @@ def run_simulation_loop(
Args:
simulation (jps.Simulation): The simulation instance.
door (List[float]): The coordinates of the door.
door (Point): The coordinates of the door.
motivation_model (mm.MotivationModel): The motivation model used for agents.
simulation_time (float): The total simulation time.
a_ped_min (float): Minimum value for adjusting agent strength based on motivation.
Expand Down Expand Up @@ -310,11 +314,13 @@ def run_simulation_loop(

def create_agent_parameters(
_data: Dict[str, Any], simulation: jps.Simulation
) -> List[jps.CollisionFreeSpeedModelAgentParameters]:
) -> Tuple[List[jps.CollisionFreeSpeedModelV2AgentParameters], List[List[Point]]]:
"""Create the model parameters."""
way_points = parse_way_points(_data)
destinations_dict = parse_destinations(_data)
destinations = list(destinations_dict.values())
destinations: List[List[Point]] = cast(
List[List[Point]], list(destinations_dict.values())
)
journey_id, exit_ids = init_journey(simulation, way_points, destinations)

normal_v_0 = parse_normal_v_0(_data)
Expand All @@ -338,7 +344,7 @@ def create_agent_parameters(
)
agent_parameters_list.append(agent_parameters)

return agent_parameters_list, destinations
return (agent_parameters_list, destinations)


def init_positions(_data: Dict[str, Any], _number_agents: int) -> List[Point]:
Expand Down Expand Up @@ -381,7 +387,7 @@ def init_positions(_data: Dict[str, Any], _number_agents: int) -> List[Point]:
return positions


def read_positions_from_csv(file_path="points.csv"):
def read_positions_from_csv(file_path: str = "points.csv") -> List[Point]:
"""Read positions generated by notebook from a CSV file if it exists."""
path = pathlib.Path(file_path)

Expand All @@ -390,9 +396,16 @@ def read_positions_from_csv(file_path="points.csv"):

with path.open("r") as f:
reader = csv.reader(f)
tuple_list = [tuple(map(float, row)) for row in reader]
points: List[Point] = []
for row in reader:
if len(row) == 2:
try:
x, y = float(row[0]), float(row[1])
points.append((x, y))
except ValueError:
raise FileNotFoundError(f"The file {file_path} does not exist yet.")

return tuple_list
return points


def init_and_run_simulation(
Expand Down Expand Up @@ -436,7 +449,7 @@ def init_and_run_simulation(
motivation_model = init_motivation_model(_data, ped_ids)
x_door = 0.5 * (motivation_model.door_point1[0] + motivation_model.door_point2[0])
y_door = 0.5 * (motivation_model.door_point1[1] + motivation_model.door_point2[1])
motivation_door = [x_door, y_door]
motivation_door: Point = (x_door, y_door)
logging.info(f"Running simulation for {len(ped_ids)} agents:")
logging.info(f"{motivation_model.motivation_strategy.width = }")
start_time = time.time()
Expand Down Expand Up @@ -464,7 +477,7 @@ def init_and_run_simulation(
return float(simulation.iteration_count() * _time_step)


def start_simulation(config_path, output_path):
def start_simulation(config_path: str, output_path: str) -> float:
"""Call main function."""
with open(config_path, "r", encoding="utf8") as f:
data = json.load(f)
Expand All @@ -486,7 +499,9 @@ def start_simulation(config_path, output_path):
return evac_time


def modify_and_save_config(base_config, modification_dict, new_config_path):
def modify_and_save_config(
base_config: float, modification_dict: Dict[str, Any], new_config_path: str
) -> None:
"""Modify base configuration and save as a new JSON file."""
config = json.loads(json.dumps(base_config)) # Deep copy
for key, value in modification_dict.items():
Expand All @@ -505,7 +520,7 @@ def main(
pathlib.Path("files/inifile.json"),
help="Path to the initial configuration file",
),
):
) -> None:
"""Implement Main function. Create variations and start simulations."""
init_logger()
logging.info(f"Base config = {inifile}")
Expand All @@ -526,7 +541,7 @@ def main(

for i, variation in enumerate(variations, start=1):
logging.info(f"running variation {i:03d}: {variation}")
new_config_path = f"config_variation_{i:03d}.json"
new_config_path = f"{output_dir}/config_variation_{i:03d}.json"
output_path = f"files/trajectory_variation_{i:03d}.sqlite"
logging.info(f"{output_path = }")
# Modify and save the new configuration
Expand Down
Loading

0 comments on commit 6f614d6

Please sign in to comment.