diff --git a/pyproject.toml b/pyproject.toml
index 5966637..855d805 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "multimodalrouter"
-version = "0.1.13"
+version = "0.1.16"
 description = "A graph-based routing library for dynamic routing."
 readme = "README.md"
 license = { file = "LICENSE.md" }
diff --git a/src/multimodalrouter/graph/graph.py b/src/multimodalrouter/graph/graph.py
index 70f8585..cdb01e2 100644
--- a/src/multimodalrouter/graph/graph.py
+++ b/src/multimodalrouter/graph/graph.py
@@ -11,8 +11,9 @@ import pandas as pd
 from .dataclasses import Hub, EdgeMetadata, OptimizationMetric, Route, Filter, VerboseRoute, PathNode
 from threading import Lock
-from collections import deque
+from collections import defaultdict, deque
 from itertools import count
+import warnings


 class RouteGraph:
@@ -36,8 +37,8 @@ def __init__(
         # a list of coordinate names for the destination coords in the datasets (name to dataset matching is automatic)
         destCoordKeys: list[str] = ["destination_lat", "destination_lng"],
     ):
-        self.sourceCoordKeys = set(sourceCoordKeys)
-        self.destCoordKeys = set(destCoordKeys)
+        self.sourceCoordKeys = sourceCoordKeys
+        self.destCoordKeys = destCoordKeys

         self.compressed = compressed
         self.extraMetricsKeys = extraMetricsKeys
@@ -258,8 +259,8 @@ def _generateHubs(self):
             data = self._loadData(hubType)
             added = set()

-            thisSourceKeys = self.sourceCoordKeys & set(data.columns)
-            thisDestinationKeys = self.destCoordKeys & set(data.columns)
+            thisSourceKeys = [k for k in self.sourceCoordKeys if k in data.columns]
+            thisDestinationKeys = [k for k in self.destCoordKeys if k in data.columns]

             # get required and extra columns
             required_cols = {
@@ -270,13 +271,10 @@
             }

             # collect extra data from the dataset columns that are not required but marked as extra
-            extra_metric_cols = []
-            for m in self.extraMetricsKeys:
-                if m not in required_cols:
-                    try:
-                        extra_metric_cols.append(m)
-                    except KeyError:
-                        continue
+            extra_metric_cols = [
+                m for m in self.extraMetricsKeys
+                if m in data.columns and m not in required_cols
+            ]

             for row in tqdm(data.itertuples(index=False), desc=f"Generating {hubType} Hubs", unit="hub"):
                 # create hubs if they don't exist
@@ -284,11 +282,37 @@
                 if row.source not in added:
                     hub = Hub(coords=[getattr(row, k) for k in thisSourceKeys], id=row.source, hubType=hubType)
                     self.addHub(hub)
                     added.add(row.source)
+                elif row.source not in self.Graph[hubType].keys():
+                    other_type = self.getHubById(row.source)
+                    warnings.warn(
+                        f"Hub {row.source}, with type {hubType}, already exists "
+                        f"as a {other_type.hubType} hub and will be skipped.\n"
+                        f"If you want to add it as a {hubType} hub, please delete or rename the other hub first."
+                    )
+                elif self.Graph[hubType][row.source].coords != [getattr(row, k) for k in thisSourceKeys]:
+                    warnings.warn(
+                        f"Hub {row.source}, with type {hubType}, was found with two different coordinates.\n"
+                        "This may be due to a data error or an ordering error in the source/dest-CoordKeys.\n"
+                        "The graph will ignore this hub and keep only the first instance."
+                    )

                 if row.destination not in added:
                     hub = Hub(coords=[getattr(row, k) for k in thisDestinationKeys], id=row.destination, hubType=hubType)
                     self.addHub(hub)
                     added.add(row.destination)
+                elif row.destination not in self.Graph[hubType].keys():
+                    other_type = self.getHubById(row.destination)
+                    warnings.warn(
+                        f"Hub {row.destination}, with type {hubType}, already exists "
+                        f"as a {other_type.hubType} hub and will be skipped.\n"
+                        f"If you want to add it as a {hubType} hub, please delete or rename the other hub first."
+                    )
+                elif self.Graph[hubType][row.destination].coords != [getattr(row, k) for k in thisDestinationKeys]:
+                    warnings.warn(
+                        f"Hub {row.destination}, with type {hubType}, was found with two different coordinates.\n"
+                        "This may be due to a data error or an ordering error in the source/dest-CoordKeys.\n"
+                        "The graph will ignore this hub and keep only the first instance."
+                    )

                 # get extra metrics
                 extra_metrics = {
@@ -411,15 +435,37 @@ def _dijkstra_single_source(
         self,
         start_id: str,
         target_ids: set[str],
-        allowed_modes: list[str],
+        allowed_modes: list[str] | None,
         optimization_metric: OptimizationMetric | tuple,
         max_segments: int,
         custom_filter: Filter | None,
     ):
+        """
+        Pareto Dijkstra with safety guarantees:
+        - Correct for single and multi-target
+        - Guaranteed termination
+        - Cycle-safe
+        """
+        counter = count()

+        # create the priority spec
         priority_spec = self._build_priority_spec(optimization_metric)
+        # ensure hops is in the priority spec (for safety)
+        if "hops" not in priority_spec:
+            priority_spec = (*priority_spec, "hops")
+
+        def dominates(p1: tuple, p2: tuple) -> bool:
+            """Return True if p1 dominates p2 (<= all, < at least one)."""
+            strictly_better = False
+            for a, b in zip(p1, p2):
+                if a > b:
+                    return False
+                if a < b:
+                    strictly_better = True
+            return strictly_better
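+            # e.g. (values illustrative): (1, 2) dominates (2, 2), since it is
+            # <= in every slot and strictly < in the first, while (1, 3) does
+            # not dominate (2, 2) because it is worse in the second slot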
+
         pq: list[tuple[tuple, int, PathNode, EdgeMetadata]] = []

         start_metrics = EdgeMetadata(
@@ -431,6 +477,7 @@
             }
         )

+        # create initial node
         start_path = PathNode(
             hub_id=start_id,
             mode="",
@@ -438,14 +485,25 @@
             prev=None,
         )

-        start_priority = self._compute_priority(start_path, start_metrics, priority_spec)
-        heapq.heappush(pq, (start_priority, next(counter), start_path, start_metrics))
+        start_priority = self._compute_priority(
+            start_path, start_metrics, priority_spec
+        )

-        # best lexicographic priority seen per hub
-        visited: dict[str, tuple] = {}
+        heapq.heappush(
+            pq, (start_priority, next(counter), start_path, start_metrics)
+        )
+
+        # hub_id: list of non-dominated labels
+        labels: dict[str, list[tuple]] = defaultdict(list)
+
+        # target_id: list of candidate (path, metrics, priority) entries
+        results: dict[str, list[tuple[PathNode, EdgeMetadata, tuple]]] = defaultdict(list)

-        # best result per target
-        results: dict[str, tuple[PathNode, EdgeMetadata, tuple]] = {}
+        # remaining target tracker for early termination
+        remaining_targets = set(target_ids)
+
+        # limit labels per hub
+        MAX_LABELS_PER_HUB = 50

         if allowed_modes is None:
             allowed_modes = list(self.TransportModes.values())
@@ -457,17 +515,30 @@
             hub_id = path_node.hub_id
             path_len = path_node.length

-            prev_priority = visited.get(hub_id)
-            if prev_priority is not None and prev_priority <= priority:
+            # reject dominated labels
+            if any(dominates(p, priority) for p in labels[hub_id]):
                 continue
-            visited[hub_id] = priority

-            # record result if this hub is a target
-            if hub_id in target_ids:
-                prev = results.get(hub_id)
-                if prev is None or priority < prev[2]:
-                    results[hub_id] = (path_node, acc_metrics, priority)
+            # prune labels dominated by this one
+            labels[hub_id] = [
+                p for p in labels[hub_id] if not dominates(priority, p)
+            ]
+            labels[hub_id].append(priority)
+
+            # limit labels
+            if len(labels[hub_id]) > MAX_LABELS_PER_HUB:
+                labels[hub_id] = labels[hub_id][:MAX_LABELS_PER_HUB]
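+                # note: once the cap is exceeded, the just-appended label is
+                # the one dropped; the current path is still expanded below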
+
+            # if this hub is a target add to results
+            if hub_id in remaining_targets:
+                results[hub_id].append((path_node, acc_metrics, priority))
+                remaining_targets.remove(hub_id)
+
+                # stop once all targets are found
+                if not remaining_targets:
+                    break
+
+            # depth guard
             if path_len >= max_segments:
                 continue
@@ -496,10 +567,12 @@
                 ):
                     continue

+                # accumulate metrics
                 new_acc_metrics = EdgeMetadata(
                     transportMode=None,
                     **acc_metrics.metrics,
                 )
+
                 for k, v in conn_metrics.metrics.items():
                     if isinstance(v, (int, float)):
                         new_acc_metrics.metrics[k] = (
@@ -515,15 +588,28 @@
                     prev=path_node,
                 )

-                new_priority = self._compute_priority(new_path_node, new_acc_metrics, priority_spec)
+                new_priority = self._compute_priority(
+                    new_path_node, new_acc_metrics, priority_spec
+                )

                 heapq.heappush(
                     pq,
-                    (new_priority, next(counter), new_path_node, new_acc_metrics),
+                    (
+                        new_priority,
+                        next(counter),
+                        new_path_node,
+                        new_acc_metrics,
+                    ),
                 )

-        # strip priority from results (external behavior unchanged)
-        return {k: (v[0], v[1]) for k, v in results.items()}
+        # create final results
+        final_results: dict[str, tuple[PathNode, EdgeMetadata]] = {}
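+        # priorities are tuples, so min() compares them lexicographically and
+        # later metrics only break ties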
+        for hub_id, entries in results.items():
+            best = min(entries, key=lambda e: e[2])
+            final_results[hub_id] = (best[0], best[1])
+
+        return final_results

     def _build_route(
         self,
diff --git a/tests/unit/test_routegraph_init.py b/tests/unit/test_routegraph_init.py
index 263131a..1576ea1 100644
--- a/tests/unit/test_routegraph_init.py
+++ b/tests/unit/test_routegraph_init.py
@@ -2,6 +2,11 @@
 from unittest.mock import patch
 from multimodalrouter import RouteGraph
 from threading import Lock
+import tempfile
+import pandas as pd
+import os
+import contextlib
+import io


 class TestRouteGraphInit(unittest.TestCase):
@@ -83,8 +88,190 @@ def test_init_with_special_keys(self):
             destCoordKeys=['c', 'd']
         )

-        self.assertEqual(graph.sourceCoordKeys, {'a', 'b'})
-        self.assertEqual(graph.destCoordKeys, {'c', 'd'})
+        self.assertEqual(graph.sourceCoordKeys, ['a', 'b'])
+        self.assertEqual(graph.destCoordKeys, ['c', 'd'])
+
+    def test_multi_dataset_unique_coord_keys_are_matched_deterministically(self):
+        """
+        Ensure coordinate keys are matched deterministically across datasets
+        where each dataset uses different coordinate column names.
+        """
+        with tempfile.TemporaryDirectory() as tmpdir:
+
+            path1 = os.path.join(tmpdir, "air.csv")
+            path2 = os.path.join(tmpdir, "sea.csv")
+
+            # Dataset 1 uses *_lat / *_lng
+            df1 = pd.DataFrame(
+                columns=[
+                    "source",
+                    "destination",
+                    "distance",
+                    "source_lat",
+                    "source_lng",
+                    "destination_lat",
+                    "destination_lng",
+                ],
+                data=[("A1", "B1", 5, 10, 20, 30, 40)],
+            )
+
+            # Dataset 2 uses *_y / *_x instead
+            df2 = pd.DataFrame(
+                columns=[
+                    "source",
+                    "destination",
+                    "distance",
+                    "src_y",
+                    "src_x",
+                    "dst_y",
+                    "dst_x",
+                ],
+                data=[("A2", "B2", 7, 100, 200, 300, 400)],
+            )
+
+            df1.to_csv(path1, index=False)
+            df2.to_csv(path2, index=False)
+
+            graph = RouteGraph(
+                maxDistance=50,
+                transportModes={"AIR": "fly", "SEA": "ship"},
+                dataPaths={"AIR": path1, "SEA": path2},
+                compressed=False,
+                extraMetricsKeys=[],
+                drivingEnabled=False,
+                # NOTE: superset of all possible keys — order matters
+                # define source y and x separately
+                sourceCoordKeys=["source_lat", "src_y", "source_lng", "src_x"],
+                # define destination y and x separately
+                destCoordKeys=["destination_lat", "destination_lng", "dst_y", "dst_x"],
+            )
+
+            f = io.StringIO()
+            with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
+                graph.build()
+
+            # test dataset 1
+            hub_a1 = graph.getHub("AIR", "A1")
+            hub_b1 = graph.getHub("AIR", "B1")
+
+            self.assertEqual(hub_a1.coords, [10, 20])
+            self.assertEqual(hub_b1.coords, [30, 40])
+
+            # test dataset 2
+            hub_a2 = graph.getHub("SEA", "A2")
+            hub_b2 = graph.getHub("SEA", "B2")
+
+            # Must preserve order from sourceCoordKeys/destCoordKeys
+            self.assertEqual(hub_a2.coords, [100, 200])
+            self.assertEqual(hub_b2.coords, [300, 400])
+
+    def test_extreme_multi_dataset_all_matching_coord_keys_are_used_and_ordered(self):
+        """
+        Stress test:
+        - multiple datasets
+        - overlapping + unique coord keys
+        - multiple matching coord columns -> higher dimensional coords
+        - scrambled column order
+        - superset matching with deterministic ordering
+        - multiple rows
+        """
+        with tempfile.TemporaryDirectory() as tmpdir:
+
+            air_path = os.path.join(tmpdir, "air.csv")
+            sea_path = os.path.join(tmpdir, "sea.csv")
+            rail_path = os.path.join(tmpdir, "rail.csv")
+
+            # ---------------- AIR ----------------
+            air_df = pd.DataFrame(
+                {
+                    "source_lat": [10, 11],
+                    "source_lng": [20, 21],
+                    "destination_lat": [30, 31],
+                    "destination_lng": [40, 41],
+                    "source": ["A1", "A2"],
+                    "destination": ["B1", "B2"],
+                    "distance": [5, 6],
+                }
+            ).sample(frac=1, axis=1)
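+            # sample(frac=1, axis=1) shuffles the column order while keeping
+            # every column, so matching must be name-based, not positional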
+
+            # ---------------- SEA ----------------
+            sea_df = pd.DataFrame(
+                {
+                    "src_y": [100, 101],
+                    "src_x": [200, 201],
+                    "dst_y": [300, 301],
+                    "dst_x": [400, 401],
+                    "source": ["S1", "S2"],
+                    "destination": ["T1", "T2"],
+                    "distance": [7, 8],
+                }
+            ).sample(frac=1, axis=1)
+
+            # ---------------- RAIL ----------------
+            # 4D coords (using both lat/lng and y/x)
+            rail_df = pd.DataFrame(
+                {
+                    "source_lat": [1000],
+                    "source_lng": [2000],
+                    "src_y": [1],
+                    "src_x": [2],
+                    "destination_lat": [3000],
+                    "destination_lng": [4000],
+                    "dst_y": [3],
+                    "dst_x": [4],
+                    "source": ["R1"],
+                    "destination": ["R2"],
+                    "distance": [20],
+                }
+            ).sample(frac=1, axis=1)
+
+            air_df.to_csv(air_path, index=False)
+            sea_df.to_csv(sea_path, index=False)
+            rail_df.to_csv(rail_path, index=False)
+
+            graph = RouteGraph(
+                maxDistance=100,
+                transportModes={"AIR": "fly", "SEA": "ship", "RAIL": "rail"},
+                dataPaths={"AIR": air_path, "SEA": sea_path, "RAIL": rail_path},
+                compressed=False,
+                extraMetricsKeys=[],
+                drivingEnabled=False,
+                sourceCoordKeys=[
+                    "source_lat",
+                    "source_lng",
+                    "src_y",
+                    "src_x",
+                ],
+                destCoordKeys=[
+                    "destination_lat",
+                    "dst_y",
+                    "destination_lng",
+                    "dst_x",
+                ],
+            )
+
+            f = io.StringIO()
+            with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
+                graph.build()
+
+            self.assertEqual(graph.getHub("AIR", "A1").coords, [10, 20])
+            self.assertEqual(graph.getHub("AIR", "A2").coords, [11, 21])
+            self.assertEqual(graph.getHub("AIR", "B1").coords, [30, 40])
+            self.assertEqual(graph.getHub("AIR", "B2").coords, [31, 41])
+
+            self.assertEqual(graph.getHub("SEA", "S1").coords, [100, 200])
+            self.assertEqual(graph.getHub("SEA", "S2").coords, [101, 201])
+            self.assertEqual(graph.getHub("SEA", "T1").coords, [300, 400])
+            self.assertEqual(graph.getHub("SEA", "T2").coords, [301, 401])
+
+            self.assertEqual(
+                graph.getHub("RAIL", "R1").coords,
+                [1000, 2000, 1, 2],
+            )
+            self.assertEqual(
+                graph.getHub("RAIL", "R2").coords,
+                [3000, 3, 4000, 4],
+            )


 if __name__ == '__main__':
diff --git a/tests/unit/test_routegraph_private_methods.py b/tests/unit/test_routegraph_private_methods.py
index 4f74410..90475eb 100644
--- a/tests/unit/test_routegraph_private_methods.py
+++ b/tests/unit/test_routegraph_private_methods.py
@@ -1,5 +1,6 @@
 import unittest
 from unittest.mock import patch
+import warnings
 from multimodalrouter import RouteGraph, Hub
 import os
 import tempfile
@@ -35,9 +36,14 @@ def setUpClass(cls):

         # remove the print output from build
         f = io.StringIO()
-        with contextlib.redirect_stdout(f), contextlib.redirect_stderr(f):
+        with (warnings.catch_warnings(),
+              contextlib.redirect_stdout(f),
+              contextlib.redirect_stderr(f)):
+
+            warnings.simplefilter("ignore")
             cls.mainGraph.build()

+
     @classmethod
     def tearDownClass(cls):
         # remove temp file
diff --git a/tests/unit/test_routegraph_public_features.py b/tests/unit/test_routegraph_public_features.py
index 8d8dd79..17d0db9 100644
--- a/tests/unit/test_routegraph_public_features.py
+++ b/tests/unit/test_routegraph_public_features.py
@@ -1,5 +1,6 @@
 import unittest
 from unittest.mock import patch
+import warnings
 from multimodalrouter import RouteGraph, Hub, Filter, EdgeMetadata, PathNode
 import os
 import tempfile
@@ -20,7 +21,7 @@ def setUpClass(cls):
             columns=['source', 'destination', 'distance', 'source_lat', 'source_lng', 'destination_lat', 'destination_lng'],
             data=[('A', 'B', 2, 1, 1, 1, 3),
                   ('C', 'D', 1, 2, 1, 1, 4),
-                  ('B', 'D', 1, 3, 1, 1, 4)]
+                  ('B', 'D', 1, 1, 3, 1, 4)]
         )

         testDf.to_csv(cls.temp_file_path, index=False)
@@ -1015,3 +1016,212 @@ def filter(self, start, end, edge, path):
         # long chain is rejected by filter; direct path wins lexicographically
         path_nodes = [n[0] for n in route.path]
         self.assertEqual(path_nodes, ['A', 'K'])
+
+    def test_multi_target_pareto_prefix_preservation(self):
+        """
+        Ensures that non-dominated prefixes are preserved
+        when searching multiple targets simultaneously.
+        """
+        rows = [
+            # A -> B fast but expensive
+            ('A', 'B', 1, 0, 0, 1, 0),
+
+            # A -> C slow but cheap
+            ('A', 'C', 5, 0, 0, 0, 5),
+
+            # B only leads to T1
+            ('B', 'T1', 100, 1, 0, 2, 0),
+
+            # C only leads to T2 cheaply
+            ('C', 'T2', 1, 0, 5, 0, 6),
+        ]
+
+        df = pd.DataFrame(
+            rows,
+            columns=[
+                'source', 'destination', 'distance',
+                'source_lat', 'source_lng',
+                'destination_lat', 'destination_lng',
+            ],
+        )
+
+        path = os.path.join(self.temp_dir.name, 'pareto_multi_target.csv')
+        df.to_csv(path, index=False)
+
+        graph = RouteGraph(
+            maxDistance=200,
+            transportModes={'H': 'mv'},
+            dataPaths={'H': path},
+            drivingEnabled=False,
+        )
+
+        graph.build()
+
+        routes = graph.find_shortest_paths(
+            start_id='A',
+            end_ids={'T1', 'T2'},
+            allowed_modes=['mv'],
+            optimization_metric=['distance'],
+            verbose=True,
+        )
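+        # routes maps each found target id to its route; with verbose=True the
+        # route's .path holds tuples whose first element is the hub id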
+
+        # assert that all targets are found
+        self.assertIn('T1', routes)
+        self.assertIn('T2', routes)
+
+        # check paths
+        self.assertEqual(
+            [n[0] for n in routes['T1'].path],
+            ['A', 'B', 'T1']
+        )
+        self.assertEqual(
+            [n[0] for n in routes['T2'].path],
+            ['A', 'C', 'T2']
+        )
+
+    def test_deferred_pareto_improvement(self):
+        """
+        Path that is worse early but better later must survive.
+        """
+        rows = [
+            ('A', 'B', 1, 0, 0, 1, 0),    # fast, expensive later
+            ('A', 'C', 5, 0, 0, 0, 5),    # slow, cheap later
+            ('B', 'D', 100, 1, 0, 2, 0),
+            ('C', 'D', 1, 0, 5, 0, 6),
+        ]
+
+        df = pd.DataFrame(rows, columns=[
+            'source', 'destination', 'distance',
+            'source_lat', 'source_lng',
+            'destination_lat', 'destination_lng',
+        ])
+
+        path = os.path.join(self.temp_dir.name, 'deferred.csv')
+        df.to_csv(path, index=False)
+
+        graph = RouteGraph(
+            maxDistance=200,
+            transportModes={'H': 'mv'},
+            dataPaths={'H': path},
+            drivingEnabled=False,
+        )
+
+        graph.build()
+
+        route = graph.find_shortest_path(
+            'A',
+            'D',
+            allowed_modes=['mv'],
+            optimization_metric=['distance'],
+        )
+
+        # must be A -> C -> D, NOT A -> B -> D
+        self.assertEqual(
+            [n[0] for n in route.path],
+            ['A', 'C', 'D']
+        )
+
+    def test_single_target_pareto_terminates_early(self):
+        """
+        Regression test:
+        Single-target Pareto Dijkstra must terminate immediately
+        after the target is settled (no full frontier expansion).
+        """
+        rows = []
+
+        # create many equivalent paths
+        for i in range(20):
+            rows.append(('A', f'B{i}', 1, 0, 0, i, 0))
+            rows.append((f'B{i}', 'C', 1, i, 0, 20, 0))
+
+        # add cycles to stress test
+        for i in range(20):
+            rows.append((f'B{i}', f'B{i}', 0, i, 0, i, 0))  # zero-cost self-loop
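+        # a self-loop label ties its predecessor on every metric but adds one
+        # hop, so (assuming "hops" tracks path length) dominance can prune it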
+
+        df = pd.DataFrame(
+            columns=[
+                'source', 'destination', 'distance',
+                'source_lat', 'source_lng',
+                'destination_lat', 'destination_lng',
+            ],
+            data=rows,
+        )
+
+        path = os.path.join(self.temp_dir.name, 'pareto_single_target.csv')
+        df.to_csv(path, index=False)
+
+        graph = RouteGraph(
+            maxDistance=100,
+            transportModes={'H': 'mv'},
+            dataPaths={'H': path},
+            drivingEnabled=False,
+        )
+        graph.build()
+
+        route = graph.find_shortest_path(
+            start_id='A',
+            end_id='C',
+            allowed_modes=['mv'],
+            optimization_metric=['distance'],
+            max_segments=10,
+            verbose=False,
+        )
+
+        # assert that a path was found
+        self.assertIsNotNone(route)
+
+        # assert path is correct
+        hubs = [n[0] for n in route.path]
+        self.assertEqual(hubs[0], 'A')
+        self.assertEqual(hubs[-1], 'C')
+
+        # assert path is optimal
+        self.assertEqual(route.totalMetrics.getMetric('distance'), 2)
+
+    def test_zero_cost_cycle_does_not_explode(self):
+        """
+        Regression test:
+        Zero-cost cycles must not cause infinite Pareto expansion.
+        """
+        rows = [
+            ('A', 'B', 1, 0, 0, 1, 0),
+            ('B', 'C', 1, 1, 0, 2, 0),
+            ('C', 'B', 0, 2, 0, 1, 0),  # zero-cost cycle
+        ]
+
+        df = pd.DataFrame(
+            columns=[
+                'source', 'destination', 'distance',
+                'source_lat', 'source_lng',
+                'destination_lat', 'destination_lng',
+            ],
+            data=rows,
+        )
+
+        path = os.path.join(self.temp_dir.name, 'zero_cycle.csv')
+        df.to_csv(path, index=False)
+
+        graph = RouteGraph(
+            maxDistance=100,
+            transportModes={'H': 'mv'},
+            dataPaths={'H': path},
+            drivingEnabled=False,
+        )
+
+        graph.build()
+
+        route = graph.find_shortest_path(
+            start_id='A',
+            end_id='C',
+            allowed_modes=['mv'],
+            optimization_metric=['distance'],
+            max_segments=20,
+            verbose=False,
+        )
+
+        self.assertIsNotNone(route)
+        self.assertEqual(route.totalMetrics.getMetric('distance'), 2)