diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..538675f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,21 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.6.8 + hooks: + # Run the linter. + - id: ruff + args: ['check', '--select', 'I', '--fix', '.'] + # Run the formatter. + - id: ruff-format + + # Mypy type checker + - repo: local + hooks: + - id: mypy + name: Analyze with Mypy + entry: mypy --config-file mypy.ini + language: python + types: ['python'] # Targets Python files + files: \.py$ # Only .py files + exclude: (tests/|scripts/|docs|helper) + \ No newline at end of file diff --git a/simulation.py b/simulation.py index a51e317..3cb9b81 100644 --- a/simulation.py +++ b/simulation.py @@ -9,7 +9,7 @@ import logging import pathlib import time -from typing import Any, Dict, Iterator, List, Tuple, TypeAlias +from typing import Any, Dict, Iterator, List, Tuple, TypeAlias, cast import jupedsim as jps import typer @@ -36,6 +36,7 @@ from src.logger_config import init_logger from src.utilities import ( build_geometry, + calculate_centroid, calculate_distance, distribute_and_add_agents, init_journey, @@ -175,8 +176,11 @@ def init_simulation( def adjust_parameter_linearly( - motivation_i, min_value=0.01, default_value=0.5, max_value=1.0 -): + motivation_i: float, + min_value: float = 0.01, + default_value: float = 0.5, + max_value: float = 1.0, +) -> float: """ Adjust the a parameter based on agent's motivation level (0 < motivation_i < 1). @@ -192,7 +196,7 @@ def adjust_parameter_linearly( def process_agent( agent: jps.Agent, - door: List[float], + door: Point, simulation: jps.Simulation, motivation_model: mm.MotivationModel, a_ped_min: float, @@ -203,7 +207,7 @@ def process_agent( default_range: float, file_handle: _io.TextIOWrapper, frame_to_write: int, -): +) -> str: """Process an individual agent by calculating motivation and updating model parameters.""" position = agent.position distance = calculate_distance(position, door) @@ -243,7 +247,7 @@ def process_agent( def run_simulation_loop( simulation: jps.Simulation, - door: List[float], + door: Point, motivation_model: mm.MotivationModel, simulation_time: float, a_ped_min: float, @@ -259,7 +263,7 @@ def run_simulation_loop( Args: simulation (jps.Simulation): The simulation instance. - door (List[float]): The coordinates of the door. + door (Point): The coordinates of the door. motivation_model (mm.MotivationModel): The motivation model used for agents. simulation_time (float): The total simulation time. a_ped_min (float): Minimum value for adjusting agent strength based on motivation. @@ -310,11 +314,13 @@ def run_simulation_loop( def create_agent_parameters( _data: Dict[str, Any], simulation: jps.Simulation -) -> List[jps.CollisionFreeSpeedModelAgentParameters]: +) -> Tuple[List[jps.CollisionFreeSpeedModelV2AgentParameters], List[List[Point]]]: """Create the model parameters.""" way_points = parse_way_points(_data) destinations_dict = parse_destinations(_data) - destinations = list(destinations_dict.values()) + destinations: List[List[Point]] = cast( + List[List[Point]], list(destinations_dict.values()) + ) journey_id, exit_ids = init_journey(simulation, way_points, destinations) normal_v_0 = parse_normal_v_0(_data) @@ -338,7 +344,7 @@ def create_agent_parameters( ) agent_parameters_list.append(agent_parameters) - return agent_parameters_list, destinations + return (agent_parameters_list, destinations) def init_positions(_data: Dict[str, Any], _number_agents: int) -> List[Point]: @@ -381,7 +387,7 @@ def init_positions(_data: Dict[str, Any], _number_agents: int) -> List[Point]: return positions -def read_positions_from_csv(file_path="points.csv"): +def read_positions_from_csv(file_path: str = "points.csv") -> List[Point]: """Read positions generated by notebook from a CSV file if it exists.""" path = pathlib.Path(file_path) @@ -390,9 +396,16 @@ def read_positions_from_csv(file_path="points.csv"): with path.open("r") as f: reader = csv.reader(f) - tuple_list = [tuple(map(float, row)) for row in reader] + points: List[Point] = [] + for row in reader: + if len(row) == 2: + try: + x, y = float(row[0]), float(row[1]) + points.append((x, y)) + except ValueError: + raise FileNotFoundError(f"The file {file_path} does not exist yet.") - return tuple_list + return points def init_and_run_simulation( @@ -436,7 +449,7 @@ def init_and_run_simulation( motivation_model = init_motivation_model(_data, ped_ids) x_door = 0.5 * (motivation_model.door_point1[0] + motivation_model.door_point2[0]) y_door = 0.5 * (motivation_model.door_point1[1] + motivation_model.door_point2[1]) - motivation_door = [x_door, y_door] + motivation_door: Point = (x_door, y_door) logging.info(f"Running simulation for {len(ped_ids)} agents:") logging.info(f"{motivation_model.motivation_strategy.width = }") start_time = time.time() @@ -464,7 +477,7 @@ def init_and_run_simulation( return float(simulation.iteration_count() * _time_step) -def start_simulation(config_path, output_path): +def start_simulation(config_path: str, output_path: str) -> float: """Call main function.""" with open(config_path, "r", encoding="utf8") as f: data = json.load(f) @@ -486,7 +499,9 @@ def start_simulation(config_path, output_path): return evac_time -def modify_and_save_config(base_config, modification_dict, new_config_path): +def modify_and_save_config( + base_config: float, modification_dict: Dict[str, Any], new_config_path: str +) -> None: """Modify base configuration and save as a new JSON file.""" config = json.loads(json.dumps(base_config)) # Deep copy for key, value in modification_dict.items(): @@ -505,7 +520,7 @@ def main( pathlib.Path("files/inifile.json"), help="Path to the initial configuration file", ), -): +) -> None: """Implement Main function. Create variations and start simulations.""" init_logger() logging.info(f"Base config = {inifile}") @@ -526,7 +541,7 @@ def main( for i, variation in enumerate(variations, start=1): logging.info(f"running variation {i:03d}: {variation}") - new_config_path = f"config_variation_{i:03d}.json" + new_config_path = f"{output_dir}/config_variation_{i:03d}.json" output_path = f"files/trajectory_variation_{i:03d}.sqlite" logging.info(f"{output_path = }") # Modify and save the new configuration diff --git a/src/analysis.py b/src/analysis.py index fd89537..07b5b37 100644 --- a/src/analysis.py +++ b/src/analysis.py @@ -3,7 +3,7 @@ import glob import json from pathlib import Path -from typing import Any, Dict +from typing import Any, Optional, Tuple import matplotlib.cm as cm import matplotlib.pyplot as plt @@ -15,6 +15,7 @@ from jupedsim.internal.notebook_utils import read_sqlite_file from matplotlib.collections import LineCollection from matplotlib.colors import Normalize +from matplotlib.figure import Figure as matplotlib_fig from pedpy.column_identifier import FRAME_COL, ID_COL from .inifile_parser import parse_fps @@ -27,9 +28,12 @@ from .ui import ui_measurement_parameters -def get_first_frame_pedestrian_passes_line(filename: str, passing_line_y: float = 20.0): +def get_first_frame_pedestrian_passes_line( + filename: Path, passing_line_y: float = 20.0 +) -> Tuple[pd.DataFrame, Optional[int]]: """ Return the first frame when a pedestrian passes the specified horizontal line. + Also return the DataFrame of whole trajectories. """ df = pd.read_csv( @@ -83,7 +87,7 @@ def generate_heatmap( position_x, position_y, bins=50, weights=value ) heatmap = heatmap / np.max(heatmap) - extent = [geo_min_x, geo_max_x, geo_min_y, geo_max_y] + extent = (geo_min_x, geo_max_x, geo_min_y, geo_max_y) plt.imshow( heatmap.T, origin="lower", @@ -124,13 +128,13 @@ def run() -> None: handle_analysis(selected, SELECTED_OUTPUT_FILE, traj, walkable_area, json_data) -def load_json_data(filepath: str) -> dict: +def load_json_data(filepath: str) -> Any: """Load JSON data from a given file.""" with open(filepath, "r", encoding="utf8") as f: return json.loads(f.read()) -def handle_heatmap(walkable_area) -> None: +def handle_heatmap(walkable_area: pedpy.WalkableArea) -> None: """Handle heatmap selection and generation.""" heatmap_files = glob.glob("files/*motivation.csv") selected_heatmap_file = st.selectbox( @@ -155,13 +159,17 @@ def handle_heatmap(walkable_area) -> None: def handle_analysis( - selected, SELECTED_OUTPUT_FILE, traj, walkable_area, json_data + selected: str, + selected_output_file: str, + traj: pedpy.TrajectoryData, + walkable_area: pedpy.WalkableArea, + json_data: Any, ) -> None: """Handle the analysis based on the selected option.""" fps = parse_fps(json_data) - if SELECTED_OUTPUT_FILE: - output_path = Path(SELECTED_OUTPUT_FILE) + if selected_output_file: + output_path = Path(selected_output_file) motivation_file = output_path.with_name(output_path.stem + "_motivation.csv") print(f"{motivation_file = }") @@ -205,7 +213,10 @@ def handle_analysis( def plot_measurement_setup( - walkable_area, traj, measurement_line, measurement_area + walkable_area: pedpy.WalkableArea, + traj: pedpy.TrajectoryData, + measurement_line: pedpy.MeasurementLine, + measurement_area: pedpy.MeasurementArea, ) -> None: """Plot the measurement setup.""" pedpy.plot_measurement_setup( @@ -227,7 +238,9 @@ def plot_measurement_setup( st.sidebar.pyplot(fig) -def handle_nt(traj, measurement_line) -> None: +def handle_nt( + traj: pedpy.TrajectoryData, measurement_line: pedpy.MeasurementLine +) -> None: """Handle NT (number of traversals) computation.""" nt, crossing_frames = pedpy.compute_n_t( traj_data=traj, measurement_line=measurement_line @@ -235,7 +248,9 @@ def handle_nt(traj, measurement_line) -> None: plotly_nt_series(nt) -def handle_speed(traj, measurement_area, json_data) -> None: +def handle_speed( + traj: pedpy.TrajectoryData, measurement_area: pedpy.MeasurementArea, json_data: Any +) -> None: """Handle speed computation and plotting.""" ui_measurement_parameters(json_data) individual_speed = pedpy.compute_individual_speed( @@ -251,7 +266,12 @@ def handle_speed(traj, measurement_area, json_data) -> None: plot_speed_time_series(mean_speed) -def handle_flow(traj, measurement_line, fps, json_data) -> None: +def handle_flow( + traj: pedpy.TrajectoryData, + measurement_line: pedpy.MeasurementLine, + fps: int, + json_data: Any, +) -> None: """Handle flow computation and plotting.""" ui_measurement_parameters(json_data) nt, crossing_frames = pedpy.compute_n_t( @@ -269,7 +289,12 @@ def handle_flow(traj, measurement_line, fps, json_data) -> None: plot_flow_time_series(flow_speed) -def handle_density(traj, walkable_area, measurement_area, json_data) -> None: +def handle_density( + traj: pedpy.TrajectoryData, + walkable_area: pedpy.WalkableArea, + measurement_area: pedpy.MeasurementArea, + json_data: Any, +) -> None: """Handle density computation and plotting.""" ui_measurement_parameters(json_data) individual = pedpy.compute_individual_voronoi_polygons( @@ -282,7 +307,9 @@ def handle_density(traj, walkable_area, measurement_area, json_data) -> None: plot_density_time_series(density_voronoi) -def handle_voronoi(traj, walkable_area) -> None: +def handle_voronoi( + traj: pedpy.TrajectoryData, walkable_area: pedpy.WalkableArea +) -> None: """Handle Voronoi polygons plotting.""" individual = pedpy.compute_individual_voronoi_polygons( traj_data=traj, walkable_area=walkable_area @@ -312,7 +339,7 @@ def handle_voronoi(traj, walkable_area) -> None: # UI handling -def get_user_inputs(prefix=""): +def get_user_inputs(prefix: str = "") -> Tuple[float, bool, float, str]: """Get inputs from the user for the plot configuration with unique keys.""" c1, c2, c3 = st.columns(3) @@ -339,8 +366,9 @@ def get_user_inputs(prefix=""): return yaxis_max, color_by_speed == "Speed", colorbar_max, unit_text -# Logic handling -def compute_speed_or_motivation(traj, motivation_file, color_by_speed): +def compute_speed_or_motivation( + traj: pedpy.TrajectoryData, motivation_file: Path, color_by_speed: bool +) -> pd.DataFrame: """Compute either speed or motivation data based on user input.""" if color_by_speed: # Compute speed @@ -359,7 +387,12 @@ def compute_speed_or_motivation(traj, motivation_file, color_by_speed): return speed -def process_data(traj, measurement_line, motivation_file, color_by_speed): +def process_data( + traj: pedpy.TrajectoryData, + measurement_line: pedpy.MeasurementLine, + motivation_file: Path, + color_by_speed: bool, +) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """Process and merge the data required for plotting.""" # Compute time distance line df_time_distance = pedpy.compute_time_distance_line( @@ -384,18 +417,18 @@ def process_data(traj, measurement_line, motivation_file, color_by_speed): # Plotting function def plot_distance_to_entrance( - df_time_distance, - speed, - first_frame_speed, - color_by_speed, - yaxis_max, - colorbar_max, - unit_text, -): + df_time_distance: pd.DataFrame, + speed: pd.DataFrame, + first_frame_speed: pd.DataFrame, + color_by_speed: bool, + yaxis_max: float, + colorbar_max: float, + unit_text: str, +) -> matplotlib_fig: """Plot the distance to entrance with speed or motivation coloring.""" norm = Normalize(speed["speed"].min(), speed["speed"].max()) - cmap = cm.jet - + # cmap = cm.jet + cmap = cm.get_cmap("jet") # Create the figure and axis fig, ax = plt.subplots() @@ -442,7 +475,12 @@ def plot_distance_to_entrance( # Main handler function -def handle_distance_to_entrance(traj, measurement_line, motivation_file, prefix=""): +def handle_distance_to_entrance( + traj: pedpy.TrajectoryData, + measurement_line: pedpy.MeasurementLine, + motivation_file: Path, + prefix: str = "", +) -> matplotlib_fig: """Handle distance to entrance plotting.""" # Get user inputs yaxis_max, color_by_speed, colorbar_max, unit_text = get_user_inputs(prefix) diff --git a/src/inifile_parser.py b/src/inifile_parser.py index dca0fa8..2bca981 100644 --- a/src/inifile_parser.py +++ b/src/inifile_parser.py @@ -39,7 +39,7 @@ def parse_destinations(json_data: Dict[str, Any]) -> Dict[int, List[List[Point]] def parse_velocity_init_parameters( json_data: Dict[str, Any], -) -> Tuple[float, float, float, float]: +) -> Tuple[float, float, float, float, float, float, float, float]: """Parse init parameters for velocity model. return a_ped, d_ped, a_wall, d_Wall diff --git a/src/motivation_model.py b/src/motivation_model.py index aaa4e35..e26338a 100644 --- a/src/motivation_model.py +++ b/src/motivation_model.py @@ -15,38 +15,6 @@ Point: TypeAlias = Tuple[float, float] -def shifted_logistic( - x: float, M_max: float = 1.0, k: float = 1.0, shift: float = 0.0 -) -> np.ndarray: - r""" - Compute the shifted logistic function. - - This function serves as a mean to normalize the motivation values to the range [0, 1]. - - The shifted logistic function is defined as: - - .. math:: - \\text{motivation} = \\frac{M_{max}}{1 + e^{-k \\cdot (x - \\text{shift})}} - - Parameters: - ---------- - x : float - Input value for which to compute the shifted logistic function. - M_max : float, optional - The maximum value of the logistic function. Default is 1.0. - k : float, optional - The steepness of the curve. Default is 1.0. - shift : float, optional - The value to shift the input by. Default is 5.0. - - Returns: - ------- - float - The computed value of the shifted logistic function for the input `x`. - """ - return M_max / (1 + np.exp(-k * (x - shift))) - - class MotivationStrategy(ABC): """Abstract class for strategy model.""" @@ -260,8 +228,8 @@ def plot(self) -> List[Figure]: ax0.plot(distances, E) ax0.grid(alpha=0.3) - ax0.set_ylim([-0.1, 3]) - ax0.set_xlim([-0.1, 4]) + ax0.set_ylim((-0.1, 3)) + ax0.set_xlim((-0.1, 4)) ax0.set_title(f"{self.name()} - E (width, height)") ax0.set_xlabel("Distance / m") ax0.set_ylabel("Expectancy") @@ -272,8 +240,8 @@ def plot(self) -> List[Figure]: ax1.plot(self.agent_ids, V, "o") ax1.grid(alpha=0.3) - ax1.set_ylim([-0.1, 5]) - ax1.set_xlim([-0.1, self.max_reward + 1]) + ax1.set_ylim((-0.1, 5)) + ax1.set_xlim((-0.1, self.max_reward + 1)) ax1.set_title(f"{self.name()} - V (seed = {self.seed:.0f})") ax1.set_xlabel("# Agents") ax1.set_ylabel("Value") @@ -294,8 +262,8 @@ def plot(self) -> List[Figure]: ax2.plot(Nrange, C, ".-") ax2.grid(alpha=0.3) - ax2.set_xlim([0, self.max_reward + 1]) - ax2.set_ylim([-0.1, self.competition_max + 1]) + ax2.set_xlim((0, self.max_reward + 1)) + ax2.set_ylim((-0.1, self.competition_max + 1)) ax2.set_xlabel("#agents left simulation") ax2.set_ylabel("Competition") ax2.set_xticks( @@ -370,7 +338,7 @@ def plot(self) -> List[Figure]: ax3.grid(alpha=0.3) # ax3.set_ylim([-0.1, 3]) - ax3.set_xlim([-0.1, 4]) + ax3.set_xlim((-0.1, 4)) ax3.legend() if self.evc: title = ( diff --git a/src/utilities.py b/src/utilities.py index d6a03f4..5c96c8b 100644 --- a/src/utilities.py +++ b/src/utilities.py @@ -17,7 +17,9 @@ Point: TypeAlias = Tuple[float, float] -def parse(data: Union[List, Dict, Any]) -> Union[List, SimpleNamespace, Any]: +def parse( + data: Union[List[Any], Dict[str, Any], Any], +) -> Union[List[Any], SimpleNamespace, Any]: """ Recursively converts a nested structure of lists and dictionaries. @@ -82,10 +84,10 @@ def init_journey( simulation: jps.Simulation, way_points: List[Tuple[Point, float]], exits: List[List[Point]], -) -> Tuple[int, int]: +) -> Tuple[int, List[int]]: """Init goals of agents to follow. - Add waypoints and exits to journey. Then register journey in simultion + Add waypoints and exits to journey. Then register journey in simulation :param simulation: :param way_points: defined as a list of (point, distance) @@ -95,7 +97,7 @@ def init_journey( # log_info("Init journey with: ") # log_info(f"{ way_points= }") # log_info(f"{ exits= }") - exit_ids = [] + exit_ids: List[int] = [] # wp_ids = [] journey = jps.JourneyDescription() # distance = 1