-
Notifications
You must be signed in to change notification settings - Fork 75
Re-org Nework to allow user-defined layout and network name #3964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
4a6e9e6
1fcb1a7
6b99d5a
8e1bb9b
7d92a38
4905783
581ba35
12b4e24
04a747d
e039863
0938d32
d705969
4fb46cb
9dc99e0
fb8b388
af2e92e
7285a91
ab9576c
e4007d0
856b4f0
1c89fdb
56ff5ab
fdc3d9a
0ebebdf
7d163a2
5d1f50e
f45d697
cbb4f93
bc15183
9d85736
c1ae9cd
f9673c3
26702c5
844bd5e
9b1a258
0ed8650
0342eb8
22eb5fc
22ceae1
49940e4
935e219
186c439
258d9e6
d7383e9
dc17cbc
f1c6364
c8bbfae
880d18e
7c7deda
7d6cb1c
5711ccb
e478161
745f3ef
1a5392f
76917b8
ad4d1a5
5c7cf46
79862e7
3ae0236
7970afe
45ac54f
b47757c
0e3c24d
d5eb48d
49d6636
36923ac
855bb28
ee4880b
7e532d2
a6aa16c
f529935
04c5d88
eb646e0
6069042
049dcfe
ecfb56e
ddef721
951791c
90286c7
7b702f0
df9a5ad
42fa00f
f3f165c
2e8b879
c4e4df2
12b00d2
c4e14fd
fa1de4c
c393e58
84b6eec
fcb3fc3
79d4f30
dc704b8
c295ae8
41ec943
e54e05a
f4e7105
53b7c57
bb68efa
5fd76e2
59ba633
df37689
09294c2
0923362
f3b5f3c
d764529
f667693
c7e6d48
87094d2
fa20e30
1dada13
f59f19b
40f78ee
2c80df3
9c9bae3
414a6ad
c2e863f
5ebfb85
675b4e6
9b52a6c
24bc811
bc85954
cc1c418
6f67c67
b50c0ec
427ab25
dca59f0
7392696
a870628
ba8852b
6597324
48255e7
f4e1138
55b1a34
4f19e95
b267f68
74d75fb
95f61ae
e544f49
53aece5
5c10024
f1ec769
db31940
e1d588d
3710ad0
b34e8be
e9dd82e
84396d6
6e16893
a18c724
890328e
dc12bbc
cdc4658
1174a6e
adb03e4
5554f08
07282d1
2f956b2
c004040
44aec5f
4f1a0b5
88ec134
12820c1
fe9e01e
96aebab
83955a8
1ec8e3a
1d98d8f
991d138
7e52fe3
1cfec68
601a5a4
26e31c3
6c60e42
4417bcf
0e2239c
298454c
9f4489a
c250b81
4bb357e
84cc1dd
adb3b6b
1350347
bff46e1
4e5d8a3
20a2ff6
de5254f
8c7a7a7
5fdb09b
5d58d44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,122 @@ | ||
| # Configuration Parameters | ||
|
|
||
| ## Main API | ||
|
|
||
| - `Parameter.decode(value: str) → Any` - Parse value from config file (lenient) | ||
| - `Parameter.encode(value: Any) → str` - Validate value before saving (strict) | ||
| - `ChoiceParameter._choices → list[str]` - Available options (can be dynamic via `@property`) | ||
|
|
||
| ## Key Pattern: decode() vs encode() | ||
|
|
||
| ### ✅ DO: Separate parsing from validation | ||
|
|
||
| ```python | ||
| class MyParameter(Parameter): | ||
| def decode(self, value): | ||
| """Parse - security checks only""" | ||
| if not value: | ||
| return "" | ||
| value = value.strip() | ||
|
|
||
| # Only validate security concerns (path traversal, injection, etc.) | ||
| if self._has_security_issue(value): | ||
| raise ValueError("Security violation") | ||
|
|
||
| return value # Don't check business rules | ||
|
|
||
| def encode(self, value): | ||
| """Validate - all business rules""" | ||
| if not value or not value.strip(): | ||
| raise ValueError("Value required") | ||
|
|
||
| value = value.strip() | ||
|
|
||
| # Security check | ||
| if self._has_security_issue(value): | ||
| raise ValueError("Security violation") | ||
|
|
||
| # Business rule check | ||
| if self._resource_exists(value): | ||
| raise ValueError(f"Resource '{value}' already exists") | ||
|
|
||
| return value | ||
| ``` | ||
|
|
||
| ### ❌ DON'T: Validate business rules in decode() | ||
|
|
||
| ```python | ||
| def decode(self, value): | ||
| # ❌ Expensive I/O on every config load | ||
| if not self._resource_exists(value): | ||
| raise ValueError("Resource not found") | ||
|
|
||
| # ❌ Breaks loading old configs when resources deleted | ||
| if self._check_collision(value): | ||
| raise ValueError("Already exists") | ||
|
|
||
| return value | ||
| ``` | ||
|
|
||
| **Why**: decode() is called when loading config files - must be lenient and fast. | ||
|
|
||
| ## Dynamic Choices | ||
|
|
||
| ### ✅ DO: Use @property for dynamic choices | ||
|
|
||
| ```python | ||
| class DynamicChoiceParameter(ChoiceParameter): | ||
| def initialize(self, parser): | ||
| self.depends_on = ['other-param'] # Declare dependencies | ||
|
|
||
| @property | ||
| def _choices(self): | ||
| """Scan resources on each access""" | ||
| return self._get_available_options() | ||
|
|
||
| def _get_available_options(self): | ||
| # Scan filesystem/database for available options | ||
| if not self._can_scan(): | ||
| return [] | ||
|
|
||
| # Return list of valid choices | ||
| return self._scan_resources() | ||
| ``` | ||
|
|
||
| ## Validation Helpers | ||
|
|
||
| Extract shared validation into helpers: | ||
|
|
||
| ```python | ||
| def _validate_security(self, value): | ||
| """Security checks (used by encode AND decode)""" | ||
| invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|'] | ||
| if any(char in value for char in invalid_chars): | ||
| raise ValueError("Invalid characters") | ||
|
|
||
| def _validate_business(self, value): | ||
| """Business rules (used by encode ONLY)""" | ||
| if self._resource_exists(value): | ||
| raise ValueError("Already exists") | ||
|
|
||
| def decode(self, value): | ||
| return self._validate_security(value.strip()) | ||
|
|
||
| def encode(self, value): | ||
| self._validate_security(value.strip()) | ||
| self._validate_business(value) | ||
| return value | ||
| ``` | ||
|
|
||
| ## Common Pitfalls | ||
|
|
||
| 1. **Validating in decode()** → Fragile config loading | ||
| 2. **No dependency declaration** → Dynamic choices don't update | ||
| 3. **Caching without invalidation** → Stale options | ||
| 4. **Mixing security/business validation** → Security checks in both, business in encode only | ||
|
|
||
| ## Related Files | ||
|
|
||
| - `config.py` - All parameter classes (PathParameter, ChoiceParameter, etc.) | ||
| - `config.pyi` - Type stubs (regenerate: `pixi run python cea/utilities/config_type_generator.py`) | ||
| - `default.config` - Default values for all parameters | ||
| - `interfaces/dashboard/api/tools.py` - Validation API endpoints (`validate_field`, `get_parameter_metadata`) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| AGENTS.md |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -427,11 +427,11 @@ def initialize(self, parser): | |
| the default.config | ||
| """ | ||
|
|
||
| def encode(self, value): | ||
| def encode(self, value) -> str: | ||
| """Encode ``value`` to a string representation for writing to the configuration file""" | ||
| return str(value) | ||
|
|
||
| def decode(self, value): | ||
| def decode(self, value) -> Any: | ||
| """Decode ``value`` to the type supported by this Parameter""" | ||
| return value | ||
|
|
||
|
|
@@ -754,6 +754,69 @@ class StringParameter(Parameter): | |
| """Default Parameter type""""" | ||
|
|
||
|
|
||
| class NetworkLayoutNameParameter(StringParameter): | ||
| """ | ||
| Parameter for network layout names with collision detection. | ||
| Validates in real-time to prevent overwriting existing network layouts. | ||
| """ | ||
|
|
||
| def _validate_network_name(self, value) -> str: | ||
| """ | ||
| Validate network name for invalid characters and collision with existing networks. | ||
| """ | ||
| value = value.strip() | ||
|
|
||
| # Check for invalid filesystem characters | ||
| invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|'] | ||
| if any(char in value for char in invalid_chars): | ||
| raise ValueError( | ||
| f"Network name contains invalid characters. " | ||
| f"Avoid: {' '.join(invalid_chars)}" | ||
| ) | ||
|
|
||
| # Check for collision with existing networks | ||
| scenario = self.config.scenario | ||
| locator = cea.inputlocator.InputLocator(scenario) | ||
|
|
||
| # Check network folder exists | ||
| network_folder = locator.get_thermal_network_folder_network_name_folder(value) | ||
| if os.path.exists(network_folder): | ||
| raise ValueError( | ||
| f"Network '{value}' already exists. " | ||
| f"Choose a different name or delete the existing network." | ||
| ) | ||
|
|
||
| return value | ||
|
|
||
| def encode(self, value): | ||
| """ | ||
| Validate and encode network name. | ||
| Raises ValueError if name contains invalid characters or collides with existing network. | ||
| """ | ||
| if not str(value) or str(value).strip() == '': | ||
| raise ValueError("Network name is required. Please provide a valid name.") | ||
|
|
||
| return self._validate_network_name(str(value)) | ||
|
|
||
| def decode(self, value): | ||
| """Parse and normalize network name from config file""" | ||
| if not value: | ||
| return "" | ||
|
|
||
| value = value.strip() | ||
|
|
||
| # Only validate filesystem characters (security concern) | ||
| # Don't check collision - that's encode's job when saving | ||
| invalid_chars = ['/', '\\', ':', '*', '?', '"', '<', '>', '|'] | ||
| if any(char in value for char in invalid_chars): | ||
| raise ValueError( | ||
| f"Network name contains invalid characters. " | ||
| f"Avoid: {' '.join(invalid_chars)}" | ||
| ) | ||
|
|
||
| return value | ||
|
|
||
|
|
||
| class OptimizationIndividualListParameter(ListParameter): | ||
| def initialize(self, parser): | ||
| # allow the parent option to be set | ||
|
|
@@ -803,6 +866,106 @@ def decode(self, value): | |
| return self._choices[0] | ||
|
|
||
|
|
||
| class NetworkLayoutChoiceParameter(ChoiceParameter): | ||
| """ | ||
| Parameter for selecting existing network layouts based on network type. | ||
| """ | ||
|
|
||
| _network_types = {'DH', 'DC'} | ||
|
|
||
| def initialize(self, parser): | ||
| # Override to dynamically populate choices based on available networks | ||
| pass | ||
|
|
||
| @property | ||
| def _choices(self): | ||
| networks = self._get_available_networks() | ||
| sorted_networks = self._sort_networks_by_modification_time(networks) | ||
| return sorted_networks | ||
|
|
||
| def _get_available_networks(self) -> List[str]: | ||
| locator = cea.inputlocator.InputLocator(self.config.scenario) | ||
| network_folder = locator.get_thermal_network_folder() | ||
| if not os.path.exists(network_folder): | ||
| return [] | ||
| return [name for name in os.listdir(network_folder) | ||
| if os.path.isdir(os.path.join(network_folder, name)) and name not in self._network_types] | ||
|
|
||
| def _get_network_file_paths(self, network_type: str, network_name: str) -> Tuple[str, str]: | ||
| """Get path for network node and edge files for the given network name""" | ||
| locator = cea.inputlocator.InputLocator(self.config.scenario) | ||
|
|
||
| network_type_folder = locator.get_output_thermal_network_type_folder(network_type, network_name) | ||
| # Remove trailing slash/separator if present | ||
| network_type_folder = network_type_folder.rstrip(os.sep) | ||
|
|
||
| edges_path = locator.get_network_layout_nodes_shapefile(network_type, network_name) | ||
| nodes_path = locator.get_network_layout_nodes_shapefile(network_type, network_name) | ||
|
Comment on lines
+902
to
+903
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Critical bug: edges_path assigned wrong method. Line 900 calls Apply this diff: - edges_path = locator.get_network_layout_nodes_shapefile(network_type, network_name)
+ edges_path = locator.get_network_layout_edges_shapefile(network_type, network_name)
nodes_path = locator.get_network_layout_nodes_shapefile(network_type, network_name)🤖 Prompt for AI Agents |
||
|
|
||
| return edges_path, nodes_path | ||
|
Comment on lines
+869
to
+905
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix edges path construction and avoid shapefile getter side-effects
Use the network-type folder and join the layout filenames directly instead: def _get_network_file_paths(self, network_type: str, network_name: str) -> Tuple[str, str]:
- """Get path for network node and edge files for the given network name"""
- locator = cea.inputlocator.InputLocator(self.config.scenario)
-
- network_type_folder = locator.get_output_thermal_network_type_folder(network_type, network_name)
- # Remove trailing slash/separator if present
- network_type_folder = network_type_folder.rstrip(os.sep)
-
- edges_path = locator.get_network_layout_nodes_shapefile(network_type, network_name)
- nodes_path = locator.get_network_layout_nodes_shapefile(network_type, network_name)
-
- return edges_path, nodes_path
+ """Get paths for network edge and node files for the given network name (no locator side-effects)."""
+ locator = cea.inputlocator.InputLocator(self.config.scenario)
+ base = locator.get_output_thermal_network_type_folder(network_type, network_name).rstrip(os.sep)
+ edges_path = os.path.join(base, "layout", "edges.shp")
+ nodes_path = os.path.join(base, "layout", "nodes.shp")
+ return edges_path, nodes_pathThis fixes the edges/nodes mix-up and keeps |
||
|
|
||
| def _sort_networks_by_modification_time(self, network_names: List[str]) -> List[str]: | ||
| """Sort network layouts by modification time (most recent first)""" | ||
| modified_times = [] | ||
| for network_name in network_names: | ||
| network_time = [] | ||
| for network_type in self._network_types: | ||
| edges_path, nodes_path = self._get_network_file_paths(network_type, network_name) | ||
| if os.path.exists(edges_path) and os.path.exists(nodes_path): | ||
| sort_time = os.path.getmtime(edges_path) | ||
| network_time.append((network_name, sort_time)) | ||
| if network_time: | ||
| # Get the most recent modification time across network types | ||
| most_recent_time = max(time for _, time in network_time) | ||
| modified_times.append((network_name, most_recent_time)) | ||
|
|
||
| # Sort by modification time, most recent first | ||
| modified_times.sort(key=lambda x: x[1], reverse=True) | ||
| sorted_networks = [network_name for network_name, _ in modified_times] | ||
| return sorted_networks | ||
|
|
||
| def encode(self, value): | ||
| """ | ||
| Validate and encode value. | ||
| Raises ValueError if the network layout doesn't exist. | ||
| """ | ||
| # Empty value not allowed | ||
| if not value or str(value).strip() == '': | ||
| raise ValueError("Network layout is required. Please select a network layout.") | ||
|
|
||
| available_networks = self._get_available_networks() | ||
|
|
||
| # Validate that the network exists | ||
| if str(value) not in available_networks: | ||
| raise ValueError( | ||
| f"Network layout '{value}' not found. " | ||
| f"Available layouts: {', '.join(available_networks)}" | ||
| ) | ||
| return str(value) | ||
|
|
||
| def decode(self, value): | ||
| """ | ||
| Decode and validate value exists in available networks. | ||
| Returns the selected value if valid, otherwise returns the most recent network as default. | ||
|
|
||
| If no value provided and no networks found for current network-type, try to find | ||
| the most recent network across ALL network types and switch network-type accordingly. | ||
| """ | ||
| available_networks = self._get_available_networks() | ||
|
|
||
| # If value is provided and valid, return it | ||
| if value and value in available_networks: | ||
| return value | ||
|
|
||
| # Otherwise, return the most recent network (first choice) if available | ||
| if available_networks: | ||
| sorted_networks = self._sort_networks_by_modification_time(available_networks) | ||
| most_recent_network = sorted_networks[0] | ||
| return most_recent_network | ||
|
|
||
| return '' | ||
|
|
||
|
|
||
| class DatabasePathParameter(Parameter): | ||
| """A parameter that can either be set to a region-specific CEA Database (e.g. CH or SG) or to a user-defined | ||
| folder that has the same structure.""" | ||
|
|
@@ -989,7 +1152,10 @@ class SingleBuildingParameter(ChoiceParameter): | |
|
|
||
| def initialize(self, parser): | ||
| # skip the default ChoiceParameter initialization of _choices | ||
| pass | ||
| try: | ||
| self.nullable = parser.getboolean(self.section.name, f"{self.name}.nullable") | ||
| except configparser.NoOptionError: | ||
| self.nullable = False | ||
|
|
||
| @property | ||
| def _choices(self): | ||
|
|
@@ -999,8 +1165,15 @@ def _choices(self): | |
| if not building_names: | ||
| raise cea.ConfigError("Either no buildings in zone or no zone geometry found.") | ||
| return building_names | ||
|
|
||
| def decode(self, value): | ||
| if self.nullable and (value is None or value == ''): | ||
| return None | ||
| return super().decode(value) | ||
|
|
||
| def encode(self, value): | ||
| if self.nullable and (value is None or value == ''): | ||
| return '' | ||
| if str(value) not in self._choices: | ||
| return self._choices[0] | ||
| return str(value) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix typo: extra backtick in documentation.
The line contains an extra backtick at the end of `AGENTS.md``.
Apply this diff:
📝 Committable suggestion
🤖 Prompt for AI Agents