Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "multimodalrouter"
version = "0.1.13"
version = "0.1.16"
description = "A graph-based routing library for dynamic routing."
readme = "README.md"
license = { file = "LICENSE.md" }
Expand Down
148 changes: 117 additions & 31 deletions src/multimodalrouter/graph/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import pandas as pd
from .dataclasses import Hub, EdgeMetadata, OptimizationMetric, Route, Filter, VerboseRoute, PathNode
from threading import Lock
from collections import deque
from collections import defaultdict, deque
from itertools import count
import warnings


class RouteGraph:
Expand All @@ -36,8 +37,8 @@ def __init__(
# a list of coordinate names for the destination coords in the datasets (name to dataset matching is automatic)
destCoordKeys: list[str] = ["destination_lat", "destination_lng"],
):
self.sourceCoordKeys = set(sourceCoordKeys)
self.destCoordKeys = set(destCoordKeys)
self.sourceCoordKeys = sourceCoordKeys
self.destCoordKeys = destCoordKeys

self.compressed = compressed
self.extraMetricsKeys = extraMetricsKeys
Expand Down Expand Up @@ -258,8 +259,8 @@ def _generateHubs(self):
data = self._loadData(hubType)
added = set()

thisSourceKeys = self.sourceCoordKeys & set(data.columns)
thisDestinationKeys = self.destCoordKeys & set(data.columns)
thisSourceKeys = [k for k in self.sourceCoordKeys if k in data.columns]
thisDestinationKeys = [k for k in self.destCoordKeys if k in data.columns]

# get required and extra columns
required_cols = {
Expand All @@ -270,25 +271,48 @@ def _generateHubs(self):
}

# collect extra data from the dataset columns that are not required but marked as extra
extra_metric_cols = []
for m in self.extraMetricsKeys:
if m not in required_cols:
try:
extra_metric_cols.append(m)
except KeyError:
continue
extra_metric_cols = [
m for m in self.extraMetricsKeys
if m in data.columns and m not in required_cols
]

for row in tqdm(data.itertuples(index=False), desc=f"Generating {hubType} Hubs", unit="hub"):
# create hubs if they don't exist
if row.source not in added:
hub = Hub(coords=[getattr(row, k) for k in thisSourceKeys], id=row.source, hubType=hubType)
self.addHub(hub)
added.add(row.source)
elif row.source not in self.Graph[hubType].keys():
other_type = self.getHubById(row.source)
warnings.warn(
f"Hub {row.source}, with type {hubType}, already exists \
as a {other_type.hubType} hub and will be skipped. \
\n If you want to add it as a {hubType} hub, please delete or rename the other hub first."
)
elif self.Graph[hubType][row.source].coords != [getattr(row, k) for k in thisSourceKeys]:
warnings.warn(
f"Hub {row.source}, with type {hubType}, was found with two different coordinates. \
\nThis may be due to a data error or an ordering error in the source/dest-CoordKeys.\
\nThe graph will ignore this hub and keep only the first instance."
)

# Create the destination hub if we have not processed this id yet.
# NOTE: this branch mirrors the source-hub branch above; all checks and
# messages must reference row.destination (the original referenced
# row.source here — copy-paste bug — which validated and reported the
# wrong hub).
if row.destination not in added:
    hub = Hub(coords=[getattr(row, k) for k in thisDestinationKeys], id=row.destination, hubType=hubType)
    self.addHub(hub)
    added.add(row.destination)
elif row.destination not in self.Graph[hubType].keys():
    # Seen before, but it never landed under this hubType: the id already
    # exists under a different hub type, so addHub skipped it.
    other_type = self.getHubById(row.destination)
    warnings.warn(
        f"Hub {row.destination}, with type {hubType}, already exists \
as a {other_type.hubType} hub and will be skipped. \
\n If you want to add it as a {hubType} hub, please delete or rename the other hub first."
    )
elif self.Graph[hubType][row.destination].coords != [getattr(row, k) for k in thisDestinationKeys]:
    # Same id, same type, but conflicting coordinates in the dataset:
    # keep the first instance and warn (compare against the DESTINATION
    # coordinate columns, not the source ones).
    warnings.warn(
        f"Hub {row.destination}, with type {hubType}, was found with two different coordinates. \
\nThis may be due to a data error or an ordering error in the source/dest-CoordKeys.\
\nThe graph will ignore this hub and keep only the first instance."
    )

