From 334aa73a84a83547ad0f29867a25232f7b6e912e Mon Sep 17 00:00:00 2001 From: Tobias Karusseit Date: Fri, 9 Jan 2026 12:19:57 +0100 Subject: [PATCH] add lexicographic routing to shortest path --- docs/graph.md | 9 +- pyproject.toml | 2 +- src/multimodalrouter/graph/graph.py | 133 +++++++++-- tests/unit/test_routegraph_public_features.py | 213 ++++++++++++++++++ 4 files changed, 329 insertions(+), 28 deletions(-) diff --git a/docs/graph.md b/docs/graph.md index f256e1e..4056411 100644 --- a/docs/graph.md +++ b/docs/graph.md @@ -134,7 +134,7 @@ def find_shortest_path( start_id: str, end_id: str, allowed_modes: list[str], - optimization_metric: OptimizationMetric | str = OptimizationMetric.DISTANCE, + optimization_metric: OptimizationMetric | str | tuple = OptimizationMetric.DISTANCE, max_segments: int = 10, verbose: bool = False ) -> Route | None: @@ -145,7 +145,8 @@ def find_shortest_path( - start_id: str = the Hub.id of the starting hub (e.g. the source field for this hub in your data -> for `airports` likely the iata code) (for coordinate searches see [here](#searching-with-coordinates)) - end_id: str = the Hub.id of the traget Hub - allowed_modes: list[str] = a list of transport modes that are allowed in the path (all edges with different modes are excluded)(The modes are set during the graph [initailization](#args)) -- optimization_metric: str = the metric by which the pathfinder will determine the length of the path (must be numeric and present in all searched edges) (default = `distance`) (metrics where also set during [initialization](#args)) +- optimization_metric: str | tuple = the metric by which the pathfinder will determine the length of the path (must be numeric and present in all searched edges) (default = `distance`) (metrics were also set during [initialization](#args)) +If a `tuple | list` is passed, the metrics are minimized in the order given. 
The solution is `not` the total minimum but rather the minimum of the first metric and then the minimum of the second within the new search space and so on. [`pass 'hops' as an internal metric to minimize the hops` -> optimization_metric=('hops', ...)] - max_segments: int = the maximum number of hubs the route is allowed to include (default = 10 to avoid massive searches but should be setvrealtive to the graph size and density) - verbose: bool = whether you want to store all edges and their data in the route or just the hub names (default=False) @@ -161,7 +162,7 @@ def find_shortest_paths( start_id: str, end_ids: list[str], allowed_modes: list[str] | None = None, - optimization_metric: OptimizationMetric | str = OptimizationMetric.DISTANCE, + optimization_metric: OptimizationMetric | str | tuple = OptimizationMetric.DISTANCE, max_segments: int = 10, verbose: bool = False, custom_filter: Filter | None = None, @@ -173,7 +174,7 @@ def find_shortest_paths( - start_id: str = the id of the start point for all routes - end_ids: list[str] = a list of all the target ids for the search (will find a sepperate route from start to every target) - allowed_modes: list[str] = list of allowed transport Modes (pass `None` to allow all) -- optimization_metric: str | OptimizationMetric = the cost factor that the router will minimize +- optimization_metric: str | OptimizationMetric | tuple = the cost factor that the router will minimize (if a tuple is passed, this performs a lexicographic search that minimizes the metrics in order, not their sum) - max_segments: int = the search depth (routes with more than n segments are not explored) - verbose: bool = whether to return verbose routes or not - custom_filter: Filter | None = Filter to add custom restrictions to routing diff --git a/pyproject.toml b/pyproject.toml index 2b06f2c..b669133 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "multimodalrouter" -version = "0.1.11" 
+version = "0.1.12" description = "A graph-based routing library for dynamic routing." readme = "README.md" license = { file = "LICENSE.md" } diff --git a/src/multimodalrouter/graph/graph.py b/src/multimodalrouter/graph/graph.py index 736bca9..e594fc8 100644 --- a/src/multimodalrouter/graph/graph.py +++ b/src/multimodalrouter/graph/graph.py @@ -341,19 +341,96 @@ def _hubToHubDistances(self, hub1: list[Hub], hub2: list[Hub]): return distances.cpu().numpy() + def _primary_metric(self, optimization_metric): + """ + If optimization_metric is a tuple, return the last element of the tuple. + Otherwise, return optimization_metric itself. + + Args: + optimization_metric (OptimizationMetric | str | tuple): The optimization metric to get the primary metric from. + + Returns: + OptimizationMetric | str: The primary optimization metric. + """ + if isinstance(optimization_metric, tuple): + return optimization_metric[-1] + return optimization_metric + + def _build_priority_spec( + self, + optimization_metric: OptimizationMetric | str | tuple, + ): + """ + preprocesses the optim metric request into the expected tuple format for _dijkstra_single_source + + :param optimization_metric: the target metrics (in order of importance) or a single metric + :type optimization_metric: OptimizationMetric | str | tuple + """ + # ensure backwards compatibility with single metric optim + if isinstance(optimization_metric, (OptimizationMetric, str)): + return (optimization_metric,) + + # already correct type + if isinstance(optimization_metric, tuple): + return optimization_metric + + # list + if isinstance(optimization_metric, list): + return tuple(optimization_metric) + + raise TypeError("Invalid optimization_metric") + + def _compute_priority( + self, + path: PathNode, + acc_metrics: EdgeMetadata, + priority_spec: tuple, + ): + """ + collects the values from the current state, + based on the optim metrics requested + + :param path: the last node i the current path (has prev) + :type path: 
PathNode + :param acc_metrics: the accumulated metrics + :type acc_metrics: EdgeMetadata + :param priority_spec: the optim metrics (in order of importance) + :type priority_spec: tuple + """ + values = [] + + for key in priority_spec: + if key == "hops": + values.append(path.length) + else: + values.append(acc_metrics.getMetric(key)) + + return tuple(values) + def _dijkstra_single_source( self, start_id: str, target_ids: set[str], allowed_modes: list[str], - optimization_metric: OptimizationMetric, + optimization_metric: OptimizationMetric | tuple, max_segments: int, custom_filter: Filter | None, ): counter = count() - pq: list[tuple[float, int, PathNode, EdgeMetadata]] = [] - start_metrics = EdgeMetadata() + priority_spec = self._build_priority_spec(optimization_metric) + + pq: list[tuple[tuple, int, PathNode, EdgeMetadata]] = [] + + start_metrics = EdgeMetadata( + transportMode=None, + **{ + (m if isinstance(m, str) else m.value): 0 + for m in priority_spec + if m != "hops" + } + ) + start_path = PathNode( hub_id=start_id, mode="", @@ -361,29 +438,30 @@ def _dijkstra_single_source( prev=None, ) - heapq.heappush(pq, (0.0, next(counter), start_path, start_metrics)) + start_priority = self._compute_priority(start_path, start_metrics, priority_spec) + heapq.heappush(pq, (start_priority, next(counter), start_path, start_metrics)) - # visited[(hub_id, path_len)] = best_metric - visited: dict[tuple[str, int], float] = {} + # best lexicographic priority seen per hub + visited: dict[str, tuple] = {} # best result per target - results: dict[str, tuple[PathNode, EdgeMetadata]] = {} + results: dict[str, tuple[PathNode, EdgeMetadata, tuple]] = {} while pq: - current_metric, _, path_node, acc_metrics = heapq.heappop(pq) + priority, _, path_node, acc_metrics = heapq.heappop(pq) hub_id = path_node.hub_id - path_len = path_node.length if path_node is not None else 0 - state = (hub_id, path_len) + path_len = path_node.length - if state in visited and visited[state] <= 
current_metric: + prev_priority = visited.get(hub_id) + if prev_priority is not None and prev_priority <= priority: continue - visited[state] = current_metric + visited[hub_id] = priority # record result if this hub is a target if hub_id in target_ids: prev = results.get(hub_id) - if prev is None or current_metric < prev[1].getMetric(optimization_metric): - results[hub_id] = (path_node, acc_metrics) + if prev is None or priority < prev[2]: + results[hub_id] = (path_node, acc_metrics, priority) if path_len >= max_segments: continue @@ -413,9 +491,6 @@ def _dijkstra_single_source( ): continue - edge_cost = conn_metrics.getMetric(optimization_metric) - new_metric = current_metric + edge_cost - new_acc_metrics = EdgeMetadata( transportMode=None, **acc_metrics.metrics, @@ -435,12 +510,15 @@ def _dijkstra_single_source( prev=path_node, ) + new_priority = self._compute_priority(new_path_node, new_acc_metrics, priority_spec) + heapq.heappush( pq, - (new_metric, next(counter), new_path_node, new_acc_metrics), + (new_priority, next(counter), new_path_node, new_acc_metrics), ) - return results + # strip priority from results (external behavior unchanged) + return {k: (v[0], v[1]) for k, v in results.items()} def _build_route( self, @@ -503,7 +581,7 @@ def find_shortest_path( start_id: str, end_id: str, allowed_modes: list[str] | None = None, - optimization_metric: OptimizationMetric | str = OptimizationMetric.DISTANCE, + optimization_metric: OptimizationMetric | str | tuple = OptimizationMetric.DISTANCE, max_segments: int = 10, verbose: bool = False, custom_filter: Filter | None = None, @@ -511,6 +589,9 @@ def find_shortest_path( if not isinstance(end_id, str): raise TypeError("end_id must be a single hub id (str)") + if allowed_modes is None: + allowed_modes = list(self.TransportModes.values()) + results = self._dijkstra_single_source( start_id=start_id, target_ids={end_id}, @@ -524,10 +605,11 @@ def find_shortest_path( return None path_node, acc_metrics = results[end_id] + 
return self._build_route( path_node, acc_metrics, - optimization_metric, + self._primary_metric(optimization_metric), verbose, ) @@ -536,7 +618,7 @@ def find_shortest_paths( start_id: str, end_ids: list[str], allowed_modes: list[str] | None = None, - optimization_metric: OptimizationMetric | str = OptimizationMetric.DISTANCE, + optimization_metric: OptimizationMetric | str | tuple = OptimizationMetric.DISTANCE, max_segments: int = 10, verbose: bool = False, custom_filter: Filter | None = None, @@ -544,6 +626,9 @@ def find_shortest_paths( if not end_ids: return {} + if allowed_modes is None: + allowed_modes = list(self.TransportModes.values()) + target_ids = set(end_ids) results = self._dijkstra_single_source( @@ -557,11 +642,13 @@ def find_shortest_paths( routes: dict[str, Route | VerboseRoute] = {} + primary_metric = self._primary_metric(optimization_metric) + for dst, (path_node, acc_metrics) in results.items(): routes[dst] = self._build_route( path_node, acc_metrics, - optimization_metric, + primary_metric, verbose, ) diff --git a/tests/unit/test_routegraph_public_features.py b/tests/unit/test_routegraph_public_features.py index 4e3859b..62fb65a 100644 --- a/tests/unit/test_routegraph_public_features.py +++ b/tests/unit/test_routegraph_public_features.py @@ -802,3 +802,216 @@ def filter(self, start, end, edge, path): ) self.assertNotIn('C', fc['A']) + + def test_lexicographic_hops_beats_long_chain(self): + """ + Test that minimal hops beats minimal distance + in lexiographical order (hops, distance). + """ + rows = [] + + # A -> B -> C -> ... 
-> J (9 hops, distance 1 each) + nodes = [chr(ord('A') + i) for i in range(10)] + for i in range(len(nodes) - 1): + rows.append((nodes[i], nodes[i + 1], 1, i, 0, i + 1, 0)) + + # Direct A -> J (1 hop, distance 20) + rows.append(('A', 'J', 20, 0, 0, 9, 0)) + + testDf = pd.DataFrame( + columns=[ + 'source', 'destination', 'distance', + 'source_lat', 'source_lng', + 'destination_lat', 'destination_lng', + ], + data=rows, + ) + + path = os.path.join(self.temp_dir.name, 'deep_chain.csv') + testDf.to_csv(path, index=False) + + graph = RouteGraph( + maxDistance=100, + transportModes={'H': 'mv'}, + dataPaths={'H': path}, + drivingEnabled=False, + ) + + graph.build() + + route = graph.find_shortest_path( + 'A', + 'J', + allowed_modes=['mv'], + optimization_metric=['hops', 'distance'], + verbose=True, + ) + # shows that minimal hops beat minimal distance + path_nodes = [n[0] for n in route.path] + self.assertEqual(path_nodes, ['A', 'J']) + + def test_lexicographic_grid_prefers_straight_path(self): + """ + Test lexicographic grid prefers least hops path + """ + rows = [] + size = 4 # 4x4 grid + + def node(x, y): + return f"N{x}{y}" + + for x in range(size): + for y in range(size): + if x + 1 < size: + rows.append((node(x, y), node(x + 1, y), 1, x, y, x + 1, y)) + if y + 1 < size: + rows.append((node(x, y), node(x, y + 1), 1, x, y, x, y + 1)) + + testDf = pd.DataFrame( + columns=[ + 'source', 'destination', 'distance', + 'source_lat', 'source_lng', + 'destination_lat', 'destination_lng', + ], + data=rows, + ) + + path = os.path.join(self.temp_dir.name, 'grid.csv') + testDf.to_csv(path, index=False) + + graph = RouteGraph( + maxDistance=10, + transportModes={'H': 'mv'}, + dataPaths={'H': path}, + drivingEnabled=False, + ) + + graph.build() + + start = node(0, 0) + end = node(3, 3) + + route = graph.find_shortest_path( + start, + end, + allowed_modes=['mv'], + optimization_metric=['hops', 'distance'], + verbose=True, + ) + + # Manhattan shortest hops = 6 + 
self.assertEqual(len(route.path) - 1, 6) + + def test_lexicographic_diamond_fanin(self): + """ + stress test for lexicographic with many paths. + check leats hop truth and path backtracking + """ + rows = [] + + # A -> B_i -> C + for i in range(10): + rows.append(('A', f'B{i}', 1, 0, 0, i, 1)) + rows.append((f'B{i}', 'C', 1, i, 1, 0, 2)) + + # Direct A -> C + rows.append(('A', 'C', 10, 0, 0, 0, 2)) + + testDf = pd.DataFrame( + columns=[ + 'source', 'destination', 'distance', + 'source_lat', 'source_lng', + 'destination_lat', 'destination_lng', + ], + data=rows, + ) + + path = os.path.join(self.temp_dir.name, 'diamond.csv') + testDf.to_csv(path, index=False) + + graph = RouteGraph( + maxDistance=50, + transportModes={'H': 'mv'}, + dataPaths={'H': path}, + drivingEnabled=False, + ) + + graph.build() + + route = graph.find_shortest_path( + 'A', + 'C', + allowed_modes=['mv'], + optimization_metric=['hops', 'distance'], + verbose=True, + ) + + # hops-first chooses direct edge + self.assertEqual([n[0] for n in route.path], ['A', 'C']) + + def test_lexicographic_hops_with_depth_filter(self): + """ + checks that filters run as expected + (not that filters are correct since least hops will win before filter) + """ + class MaxDepthFilter(Filter): + def filterHub(self, hub): + return True + + def filterEdge(self, edge): + return True + + def filter(self, start, end, edge, path): + # no paths longer than 3 hops + return sum(1 for _ in path) <= 3 + + rows = [] + + # cheap chain A -> B -> C -> ... 
-> K (10 hops, distance 1) + chain = [chr(ord('A') + i) for i in range(11)] + for i in range(len(chain) - 1): + rows.append((chain[i], chain[i + 1], 1, i, 0, i + 1, 0)) + + # medium path A -> M -> N -> K (3 hops, distance 5) + rows.extend([ + ('A', 'M', 2, 0, 0, 5, 1), + ('M', 'N', 1, 5, 1, 7, 1), + ('N', 'K', 2, 7, 1, 10, 0), + ]) + + # direct path A -> K (1 hop, distance 20) + rows.append(('A', 'K', 20, 0, 0, 10, 0)) + + testDf = pd.DataFrame( + columns=[ + 'source', 'destination', 'distance', + 'source_lat', 'source_lng', + 'destination_lat', 'destination_lng', + ], + data=rows, + ) + + path = os.path.join(self.temp_dir.name, 'lexi_filter_large.csv') + testDf.to_csv(path, index=False) + + graph = RouteGraph( + maxDistance=100, + transportModes={'H': 'mv'}, + dataPaths={'H': path}, + drivingEnabled=False, + ) + + graph.build() + + route = graph.find_shortest_path( + start_id='A', + end_id='K', + allowed_modes=['mv'], + optimization_metric=['hops', 'distance'], + custom_filter=MaxDepthFilter(), + verbose=True, + ) + + # long chain is rejected by filter; direct path wins lexicographically + path_nodes = [n[0] for n in route.path] + self.assertEqual(path_nodes, ['A', 'K'])