diff --git a/examples/city_walking_behaviour/Readme.md b/examples/city_walking_behaviour/Readme.md new file mode 100644 index 00000000..1f468969 --- /dev/null +++ b/examples/city_walking_behaviour/Readme.md @@ -0,0 +1,105 @@ +# Walking Behavior Agent-Based Model + +An agent-based model (ABM) that simulates how people walk in cities based on socioeconomic status, environment, and social factors. + +## What This Model Shows + +This simulation demonstrates how different city layouts and social factors affect walking patterns. Key insights include: + +- How safety perceptions influence route choices +- Impact of socioeconomic status on walking frequency +- Effects of centralized vs distributed city planning +- Relationship between social networks and walking behavior + +## Features + +- Four simulation scenarios: + + - Random layout with random safety (RR) + - Random layout with core safety gradient (RS) + - Centralized layout with random safety (CR) + - Centralized layout with core safety gradient (CS) + +- Agent behaviors include: + - Work commutes + - Shopping trips + - Social visits + - Leisure walks + +## Outcomes in different scenarios + +#### RR +![image](https://github.com/user-attachments/assets/fddda37d-d0d5-40f7-9e6d-89fa9b37898a) + +#### RS +![image](https://github.com/user-attachments/assets/67112dbb-056d-4bd5-b668-a1bb389c262d) + +#### CR +![image](https://github.com/user-attachments/assets/aee37477-7693-46a8-848e-5a5986db0e95) + +#### CS +![image](https://github.com/user-attachments/assets/f9e58239-19a2-4fa4-bbea-2b7ccbe36c8f) + +## Quick Start + +1. Run the simulation: + +```python +solara run app.py +``` + +## Technical Details + +### Model Architecture (`model.py`) + +#### Initialization Parameters + +- Grid dimensions (width and height) +- Number of workplaces (categorized into Grocery Stores, Social Places, etc.) +- Population composition (number of couples and singles) + +#### Environmental Layers + +1. **Safety Layer** (`safety_cell_layer`) + + - Values vary based on selected scenario + - Impacts walking behavior and route choices + +2. **Aesthetic Layer** (`aesthetic_cell_layer`) + - Values decrease with distance from center + - Reflects personal preferences in route selection + +### Agent Implementation (`agents.py`) + +#### Human Class Attributes + +- **Demographics**: Gender, Age (18-87), Family Size (1-2), Pet Ownership (20% probability) +- **Personal**: Walking Ability, Walking Attitude, Employment Status +- **Social**: Network of friends and family members + +#### Behavioral System + +Walking attitude influenced by: + +- Social network (family and friends' attitudes) +- Environmental factors (safety and aesthetics) +- Local pedestrian density +- Cumulative walking distance + +### Data Collection + +Tracks metrics across five SES levels (1-5): + +1. Average daily walking trips +2. Work-related trips +3. Basic needs trips +4. Leisure trips + +## File Structure + +- `city_walking_behaviour/model.py`: Core simulation engine +- `city_walking_behaviour/agents.py`: Agent behavior definitions + +## Based On + +This model extends research from ["A Spatial Agent-Based Model for the Simulation of Adults' Daily Walking Within a City"](https://pmc.ncbi.nlm.nih.gov/articles/PMC3306662/) diff --git a/examples/city_walking_behaviour/app.py b/examples/city_walking_behaviour/app.py new file mode 100644 index 00000000..925f8cd2 --- /dev/null +++ b/examples/city_walking_behaviour/app.py @@ -0,0 +1,347 @@ +from city_walking_behaviour.agents import ( + GroceryStore, + Human, + NonFoodShop, + Other, + SocialPlace, +) +from city_walking_behaviour.model import WalkingModel +from mesa.visualization import ( + SolaraViz, + make_plot_component, + make_space_component, +) + +# Define the scenarios +SCENARIOS = [ + ("random_random", "Random Land Use, Random Safety"), + ("random_safe", "Random Land Use, Low Safety in Core"), + ("central_random", "Centralized Land Use, Random Safety"), + ("central_safe", "Centralized Land Use, Low Safety in Core"), +] + + +def agent_portrayal(agent): + """Determine visual portrayal details for each agent.""" + if agent is None: + return + + portrayal = { + "size": 25, + } + + if isinstance(agent, GroceryStore): + portrayal["color"] = "tab:green" + portrayal["marker"] = "s" + portrayal["zorder"] = 2 + elif isinstance(agent, SocialPlace): + portrayal["color"] = "tab:purple" + portrayal["marker"] = "s" + portrayal["zorder"] = 2 + elif isinstance(agent, NonFoodShop): + portrayal["color"] = "tab:olive" + portrayal["marker"] = "s" + portrayal["zorder"] = 2 + elif isinstance(agent, Other): + portrayal["color"] = "tab:brown" + portrayal["marker"] = "s" + portrayal["zorder"] = 2 + elif isinstance(agent, Human): + portrayal["color"] = "tab:red" + portrayal["marker"] = "v" + portrayal["zorder"] = 2 + + return portrayal + + +model_params = { + "width": 40, + "height": 40, + "seed": { + "type": "InputText", + "value": 42, + "label": "Random Seed", + }, + "no_of_couples": { + "type": "SliderInt", + "value": 2400, + "label": "Number of Couples:", + "min": 2000, + "max": 3000, + "step": 100, + }, + "no_of_singles": { + "type": "SliderInt", + "value": 600, + "label": "Number of Singles:", + "min": 300, + "max": 1000, + "step": 20, + }, + "no_of_grocery_stores": { + "type": "SliderInt", + "value": 10, + "label": "Number of Grocery Stores:", + "min": 5, + "max": 20, + "step": 1, + }, + "no_of_social_places": { + "type": "SliderInt", + "value": 75, + "label": "Number of Social Places:", + "min": 50, + "max": 90, + "step": 1, + }, + "no_of_non_food_shops": { + "type": "SliderInt", + "value": 40, + "label": "Number of Non-Food Shops:", + "min": 25, + "max": 55, + "step": 1, + }, + "no_of_others": { + "type": "SliderInt", + "value": 475, + "label": "Number of Other Places:", + "min": 405, + "max": 600, + "step": 1, + }, + "scenario": { + "type": "Select", + "value": "random_random", + "label": "Scenario", + "values": [s[0] for s in SCENARIOS], + }, +} + + +def post_process_space(ax): + """Ensure consistent scaling for visual grid.""" + ax.set_aspect("equal") + ax.set_xticks([]) + ax.set_yticks([]) + ax.get_figure().set_size_inches(10, 10) + + +def post_process_lines_walk(ax): + """Configure the average walking trips plot.""" + handles, labels = ax.get_legend_handles_labels() + new_labels = [] + for label in labels: + if label == "avg_walk_ses1": + new_labels.append("SES 1") + elif label == "avg_walk_ses2": + new_labels.append("SES 2") + elif label == "avg_walk_ses3": + new_labels.append("SES 3") + elif label == "avg_walk_ses4": + new_labels.append("SES 4") + elif label == "avg_walk_ses5": + new_labels.append("SES 5") + else: + new_labels.append(label) + ax.legend(handles, new_labels, loc="center left", bbox_to_anchor=(1, 0.9)) + ax.set_ylabel("Average Walking Trips", fontsize=12, fontweight="normal") + + +def post_process_lines_work(ax): + """Configure the work trips plot.""" + handles, labels = ax.get_legend_handles_labels() + new_labels = [] + for label in labels: + if label == "avg_work_ses1": + new_labels.append("SES 1") + elif label == "avg_work_ses2": + new_labels.append("SES 2") + elif label == "avg_work_ses3": + new_labels.append("SES 3") + elif label == "avg_work_ses4": + new_labels.append("SES 4") + elif label == "avg_work_ses5": + new_labels.append("SES 5") + else: + new_labels.append(label) + ax.legend(handles, new_labels, loc="center left", bbox_to_anchor=(1, 0.9)) + ax.set_ylabel("Average Work Trips", fontsize=12, fontweight="normal") + + +def post_process_lines_basic(ax): + """Configure the basic needs trips plot.""" + handles, labels = ax.get_legend_handles_labels() + new_labels = [] + for label in labels: + if label == "avg_basic_ses1": + new_labels.append("SES 1") + elif label == "avg_basic_ses2": + new_labels.append("SES 2") + elif label == "avg_basic_ses3": + new_labels.append("SES 3") + elif label == "avg_basic_ses4": + new_labels.append("SES 4") + elif label == "avg_basic_ses5": + new_labels.append("SES 5") + else: + new_labels.append(label) + ax.legend(handles, new_labels, loc="center left", bbox_to_anchor=(1, 0.9)) + ax.set_ylabel("Average Trips for Basic Needs", fontsize=12, fontweight="normal") + + +def post_process_lines_leisure(ax): + """Configure the leisure trips plot.""" + handles, labels = ax.get_legend_handles_labels() + new_labels = [] + for label in labels: + if label == "avg_leisure_ses1": + new_labels.append("SES 1") + elif label == "avg_leisure_ses2": + new_labels.append("SES 2") + elif label == "avg_leisure_ses3": + new_labels.append("SES 3") + elif label == "avg_leisure_ses4": + new_labels.append("SES 4") + elif label == "avg_leisure_ses5": + new_labels.append("SES 5") + else: + new_labels.append(label) + ax.legend(handles, new_labels, loc="center left", bbox_to_anchor=(1, 0.9)) + ax.set_ylabel("Average Trips for Leisure", fontsize=12, fontweight="normal") + + +def post_process_buildings_legend(ax): + import matplotlib.lines as mlines + + # Create legend entries for each building/agent + grocery_store = mlines.Line2D( + [], + [], + color="tab:green", + marker="s", + linestyle="None", + markersize=10, + label="Grocery Store", + ) + social_place = mlines.Line2D( + [], + [], + color="tab:purple", + marker="s", + linestyle="None", + markersize=10, + label="Social Place", + ) + non_food_shop = mlines.Line2D( + [], + [], + color="tab:olive", + marker="s", + linestyle="None", + markersize=10, + label="Non-Food Shop", + ) + other_building = mlines.Line2D( + [], + [], + color="tab:brown", + marker="s", + linestyle="None", + markersize=10, + label="Other", + ) + human = mlines.Line2D( + [], + [], + color="tab:red", + marker="v", + linestyle="None", + markersize=10, + label="Human", + ) + + ax.legend( + handles=[grocery_store, social_place, non_food_shop, other_building, human], + loc="center", + bbox_to_anchor=(0.5, 0.5), + ) + ax.axis("off") + + +space_component = make_space_component( + agent_portrayal, draw_grid=True, post_process=post_process_space +) + +plot_component_a = make_plot_component( + { + "avg_walk_ses1": "tab:red", + "avg_walk_ses2": "tab:blue", + "avg_walk_ses3": "tab:green", + "avg_walk_ses4": "tab:purple", + "avg_walk_ses5": "tab:cyan", + }, + post_process=post_process_lines_walk, +) + +plot_component_b = make_plot_component( + { + "avg_work_ses1": "tab:red", + "avg_work_ses2": "tab:blue", + "avg_work_ses3": "tab:green", + "avg_work_ses4": "tab:purple", + "avg_work_ses5": "tab:cyan", + }, + post_process=post_process_lines_work, +) + +plot_component_c = make_plot_component( + { + "avg_basic_ses1": "tab:red", + "avg_basic_ses2": "tab:blue", + "avg_basic_ses3": "tab:green", + "avg_basic_ses4": "tab:purple", + "avg_basic_ses5": "tab:cyan", + }, + post_process=post_process_lines_basic, +) + +plot_component_d = make_plot_component( + { + "avg_leisure_ses1": "tab:red", + "avg_leisure_ses2": "tab:blue", + "avg_leisure_ses3": "tab:green", + "avg_leisure_ses4": "tab:purple", + "avg_leisure_ses5": "tab:cyan", + }, + post_process=post_process_lines_leisure, +) + +plot_component_legend = make_plot_component( + {}, post_process=post_process_buildings_legend +) + +# Initialize and run the model +model = WalkingModel() + +# server = mesa.visualization( +# WalkingModel, +# [space_component, plot_component_legend, plot_component_a, plot_component_b, plot_component_c, plot_component_d], +# "Walking Model", +# model_params, +# ) + +page = SolaraViz( + model, + components=[ + space_component, + plot_component_legend, + plot_component_a, + plot_component_b, + plot_component_c, + plot_component_d, + ], + model_params=model_params, + name="Walking Model", +) +page # noqa diff --git a/examples/city_walking_behaviour/city_walking_behaviour/__init__.py b/examples/city_walking_behaviour/city_walking_behaviour/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/city_walking_behaviour/city_walking_behaviour/agents.py b/examples/city_walking_behaviour/city_walking_behaviour/agents.py new file mode 100644 index 00000000..1209ea4a --- /dev/null +++ b/examples/city_walking_behaviour/city_walking_behaviour/agents.py @@ -0,0 +1,532 @@ +import math +from collections import defaultdict +from enum import Enum +from functools import lru_cache +from typing import List, Optional, Tuple + +import numpy as np +from mesa import Model +from mesa.agent import AgentSet +from mesa.experimental.cell_space import Cell, CellAgent, FixedAgent + +# Constants for probability values +FEMALE_PROBABILITY = 0.5 +DOG_OWNER_PROBABILITY = 0.2 +SINGLE_HOUSEHOLD_PROBABILITY = 0.2 +MIN_AGE = 18 +MAX_AGE = 87 +MIN_FRIENDS = 3 +MAX_FRIENDS = 5 +WORKING_PROBABILITY = 0.95 +RETIREMENT_AGE = 69 + + +class ActivityType(Enum): + WORK = "work" + GROCERY = "grocery" + NON_FOOD_SHOPPING = "shopping" + SOCIAL = "social" + LEISURE = "leisure" + + +class Workplace: + """Generic workplace base class.""" + + def __init__(self, store_type: str): + self.store_type = store_type + + +class GroceryStore(Workplace, FixedAgent): + def __init__(self, model: Model, cell=None): + Workplace.__init__(self, store_type="Grocery Store") + FixedAgent.__init__(self, model) + self.cell = cell + + +class NonFoodShop(Workplace, FixedAgent): + def __init__(self, model: Model, cell=None): + Workplace.__init__(self, store_type="Non-Food Shop") + FixedAgent.__init__(self, model) + self.cell = cell + + +class SocialPlace(Workplace, FixedAgent): + def __init__(self, model: Model, cell=None): + Workplace.__init__(self, store_type="Social Place") + FixedAgent.__init__(self, model) + self.cell = cell + + +class Other(Workplace, FixedAgent): + def __init__(self, model: Model, cell=None): + Workplace.__init__(self, store_type="Other") + FixedAgent.__init__(self, model) + self.cell = cell + + +class DailyWalkingBehaviour: + """Optimized walking behavior model with spatial caching and early termination.""" + + MILES_TO_METERS = 1609.34 + DAILY_PROBABILITIES = { + ActivityType.GROCERY: 0.4, + ActivityType.NON_FOOD_SHOPPING: 0.25, # once every 4 days + ActivityType.SOCIAL: 0.20, + ActivityType.LEISURE: 0.33, + } + + BASE_MAX_DISTANCES = { + ActivityType.WORK: 1.125 * MILES_TO_METERS, # meters + ActivityType.GROCERY: 2.000 * MILES_TO_METERS, + ActivityType.NON_FOOD_SHOPPING: 1.500 * MILES_TO_METERS, + ActivityType.SOCIAL: 2.500 * MILES_TO_METERS, + ActivityType.LEISURE: 5.500 * MILES_TO_METERS, + } + + def __init__(self, model: Model): + self.model = model + self.total_distance_walked = 0 + # Spatial index for quick location lookup + self._location_cache = {} + self._distance_cache = {} + # Maximum possible walking distance for any activity + self._max_possible_distance = max(self.BASE_MAX_DISTANCES.values()) + + def reset_daily_distance(self) -> None: + """Reset daily walking distance.""" + self.total_distance_walked = 0 + + def add_distance(self, distance: float) -> None: + """Add to total daily walking distance.""" + self.total_distance_walked += distance + + @lru_cache(maxsize=1024) # noqa + def get_max_walking_distance(self, ability: float, activity: ActivityType) -> float: + """Cached calculation of max walking distance based on ability.""" + return self.BASE_MAX_DISTANCES[activity] * ability + + @staticmethod + @lru_cache(maxsize=4096) + def calculate_distance(x1: int, y1: int, x2: int, y2: int) -> float: + """Cached distance calculation between two points.""" + dx = x2 - x1 + dy = y2 - y1 + return math.sqrt(dx * dx + dy * dy) + + def get_distance(self, cell1, cell2) -> float: + """Get distance between cells using cache.""" + key = (cell1, cell2) + if key not in self._distance_cache: + x1, y1 = cell1.coordinate + x2, y2 = cell2.coordinate + self._distance_cache[key] = self.calculate_distance(x1, y1, x2, y2) + return self._distance_cache[key] + + def decide_walk_to_work(self, human) -> bool: + """Optimized work walk decision.""" + if not (human.is_working and human.workplace): + return False + + distance = self.get_distance(human.household, human.workplace.cell) + max_distance = self.get_max_walking_distance( + human.walking_ability, ActivityType.WORK + ) + + return ( + distance <= max_distance + and self.model.random.random() <= human.walking_attitude + ) + + def _build_location_cache(self, activity_type: ActivityType) -> None: + """Build spatial index for locations of given type.""" + if activity_type not in self._location_cache: + locations = defaultdict(list) + if activity_type == ActivityType.GROCERY: + agents = self.model.agents_by_type[GroceryStore] + elif activity_type == ActivityType.NON_FOOD_SHOPPING: + agents = self.model.agents_by_type[NonFoodShop] + elif activity_type == ActivityType.SOCIAL: + agents = self.model.agents_by_type[SocialPlace] + else: + return + + # Group locations by grid sectors for faster lookup + sector_size = int(self._max_possible_distance) + for agent in agents: + x, y = agent.cell.coordinate + sector_x = x // sector_size + sector_y = y // sector_size + locations[(sector_x, sector_y)].append(agent) + + self._location_cache[activity_type] = locations + + def find_walkable_locations(self, human, activity_type: ActivityType) -> List: + """Find walkable locations using spatial indexing.""" + self._build_location_cache(activity_type) + max_distance = self.get_max_walking_distance( + human.walking_ability, activity_type + ) + + walkable = [] + locations = self._location_cache.get(activity_type, {}) + + # Helper function to check locations near a reference point + def check_near_point(ref_point): + x, y = ref_point.coordinate + sector_size = int(self._max_possible_distance) + sector_x = x // sector_size + sector_y = y // sector_size + + # Check nearby sectors only + for dx in (-1, 0, 1): + for dy in (-1, 0, 1): + sector = (sector_x + dx, sector_y + dy) + for location in locations.get(sector, []): + if self.get_distance(ref_point, location.cell) <= max_distance: + walkable.append(location) + return bool(walkable) + + # Check household first + if check_near_point(human.household): + return walkable + + # If no locations found and person is working, check workplace + if human.is_working and human.workplace: + check_near_point(human.workplace.cell) + + return walkable + + def get_leisure_cells(self, human) -> List[Cell]: + """ + Get valid leisure walk destinations. + """ + if not human or not human.household: + return [] + + # Calculate distances based on walking ability + max_distance = self.get_max_walking_distance( + human.walking_ability, ActivityType.LEISURE + ) + # Set minimum distance to 75% of max distance + min_distance = max_distance * 0.75 + + household_x, household_y = human.household.coordinate + valid_cells = [] + + for cell in self.model.grid.all_cells.cells: + x, y = cell.coordinate + + # Quick boundary check + if ( + abs(x - household_x) > max_distance + or abs(y - household_y) > max_distance + ): + continue + + # Calculate exact distance + dist = self.calculate_distance(household_x, household_y, x, y) + if min_distance <= dist <= max_distance: + valid_cells.append(cell) + + if len(valid_cells) >= 200: + return valid_cells + + return valid_cells + + def decide_leisure_walk(self, human) -> Optional[Cell]: + """ + Leisure walk decision making. + """ + base_probability = self.DAILY_PROBABILITIES[ActivityType.LEISURE] + + # Consider additional factors that might encourage walking + motivation_factors = 1.0 + if human.has_dog: # Dog owners are more likely to take leisure walks + motivation_factors += 0.3 + if not human.is_working: # Non-working individuals have more time + motivation_factors += 0.2 + + # Final probability calculation + probability = base_probability * human.walking_attitude * motivation_factors + + if self.model.random.random() > probability: + return None + + valid_cells = self.get_leisure_cells(human) + if not valid_cells: + return None + + return self.model.random.choice(valid_cells) + + def simulate_daily_walks(self, human) -> List[Tuple]: + """Optimized daily walk simulation.""" + walks = [] + random = self.model.random.random + + # Work walk + if self.decide_walk_to_work(human): + distance = self.get_distance(human.household, human.workplace.cell) + self.add_distance(distance) + walks.append((ActivityType.WORK, human.workplace)) + + # Basic needs walks + for activity in (ActivityType.GROCERY, ActivityType.NON_FOOD_SHOPPING): + if random() <= self.DAILY_PROBABILITIES[activity]: + walkable = self.find_walkable_locations(human, activity) + if walkable and random() <= human.walking_attitude: + destination = self.model.random.choice(walkable) + distance = self.get_distance(human.household, destination.cell) + self.add_distance(distance) + walks.append((activity, destination)) + + # Social visit + if random() <= self.DAILY_PROBABILITIES[ActivityType.SOCIAL]: + social_place = self.model.random.choice( + self.model.agents_by_type[SocialPlace] + ) + max_distance = self.get_max_walking_distance( + human.walking_ability, ActivityType.SOCIAL + ) + + household_distance = self.get_distance(human.household, social_place.cell) + workplace_distance = float("inf") + if human.is_working and human.workplace: + workplace_distance = self.get_distance( + human.workplace.cell, social_place.cell + ) + + min_distance = min(household_distance, workplace_distance) + if min_distance <= max_distance and random() <= human.walking_attitude: + self.add_distance(min_distance) + walks.append((ActivityType.SOCIAL, social_place)) + + # Leisure walk + leisure_destination = self.decide_leisure_walk(human) + if leisure_destination: + distance = self.get_distance(human.household, leisure_destination) + self.add_distance(distance) + walks.append((ActivityType.LEISURE, leisure_destination)) + + return walks + + def __repr__(self) -> str: + """ + Return a detailed string representation of the DailyWalkingBehaviour. + + Returns: + str: String showing model state including caches and distances + """ + cache_stats = { + "location_cache_size": sum( + len(locations) for locations in self._location_cache.values() + ), + "distance_cache_size": len(self._distance_cache), + "leisure_cache_size": len(getattr(self, "_leisure_cells_cache", {})), + } + + return ( + f"DailyWalkingBehaviour(" + f"total_distance_walked={self.total_distance_walked:.2f}, " + f"max_possible_distance={self._max_possible_distance}, " + f"cache_sizes={cache_stats}, " + f"daily_probabilities={len(self.DAILY_PROBABILITIES)} activities)" + ) + + +class Human(CellAgent): + """Represents a person with specific attributes and daily walking behavior.""" + + def __init__( + self, + model: Model, + gender: Optional[int] = None, + family_size: Optional[int] = None, + age: Optional[int] = None, + SES: Optional[int] = None, + unique_id: int = 0, + cell=None, + household: Cell = None, + ): + super().__init__(model) + self.cell = cell + self.unique_id = unique_id + self.household = household + + # Human Attributes + self.gender = gender + self.age = age + self.SES = SES + self.family_size = family_size + self.has_dog = self.model.generate_dog_ownership() + self.walking_ability = self.get_walking_ability() + self.walking_attitude = self.get_walking_attitude() + self.is_working = self._determine_working_status() + self.workplace = self.get_workplace() + self.friends = self.get_friends() + self.family: Human = None + + self.previous_walking_density: float = 0 + + # Datacollector attributes + self.walk_daily_trips: int = 0 + self.work_trips: int = 0 + self.basic_needs_trips: int = 0 + self.leisure_trips: int = 0 + + # Initialize walking behavior + self.walking_behavior = DailyWalkingBehaviour(model) + + def _determine_working_status(self) -> bool: + if self.age >= RETIREMENT_AGE: + return False + return self.random.random() < WORKING_PROBABILITY + + def reset_daily_trips(self): + self.walk_daily_trips = 0 + self.basic_needs_trips = 0 + self.leisure_trips = 0 + self.work_trips = 0 + + def get_friends(self) -> AgentSet: + friend_count = self.random.randint(MIN_FRIENDS, MAX_FRIENDS) + friend_set = AgentSet.select( + self.model.agents_by_type[Human], + lambda x: ( + x.SES > self.SES - 2 and x.SES < self.SES + 2 + ) # get friends with similar SES i.e. difference no more than 3 + and x.unique_id != self.unique_id, + at_most=friend_count, + ) + if len(friend_set) > 0: + for friend in friend_set: + friend.friends.add(self) # add self to the friends list as well + return friend_set + + def get_workplace(self) -> Optional[Workplace | FixedAgent]: + if not self.is_working: + return None + + # Get all workplaces like grocery stores, non-food shops, social places + all_workplaces = [ + workplace + for workplace in self.model.agents + if not isinstance(workplace, Human) + ] + return self.random.choice(all_workplaces) + + def get_walking_ability( + self, + ) -> float: # Method from https://pmc.ncbi.nlm.nih.gov/articles/PMC3306662/ + random_component = self.random.random() ** 4 + if self.age <= 37: + # For ages up to 37, use the base calculation + return random_component * (min(abs(137 - self.age), 100) / 100) + else: + # For ages over 37, apply linear decrease + base_ability = random_component * (min(abs(137 - self.age), 100) / 100) + age_factor = (self.age - 37) / 50 # Linear decrease factor + return base_ability * (1 - age_factor) + + def get_walking_attitude( + self, + ) -> float: # Method from https://pmc.ncbi.nlm.nih.gov/articles/PMC3306662/ + return self.random.random() ** 3 + + def get_feedback(self): + a: float = 0.001 # attitude factor + + # 1. Social network feedback (family and friends) + # Store original attitude for use in calculations + At = self.walking_attitude + + # Family feedback (Equations 1 & 2 in literature) + if self.family: + self.walking_attitude = (1 - a) * At + a * self.family.walking_attitude + + # Friends feedback (Equation 3 in literature) + if self.friends: + friends_attitude = sum(friend.walking_attitude for friend in self.friends) + if len(self.friends) > 0: + friends_attitude /= len(self.friends) + self.walking_attitude = (1 - a) * At + a * friends_attitude + + # 2. Walking experience feedback (Equation 4 in literature) + x, y = self.cell.coordinate + SE_index = ( + ( + self.model.safety_cell_layer.data[x][y] + + self.model.random.uniform(-0.5, 0.5) + ) + * ( + self.model.aesthetic_cell_layer.data[x][y] + + self.model.random.uniform(-0.5, 0.5) + ) + ) / np.mean( + self.model.safety_cell_layer.data * self.model.aesthetic_cell_layer.data + ) + + # 3. Density feedback + # Compare current walking density to previous day + neighbour_cells = self.cell.get_neighborhood(radius=1) + current_density = sum(len(cell.agents) for cell in neighbour_cells) / len( + neighbour_cells + ) + + Id = 0 + if self.previous_walking_density > 0: + Id = current_density / self.previous_walking_density + else: + Id = 1 if current_density > 0 else 0 + + self.previous_walking_density = current_density + + # 4. Walking distance feedback (Equation 5 in literature) + It = 0 + if self.walking_behavior.total_distance_walked > 0: + Ab_Da = sum( + [ + dis * self.walking_ability + for dis in self.walking_behavior.BASE_MAX_DISTANCES.values() + ] + ) + d = self.walking_behavior.total_distance_walked + It = min(1, Ab_Da / d) + + # Final attitude update (Equation 6 in literature) + self.walking_attitude = ( + At * (1 - a + a * SE_index) * (1 - a + a * Id) * (1 - a + a * It) + ) + + def step(self): + """Execute one simulation step: decide on daily walks, update feedback.""" + daily_walks = self.walking_behavior.simulate_daily_walks(self) + + # Update datacollector attributes + self.walk_daily_trips = len(daily_walks) + self.work_trips = sum( + [1 for activity, _ in daily_walks if activity == ActivityType.WORK] + ) + self.basic_needs_trips = sum( + [ + 1 + for activity, _ in daily_walks + if activity + in [ + ActivityType.GROCERY, + ActivityType.NON_FOOD_SHOPPING, + ActivityType.SOCIAL, + ] + ] + ) + self.leisure_trips = sum( + [1 for activity, _ in daily_walks if activity == ActivityType.LEISURE] + ) + + if len(daily_walks) > 0: + self.get_feedback() + for activity, destination in daily_walks: + # Move agent to new cell if applicable + if isinstance(destination, FixedAgent): + self.cell = destination.cell + elif isinstance(destination, Cell): + self.cell = destination diff --git a/examples/city_walking_behaviour/city_walking_behaviour/model.py b/examples/city_walking_behaviour/city_walking_behaviour/model.py new file mode 100644 index 00000000..19708c82 --- /dev/null +++ b/examples/city_walking_behaviour/city_walking_behaviour/model.py @@ -0,0 +1,558 @@ +import math + +from mesa import Model +from mesa.agent import AgentSet +from mesa.datacollection import DataCollector +from mesa.experimental.cell_space import OrthogonalVonNeumannGrid +from mesa.experimental.cell_space.property_layer import PropertyLayer + +from .agents import ( + DOG_OWNER_PROBABILITY, + FEMALE_PROBABILITY, + MAX_AGE, + MIN_AGE, + SINGLE_HOUSEHOLD_PROBABILITY, + GroceryStore, + Human, + NonFoodShop, + Other, + SocialPlace, +) + +SCENARIOS = { + "random_random": "Random Land Use, Random Safety", + "random_safe": "Random Land Use, Low Safety in Core", + "central_random": "Centralized Land Use, Random Safety", + "central_safe": "Centralized Land Use, Low Safety in Core", +} + + +class WalkingModel(Model): + def __init__( + self, + height: int = 40, + width: int = 40, + no_of_couples: int = 2400, + no_of_singles: int = 600, + no_of_grocery_stores: int = 10, + no_of_social_places: int = 75, + no_of_non_food_shops: int = 40, + no_of_others: int = 475, + scenario: str = "random_random", + seed=None, + ): + super().__init__(seed=seed) + + # Initialize basic properties + self.initialize_properties( + height, + width, + no_of_couples, + no_of_singles, + no_of_grocery_stores, + no_of_social_places, + no_of_non_food_shops, + no_of_others, + ) + + # Set up grid and layers + self.setup_grid_and_layers() + + # Apply selected scenario + self.apply_scenario(scenario) + + def create_model_reporters(): + """Create a dictionary of model reporters with minimal boilerplate""" + reporters = {} + metrics = [ + "walk_daily_trips", + "work_trips", + "basic_needs_trips", + "leisure_trips", + ] + + for metric in metrics: + for ses in range(1, 6): + name = ( + f"avg_{metric.split('_')[0]}_ses{ses}" # e.g. "avg_walk_ses1" + ) + + # Create the reporter function + reporters[name] = lambda x, m=metric, s=ses: ( + sum( + getattr(agent, m) + for agent in x.agents_by_type[Human] + if s == agent.SES + ) + / len(x.agents_by_type[Human]) + if x.agents_by_type[Human] + else 0 + ) + + return reporters + + # Generate the model_reporters dict + model_reporters = create_model_reporters() + + self.datacollector = DataCollector(model_reporters) + self.human_set: AgentSet = AgentSet([], random=self.random) + # Add initial humans + self.add_initial_humans() + + self.datacollector.collect(self) + self.running = True + + def initialize_properties( + self, + height, + width, + no_of_couples, + no_of_singles, + no_of_grocery_stores, + no_of_social_places, + no_of_non_food_shops, + no_of_others, + ): + """Initialize basic model properties.""" + self.height = height + self.width = width + self.no_of_couples = no_of_couples + self.no_of_singles = no_of_singles + self.no_of_grocery_stores = no_of_grocery_stores + self.no_of_social_places = no_of_social_places + self.no_of_non_food_shops = no_of_non_food_shops + self.no_of_others = no_of_others + self.no_of_humans = 2 * self.no_of_couples + self.no_of_singles + self.unique_id = 1 + + def setup_grid_and_layers(self): + """Set up the visual grid and associated layers.""" + self.grid = OrthogonalVonNeumannGrid( + [self.height, self.width], + torus=True, + capacity=math.inf, + random=self.random, + ) + self.safety_cell_layer = PropertyLayer( + "safety", (self.width, self.height), dtype=float + ) + self.aesthetic_cell_layer = PropertyLayer( + "aesthetic", (self.width, self.height), dtype=float + ) + self.setup_aesthetic_layer() + + def setup_aesthetic_layer(self): + """Setup aesthetic distribution with central tendency and organic variations""" + center_x, center_y = self.height // 2, self.width // 2 + max_distance = math.sqrt((self.height // 2) ** 2 + (self.width // 2) ** 2) + + # Create multiple aesthetic hotspots + hotspots = [ + (center_x, center_y, 1.0), # Main center, full weight + ( + center_x + self.height // 4, + center_y + self.width // 4, + 0.7, + ), # Secondary spots + (center_x - self.height // 4, center_y - self.width // 4, 0.7), + (center_x + self.height // 3, center_y - self.width // 3, 0.5), + (center_x - self.height // 3, center_y + self.width // 3, 0.5), + ] + + # Create base noise grid for organic variation + noise_grid = [ + [self.random.random() * 0.3 for _ in range(self.width)] + for _ in range(self.height) + ] + + # Apply smoothing to noise + smoothing_radius = 2 + smoothed_noise = [[0 for _ in range(self.width)] for _ in range(self.height)] + for i in range(self.height): + for j in range(self.width): + total = 0 + count = 0 + for di in range(-smoothing_radius, smoothing_radius + 1): + for dj in range(-smoothing_radius, smoothing_radius + 1): + ni, nj = (i + di) % self.height, (j + dj) % self.width + total += noise_grid[ni][nj] + count += 1 + smoothed_noise[i][j] = total / count + + def calculate_aesthetic_value(i, j): + # Calculate influence from all hotspots + hotspot_influence = 0 + total_weight = 0 + + for hx, hy, weight in hotspots: + distance = math.sqrt((i - hx) ** 2 + (j - hy) ** 2) + normalized_distance = distance / max_distance + + # Exponential decay with distance + influence = math.exp(-2 * normalized_distance) * weight + hotspot_influence += influence + total_weight += weight + + # Normalize hotspot influence + hotspot_value = hotspot_influence / total_weight + + # Combine with smoothed noise + base_value = hotspot_value * 0.7 + smoothed_noise[i][j] + + # Add some local character + local_variation = self.random.random() * 0.2 + if base_value > 0.7: # High aesthetic areas get more consistent + local_variation *= 0.5 + + # Add some "character" to certain areas + if 0.3 < base_value < 0.7: # Mid-range areas get more variation + local_variation *= 1.5 + + final_value = base_value + local_variation + + # Ensure value stays within [0, 1] + return max(0.1, min(1.0, final_value)) + + for i in range(self.height): + for j in range(self.width): + self.aesthetic_cell_layer.data[i][j] = calculate_aesthetic_value(i, j) + + # Apply final smoothing pass for more natural transitions + final_smoothing = 1 + for i in range(self.height): + for j in range(self.width): + total = 0 + count = 0 + for di in range(-final_smoothing, final_smoothing + 1): + for dj in range(-final_smoothing, final_smoothing + 1): + ni, nj = (i + di) % self.height, (j + dj) % self.width + total += self.aesthetic_cell_layer.data[ni][nj] + count += 1 + self.aesthetic_cell_layer.data[i][j] = total / count + + def apply_scenario(self, scenario: str): + """Apply chosen scenario for building placement & safety setup.""" + scenario_handlers = { + "random_random": self._apply_random_random, + "random_safe": self._apply_random_safe, + "central_random": self._apply_central_random, + "central_safe": self._apply_central_safe, + } + if scenario not in scenario_handlers: + raise ValueError(f"Invalid scenario: {scenario}") + scenario_handlers[scenario]() + + def _place_buildings_random(self): + """Place buildings randomly on the grid.""" + building_types = [ + (GroceryStore, self.no_of_grocery_stores), + (SocialPlace, self.no_of_social_places), + (NonFoodShop, self.no_of_non_food_shops), + (Other, self.no_of_others), + ] + + for building_type, count in building_types: + for _ in range(count): + cell = self.grid.select_random_empty_cell() + if cell is not None: + building_type(self, cell) + + def _place_buildings_central(self): + """Place buildings with stronger central tendency using exponential decay.""" + center_x, center_y = self.width // 2, self.height // 2 + max_distance = math.sqrt((self.width // 2) ** 2 + (self.height // 2) ** 2) + + # Different weights for different building types (personal preference) + building_types = [ + (GroceryStore, self.no_of_grocery_stores, 0.7), # Less centralized + (SocialPlace, self.no_of_social_places, 0.85), # More centralized + (NonFoodShop, self.no_of_non_food_shops, 0.9), # Highly centralized + (Other, self.no_of_others, 0.6), # Least centralized + ] + + def get_placement_probability(x, y, centralization_weight): + # Calculate distance from center + distance = math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2) + normalized_distance = distance / max_distance + + # Exponential decay function for stronger central tendency + prob = math.exp(-centralization_weight * normalized_distance * 3) + + # Add some randomness to avoid perfect circles + prob *= 0.8 + 0.4 * self.random.random() + + return prob + + for building_type, count, centralization_weight in building_types: + buildings_placed = 0 + max_attempts = count * 20 + attempts = 0 + + while buildings_placed < count and attempts < max_attempts: + cell = self.grid.select_random_empty_cell() + if cell is None: + break + + x, y = cell.coordinate + if self.random.random() < get_placement_probability( + x, y, centralization_weight + ): + building_type(self, cell) + buildings_placed += 1 + attempts += 1 + + def _setup_random_safety(self): + """Set up random safety values with some spatial correlation.""" + # Initialize with pure random values + base_values = [ + [self.random.random() for _ in range(self.width)] + for _ in range(self.height) + ] + + # Apply smoothing to create more realistic patterns + smoothing_radius = 2 + for i in range(self.height): + for j in range(self.width): + # Calculate average of neighboring cells + total = 0 + count = 0 + for di in range(-smoothing_radius, smoothing_radius + 1): + for dj in range(-smoothing_radius, smoothing_radius + 1): + ni, nj = (i + di) % self.height, (j + dj) % self.width + total += base_values[ni][nj] + count += 1 + + # Set smoothed value with some random variation + smoothed = total / count + variation = (self.random.random() - 0.5) * 0.2 # +/-0.1 variation + self.safety_cell_layer.data[i][j] = max( + 0.1, min(1.0, smoothed + variation) + ) + + def _setup_central_safety(self): + """Set up safety values with a subtle urban-like distribution.""" + center_x, center_y = self.height // 2, self.width // 2 + max_distance = math.sqrt((self.height // 2) ** 2 + (self.width // 2) ** 2) + + # Define centers with proper formatting + centers = [ + (center_x, center_y), # Main center + # Secondary Centers: Removed for model simplicity. + # (center_x + self.height // 4, center_y + self.width // 4), # NE + # (center_x - self.height // 4, center_y - self.width // 4), # SW + # (center_x + self.height // 4, center_y - self.width // 4), # SE + # (center_x - self.height // 4, center_y + self.width // 4), # NW + ] + + def calculate_safety(i, j): + # Calculate Manhattan distance + distances = [max(abs(i - cx), abs(j - cy)) for cx, cy in centers] + min_distance = min(distances) / max_distance + + # More subtle base safety calculation for center + base_safety = 0.4 + ( + min_distance * 0.4 + ) # Reduced range and increased minimum + + # Reduced square influence for center + square_factor = abs((i - center_x) / self.height) + abs( + (j - center_y) / self.width + ) + central_damping = 1 - max(0, 0.7 - min_distance) # Reduce effect in center + square_influence = ( + square_factor * 0.15 * central_damping + ) # Reduced overall influence + + # Smaller local variation + local_variation = self.random.random() * 0.03 # Reduced random variation + + # More subtle transition in center + if min_distance < 0.3: + safety = ( + base_safety + (square_influence * 0.5) + (local_variation * 0.5) + ) + else: + safety = base_safety + square_influence + (local_variation * 0.4) + + return max(0.3, min(0.85, safety)) # Narrower range for more subtlety + + # Fill the safety layer + for i in range(self.height): + for j in range(self.width): + self.safety_cell_layer.data[i][j] = calculate_safety(i, j) + + def _apply_random_random(self): + self._place_buildings_random() + self._setup_random_safety() + + def _apply_random_safe(self): + self._place_buildings_random() + self._setup_central_safety() + + def _apply_central_random(self): + self._place_buildings_central() + self._setup_random_safety() + + def _apply_central_safe(self): + self._place_buildings_central() + self._setup_central_safety() + + def add_initial_humans(self): + """Add initial humans with distance-based cell organization. Each cell can contain up to 'n' households. + Couples share households.""" + center_x, center_y = self.height // 2, self.width // 2 + max_distance = math.sqrt((self.height // 2) ** 2 + (self.width // 2) ** 2) + MAX_HOUSEHOLDS_PER_CELL = 10 + + # Track available cells and their SES levels along with current occupancy + cells_by_ses = {} + cell_occupancy = {} # Track number of households in each cell + + for cell in self.grid.all_cells: + x, y = cell.coordinate + distance = math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2) + normalized_distance = distance / max_distance + + # Assign SES based on normalized distance from center + if normalized_distance < 0.2: + ses_level = 1 + elif normalized_distance < 0.4: + ses_level = 2 + elif normalized_distance < 0.6: + ses_level = 3 + elif normalized_distance < 0.8: + ses_level = 4 + else: + ses_level = 5 + + if ses_level not in cells_by_ses: + cells_by_ses[ses_level] = [] + cells_by_ses[ses_level].append(cell) + cell_occupancy[cell] = 0 # Initialize occupancy counter + + def find_available_cell(ses): + """Helper function to find a cell that hasn't reached maximum capacity""" + # Try in preferred SES zone first + available_cells = [ + cell + for cell in cells_by_ses.get(ses, []) + if cell_occupancy[cell] < MAX_HOUSEHOLDS_PER_CELL + ] + + if available_cells: + return self.random.choice(available_cells) + + # Look in adjacent SES zones (alternate between higher and lower) + for offset in range(1, 5): + # Try higher SES + if ses + offset <= 5: + available_cells = [ + cell + for cell in cells_by_ses.get(ses + offset, []) + if cell_occupancy[cell] < MAX_HOUSEHOLDS_PER_CELL + ] + if available_cells: + return self.random.choice(available_cells) + + # Try lower SES + if ses - offset >= 1: + available_cells = [ + cell + for cell in cells_by_ses.get(ses - offset, []) + if cell_occupancy[cell] < MAX_HOUSEHOLDS_PER_CELL + ] + if available_cells: + return self.random.choice(available_cells) + + return None + + # Place couples first (they share the same household) + for _ in range(self.no_of_couples): + ses = self.generate_ses() + cell = find_available_cell(ses) + + if cell: + # Create both members of the couple in the same household + household = cell # Using cell as household identifier + cell_occupancy[cell] += ( + 1 # Increment occupancy (couples count as one household) + ) + + male = Human( + self, + gender="Male", + family_size=2, + age=self.generate_age(), + SES=ses, + unique_id=self.unique_id, + cell=cell, + household=cell, + ) + self.unique_id += 1 + female = Human( + self, + gender="Female", + family_size=2, + age=self.random.randint( + male.age - 3, male.age + 3 + ), # selecting age with difference no more than 3. + SES=ses, + unique_id=self.unique_id, + cell=cell, + household=cell, + ) + self.unique_id += 1 + + # Link the couple together + male.family = female + female.family = male + + self.human_set.add(male) + self.human_set.add(female) + + # Place singles (each gets their own household) + for _ in range(self.no_of_singles): + ses = self.generate_ses() + cell = find_available_cell(ses) + + if cell: + household = cell # Using cell as household identifier + cell_occupancy[cell] += 1 # Increment occupancy + + person = Human( + self, + gender=self.generate_gender(), + family_size=1, + age=self.generate_age(), + SES=ses, + unique_id=self.unique_id, + cell=cell, + household=household, + ) + self.unique_id += 1 + + self.human_set.add(person) + + def step(self): + """Advance the model by one step.""" + self.human_set.shuffle_do("step") + + self.datacollector.collect(self) + + # Reset daily walking trips + self.human_set.do("reset_daily_trips") + + def generate_gender(self) -> str: + return "Female" if self.random.random() < FEMALE_PROBABILITY else "Male" + + def generate_age(self) -> int: + return self.random.randint(MIN_AGE, MAX_AGE) + + def generate_ses(self) -> int: + return self.random.randint(1, 5) + + def generate_family_size(self) -> int: + return 1 if self.random.random() < SINGLE_HOUSEHOLD_PROBABILITY else 2 + + def generate_dog_ownership(self) -> bool: + return self.random.random() < DOG_OWNER_PROBABILITY