# get extra metrics
extra_metrics = {
Expand Down Expand Up @@ -411,15 +435,37 @@ def _dijkstra_single_source(
self,
start_id: str,
target_ids: set[str],
allowed_modes: list[str],
allowed_modes: list[str] | None,
optimization_metric: OptimizationMetric | tuple,
max_segments: int,
custom_filter: Filter | None,
):
"""
Pareto Dijkstra with safety guarantees:
- Correct for single and multi-target
- Guaranteed termination
- Cycle-safe
"""

counter = count()

# create the priority spec
priority_spec = self._build_priority_spec(optimization_metric)

# ensure hops is in the priority spec (for safety)
if "hops" not in priority_spec:
priority_spec = (*priority_spec, "hops")

def dominates(p1: tuple, p2: tuple) -> bool:
    """Pareto dominance test.

    p1 dominates p2 when it is no worse in every component (<=) and
    strictly better (<) in at least one. Components are compared
    pairwise; equal tuples never dominate each other.
    """
    pairs = list(zip(p1, p2))
    # Any component where p1 is worse rules out dominance outright.
    if any(a > b for a, b in pairs):
        return False
    # Otherwise p1 dominates iff it wins strictly somewhere.
    return any(a < b for a, b in pairs)

pq: list[tuple[tuple, int, PathNode, EdgeMetadata]] = []

start_metrics = EdgeMetadata(
Expand All @@ -431,21 +477,33 @@ def _dijkstra_single_source(
}
)

# create initial node
start_path = PathNode(
hub_id=start_id,
mode="",
edge=EdgeMetadata(),
prev=None,
)

start_priority = self._compute_priority(start_path, start_metrics, priority_spec)
heapq.heappush(pq, (start_priority, next(counter), start_path, start_metrics))
start_priority = self._compute_priority(
start_path, start_metrics, priority_spec
)

# best lexicographic priority seen per hub
visited: dict[str, tuple] = {}
heapq.heappush(
pq, (start_priority, next(counter), start_path, start_metrics)
)

# hub_id: list of non dominated labels
labels: dict[str, list[tuple]] = defaultdict(list)

# target_id: path
results: dict[str, list[tuple[PathNode, EdgeMetadata, tuple]]] = defaultdict(list)

# best result per target
results: dict[str, tuple[PathNode, EdgeMetadata, tuple]] = {}
# remaining target tracker for early termination
remaining_targets = set(target_ids)

# limit labels per hub
MAX_LABELS_PER_HUB = 50

if allowed_modes is None:
allowed_modes = list(self.TransportModes.values())
Expand All @@ -457,17 +515,30 @@ def _dijkstra_single_source(
hub_id = path_node.hub_id
path_len = path_node.length

prev_priority = visited.get(hub_id)
if prev_priority is not None and prev_priority <= priority:
# reject dominated labels
if any(dominates(p, priority) for p in labels[hub_id]):
continue
visited[hub_id] = priority

# record result if this hub is a target
if hub_id in target_ids:
prev = results.get(hub_id)
if prev is None or priority < prev[2]:
results[hub_id] = (path_node, acc_metrics, priority)
# prune labels dominated by this one
labels[hub_id] = [
p for p in labels[hub_id] if not dominates(priority, p)
]
labels[hub_id].append(priority)

# limit labels
if len(labels[hub_id]) > MAX_LABELS_PER_HUB:
labels[hub_id] = labels[hub_id][:MAX_LABELS_PER_HUB]

# if this hub is a target add to results
if hub_id in remaining_targets:
results[hub_id].append((path_node, acc_metrics, priority))
remaining_targets.remove(hub_id)

# stop once all targets are found
if not remaining_targets:
break

# depth guard
if path_len >= max_segments:
continue

Expand Down Expand Up @@ -496,10 +567,12 @@ def _dijkstra_single_source(
):
continue

# accumulate metrics
new_acc_metrics = EdgeMetadata(
transportMode=None,
**acc_metrics.metrics,
)

for k, v in conn_metrics.metrics.items():
if isinstance(v, (int, float)):
new_acc_metrics.metrics[k] = (
Expand All @@ -515,15 +588,28 @@ def _dijkstra_single_source(
prev=path_node,
)

new_priority = self._compute_priority(new_path_node, new_acc_metrics, priority_spec)
new_priority = self._compute_priority(
new_path_node, new_acc_metrics, priority_spec
)

heapq.heappush(
pq,
(new_priority, next(counter), new_path_node, new_acc_metrics),
(
new_priority,
next(counter),
new_path_node,
new_acc_metrics,
),
)

# strip priority from results (external behavior unchanged)
return {k: (v[0], v[1]) for k, v in results.items()}
# create final results
final_results: dict[str, tuple[PathNode, EdgeMetadata]] = {}

for hub_id, entries in results.items():
best = min(entries, key=lambda e: e[2])
final_results[hub_id] = (best[0], best[1])

return final_results

def _build_route(
self,
Expand Down
Loading
Loading