diff --git a/app.py b/app.py index 37cc736..fc2946e 100644 --- a/app.py +++ b/app.py @@ -5,18 +5,16 @@ Date: August 11, 2023 """ -import glob import json from pathlib import Path -from src import motivation_model as mm -import jupedsim as jps + import numpy as np import pandas as pd -import pedpy import streamlit as st from jupedsim.internal.notebook_utils import animate, read_sqlite_file from simulation import main +from src import motivation_model as mm from src.analysis import run from src.inifile_parser import ( parse_fps, @@ -159,7 +157,7 @@ def read_data(output_file: str) -> pd.DataFrame: data, Path(OUTPUT_FILE), ) - msg.code(f"Finished simulation. Evac time {evac_time} s") + msg.code(f"Finished simulation. Evac time {evac_time:.2f} s") st.empty() output_path = Path(OUTPUT_FILE) if Path("values.txt").exists(): @@ -179,7 +177,8 @@ def read_data(output_file: str) -> pd.DataFrame: max_value = float(data["motivation_parameters"]["max_value"]) min_value = float(data["motivation_parameters"]["min_value"]) seed = data["motivation_parameters"]["seed"] - number_agents = float(parse_number_agents(data)) + number_agents = int(parse_number_agents(data)) + motivation_strategy: mm.MotivationStrategy if strategy == "default": motivation_strategy = mm.DefaultMotivationStrategy( width=width, height=height diff --git a/files/bottleneck.json b/files/bottleneck.json index 784da53..9f8919a 100644 --- a/files/bottleneck.json +++ b/files/bottleneck.json @@ -8,7 +8,7 @@ "a_ped": 1.0, "d_ped": 0.2, "a_wall": 1.0, - "d_wall": 0.2, + "d_wall": 0.19, "radius": 0.10 }, "simulation_parameters": { diff --git a/simulation.py b/simulation.py index 71de38e..da2d9d8 100644 --- a/simulation.py +++ b/simulation.py @@ -2,41 +2,33 @@ # Copyright © 2012-2022 Forschungszentrum Jülich GmbH # SPDX-License-Identifier: LGPL-3.0-or-later -import logging import contextlib import json +import logging import pathlib +import random import sys import time -from typing import Any, Dict, Iterator, List, Tuple, TypeAlias +from typing import Any, Dict, Iterator, Tuple, TypeAlias + import _io import jupedsim as jps from jupedsim.distributions import distribute_by_number from src import motivation_model as mm -from src.inifile_parser import ( - parse_accessible_areas, - parse_destinations, - parse_distribution_polygons, - parse_fps, - parse_radius, - parse_motivation_doors, - parse_motivation_parameter, - parse_motivation_strategy, - parse_normal_time_gap, - parse_normal_v_0, - parse_number_agents, - parse_simulation_time, - parse_time_step, - parse_velocity_init_parameters, - parse_way_points, -) -from src.logger_config import init_logger, log_debug, log_error, log_info -from src.utilities import ( - build_geometry, - distribute_and_add_agents, - init_journey, -) +from src.inifile_parser import (parse_accessible_areas, parse_destinations, + parse_distribution_polygons, parse_fps, + parse_motivation_doors, + parse_motivation_parameter, + parse_motivation_strategy, + parse_normal_time_gap, parse_normal_v_0, + parse_number_agents, parse_radius, + parse_simulation_time, parse_time_step, + parse_velocity_init_parameters, + parse_way_points) +from src.logger_config import init_logger, log_debug, log_error +from src.utilities import (build_geometry, distribute_and_add_agents, + init_journey) # import cProfile # import pstats @@ -109,6 +101,7 @@ def init_simulation( choose_motivation_strategy = parse_motivation_strategy(_data) number_agents = parse_number_agents(_data) # ================= + motivation_strategy: mm.MotivationStrategy if choose_motivation_strategy == "default": motivation_strategy = mm.DefaultMotivationStrategy(width=width, height=height) if choose_motivation_strategy == "EVC": @@ -198,7 +191,7 @@ def main( _simulation_time: float, _data: Dict[str, Any], _trajectory_path: pathlib.Path, -) -> None: +) -> float: """Main simulation loop. :param fps: @@ -250,7 +243,7 @@ def main( ) logging.info(f"simulation time: {simulation.iteration_count()*_time_step} [s]") # logging.info(f"Trajectory: {_trajectory_path}") - return simulation.iteration_count() * _time_step + return float(simulation.iteration_count() * _time_step) if __name__ == "__main__": diff --git a/src/analysis.py b/src/analysis.py index ab750d0..be51e81 100644 --- a/src/analysis.py +++ b/src/analysis.py @@ -1,39 +1,26 @@ import glob -import numpy as np -import numpy.typing as npt -from typing import Any -import pedpy - -import streamlit as st -from .utilities import ( - create_empty_figure, - update_figure_layout, - calculate_heatmap_values, - add_heatmap_trace, - add_polygon_traces, - customize_fig_layout, -) import json from pathlib import Path -from .inifile_parser import parse_fps, parse_accessible_areas, parse_geometry -from shapely import Polygon -from shapely.ops import unary_union +from typing import Any + import matplotlib.pyplot as plt -from pedpy.column_identifier import ( - CUMULATED_COL, - DENSITY_COL, - FRAME_COL, - ID_COL, -) +import numpy as np +import numpy.typing as npt import pedpy -from .plotting import ( - plot_density_time_series, - plotly_nt_series, - plot_flow_time_series, - plot_speed_time_series, -) -from .ui import ui_measurement_parameters +import streamlit as st from jupedsim.internal.notebook_utils import read_sqlite_file +from pedpy.column_identifier import FRAME_COL, ID_COL + +from .inifile_parser import parse_fps +from .plotting import (plot_density_time_series, plot_flow_time_series, + plot_speed_time_series, plotly_nt_series) +from .ui import ui_measurement_parameters +from .utilities import (add_heatmap_trace, add_polygon_traces, + calculate_heatmap_values, create_empty_figure, + customize_fig_layout, update_figure_layout) + +# from shapely import Polygon +# from shapely.ops import unary_union def generate_heatmap( @@ -67,7 +54,7 @@ def generate_heatmap( st.plotly_chart(fig) -def run(): +def run() -> None: selected = st.sidebar.radio( "Choose option", [ diff --git a/src/inifile_parser.py b/src/inifile_parser.py index 3b6666e..6d2783c 100644 --- a/src/inifile_parser.py +++ b/src/inifile_parser.py @@ -6,12 +6,12 @@ import json import sys -from typing import Dict, List, Optional, Tuple, TypeAlias, Any +from typing import Any, Dict, List, Optional, Tuple, TypeAlias import jsonschema import shapely +from shapely import GeometryCollection, Polygon from shapely.ops import unary_union -from shapely import Polygon, GeometryCollection Point: TypeAlias = Tuple[float, float] @@ -124,7 +124,7 @@ def parse_distribution_polygons( return _distribution_polygons -def parse_measurement_line(json_data: Dict[str, Any]) -> Dict[int, List[List[float]]]: +def parse_measurement_line(json_data: Dict[str, Any]) -> List[Point]: """Measurement line has two points.""" _points: List[Point] = [] @@ -291,21 +291,21 @@ def parse_motivation_strategy(json_data: Dict[str, Any]) -> str: "motivation_parameters" in json_data and "motivation_strategy" in json_data["motivation_parameters"] ): - return json_data["motivation_parameters"]["motivation_strategy"] + return str(json_data["motivation_parameters"]["motivation_strategy"]) return "default" -def parse_motivation_parameter(json_data: Dict[str, Any], parameter: str) -> str: +def parse_motivation_parameter(json_data: Dict[str, Any], parameter: str) -> int: """Get motivation parameter.""" if ( "motivation_parameters" in json_data and parameter in json_data["motivation_parameters"] ): - return float(json_data["motivation_parameters"][parameter]) + return int(json_data["motivation_parameters"][parameter]) - return 1.0 + return 1 def is_motivation_active(json_data: Dict[str, Any]) -> int: diff --git a/src/json2xml.py b/src/json2xml.py index 2d5dfe0..60e755e 100644 --- a/src/json2xml.py +++ b/src/json2xml.py @@ -6,7 +6,8 @@ - jpsvis_doors - accessible_areas: Make a union on all walls to get a nice polygon - """ +""" + import json import sys import xml.etree.ElementTree as ET @@ -26,7 +27,7 @@ def add_transitions(_root: ET.Element, _data: Dict[str, Any]) -> None: :returns: """ - if not "jpsvis_doors" in _data: + if "jpsvis_doors" not in _data: return destinations = _data["jpsvis_doors"] diff --git a/src/logger_config.py b/src/logger_config.py index 80d7dfa..ff38354 100644 --- a/src/logger_config.py +++ b/src/logger_config.py @@ -3,6 +3,7 @@ """ import logging + import jupedsim as jps # # Create a handler that writes INFO and DEBUG messages to stdout diff --git a/src/motivation_model.py b/src/motivation_model.py index 0ad892e..728b676 100644 --- a/src/motivation_model.py +++ b/src/motivation_model.py @@ -1,18 +1,31 @@ """Module for motivational model.""" import random +from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Optional, Tuple, TypeAlias +from typing import Any, List, Optional, Tuple, TypeAlias + import matplotlib.pyplot as plt import numpy as np +from matplotlib.figure import Figure from .logger_config import log_debug Point: TypeAlias = Tuple[float, float] +class MotivationStrategy(ABC): + @abstractmethod + def motivation(self, params: dict[str, Any]) -> float: + pass + + # abstractmethod + def plot(self) -> List[Figure]: + pass + + @dataclass -class DefaultMotivationStrategy: +class DefaultMotivationStrategy(MotivationStrategy): """Default strategy for motivation calculation based on distance.""" width: float = 1.0 @@ -35,7 +48,7 @@ def motivation(self, params: dict[str, Any]) -> float: return float(np.exp(expr) * np.e * self.height) - def plot(self): + def plot(self) -> List[Figure]: fig = plt.figure() distances = np.linspace(0, 10, 100) m = [] @@ -53,15 +66,15 @@ def plot(self): @dataclass -class EVCStrategy: +class EVCStrategy(MotivationStrategy): """Motivation theory based on E.V.C (model4).""" width: float = 1.0 height: float = 1.0 max_reward: int = 0 seed: int = 0 - min_value: int = 0 - max_value: int = 1 + min_value: float = 0 + max_value: float = 1 @staticmethod def name() -> str: @@ -87,14 +100,14 @@ def competition(got_reward: int, max_reward: int) -> float: got_reward: How many got reward max_reward: hom many max can get reward """ - comp = 0 + comp = 0.0 if got_reward <= max_reward: comp = 1 - got_reward / max_reward return comp @staticmethod - def value(min_v: float, max_v: float, seed: Optional[float] = None): + def value(min_v: float, max_v: float, seed: Optional[float] = None) -> float: """Random value in interval. seed is optional.""" if seed is not None: random.seed(seed) @@ -108,7 +121,7 @@ def motivation(self, params: dict[str, Any]) -> float: got_reward = self.max_reward - number_agents_in_simulation if "seed" not in params: params["seed"] = None - return ( + return float( EVCStrategy.value(self.min_value, self.max_value, params["seed"]) * EVCStrategy.competition(got_reward, self.max_reward) * EVCStrategy.expectancy( @@ -118,7 +131,7 @@ def motivation(self, params: dict[str, Any]) -> float: ) ) - def plot(self): + def plot(self) -> List[Figure]: """Plot functions for inspection.""" fig0, ax0 = plt.subplots(ncols=1, nrows=1) fig1, ax1 = plt.subplots(ncols=1, nrows=1) @@ -183,7 +196,7 @@ def plot(self): ax3.set_xlabel("Distance / m") ax3.set_ylabel("Motivation") - return fig0, fig1, fig2, fig3 + return [fig0, fig1, fig2, fig3] @dataclass diff --git a/src/plotting.py b/src/plotting.py index 4dbf9bc..4969213 100644 --- a/src/plotting.py +++ b/src/plotting.py @@ -1,12 +1,14 @@ -import streamlit as st -from pedpy.column_identifier import DENSITY_COL, TIME_COL, CUMULATED_COL -import plotly.graph_objects as go +import json +from typing import Dict, List, Tuple + import pandas as pd -from typing import Tuple, List, Dict -from plotly.graph_objs import Figure import pedpy import plotly.express as px -import json +import plotly.graph_objects as go +import streamlit as st +from pedpy.column_identifier import CUMULATED_COL, DENSITY_COL, TIME_COL +from plotly.graph_objs import Figure + from .inifile_parser import parse_accessible_areas diff --git a/src/ui.py b/src/ui.py index f4f932f..df4f304 100644 --- a/src/ui.py +++ b/src/ui.py @@ -112,13 +112,6 @@ def ui_simulation_parameters(data: Dict[str, Any]) -> None: min_value=50, max_value=300, ) - data["motivation_parameters"]["seed"] = st.number_input( - "Seed", - key="seed", - step=1.0, - value=float(data["motivation_parameters"]["seed"]), - help="Seed for random generator for value", - ) def ui_motivation_parameters(data: Dict[str, Any]) -> None: @@ -176,6 +169,13 @@ def ui_motivation_parameters(data: Dict[str, Any]) -> None: step=0.1, value=float(data["motivation_parameters"]["normal_time_gap"]), ) + data["motivation_parameters"]["seed"] = st.number_input( + "Seed", + key="seed", + step=1.0, + value=float(data["motivation_parameters"]["seed"]), + help="Seed for random generator for value", + ) st.sidebar.write("**At this line the motivation is maximal**") with st.sidebar.expander(label="Motivation line"): diff --git a/src/utilities.py b/src/utilities.py index a8a7af9..dfb5b9b 100644 --- a/src/utilities.py +++ b/src/utilities.py @@ -1,32 +1,32 @@ """Some functions to setup the simulation.""" import glob -import os -import plotly.graph_objects as go -from plotly.graph_objs import Figure - -from typing import Dict, List, Tuple, TypeAlias, Any import json -import streamlit as st +import os from pathlib import Path +from typing import Any, Dict, List, Tuple, TypeAlias + import jupedsim as jps -from shapely import GeometryCollection, Polygon -from shapely.ops import unary_union -import numpy.typing as npt import numpy as np -from scipy import stats -from src.inifile_parser import parse_accessible_areas -from .logger_config import log_info, log_error +import numpy.typing as npt import pedpy +import plotly.graph_objects as go +import streamlit as st +from plotly.graph_objs import Figure +from scipy import stats +from shapely import GeometryCollection, Polygon +from shapely.ops import unary_union + +from .logger_config import log_error, log_info Point: TypeAlias = Tuple[float, float] -def delete_txt_files(): +def delete_txt_files() -> None: """Delete all *.sqlite files in the current directory.""" files = glob.glob("files/*.sqlite") if not files: - st.toast(f"No trajectories to delete!", icon="💿") + st.toast("No trajectories to delete!", icon="💿") for file in files: st.toast(f"Delete {file}", icon="💿") try: