diff --git a/configuration/bambam-omf/travel-mode-filter.json b/configuration/bambam-omf/travel-mode-filter.json index 4cac566..8bbbe90 100644 --- a/configuration/bambam-omf/travel-mode-filter.json +++ b/configuration/bambam-omf/travel-mode-filter.json @@ -42,6 +42,7 @@ ], "island_algorithm_configuration": { "min_distance": 2, - "distance_unit": "miles" + "distance_unit": "miles", + "parallel_execution": false } } \ No newline at end of file diff --git a/rust/bambam-omf/src/app/network.rs b/rust/bambam-omf/src/app/network.rs index b263271..6fbccd1 100644 --- a/rust/bambam-omf/src/app/network.rs +++ b/rust/bambam-omf/src/app/network.rs @@ -39,6 +39,7 @@ impl From<&NetworkEdgeListConfiguration> for SegmentAccessRestrictionWhen { pub struct IslandDetectionAlgorithmConfiguration { pub min_distance: f64, pub distance_unit: DistanceUnit, + pub parallel_execution: bool, } /// runs an OMF network import using the provided configuration. diff --git a/rust/bambam-omf/src/graph/component_algorithm.rs b/rust/bambam-omf/src/graph/component_algorithm.rs index 8b97173..225f9f5 100644 --- a/rust/bambam-omf/src/graph/component_algorithm.rs +++ b/rust/bambam-omf/src/graph/component_algorithm.rs @@ -2,7 +2,7 @@ use std::collections::{HashSet, VecDeque}; use geo::{line_string, Haversine, Length, Point}; use indexmap::IndexMap; -use kdam::tqdm; +use kdam::{tqdm, BarExt}; use rayon::prelude::*; use routee_compass_core::model::{ network::{Edge, EdgeId, EdgeList, EdgeListId, Vertex, VertexId}, @@ -22,6 +22,7 @@ pub fn island_detection_algorithm( vertices: &[Vertex], distance_threshold: f64, distance_threshold_unit: DistanceUnit, + parallel_execution: bool, ) -> Result, OvertureMapsCollectionError> { let forward_adjacency: DenseAdjacencyList = build_adjacency(edge_lists, vertices.len(), true) .map_err(|s| { @@ -30,28 +31,127 @@ pub fn island_detection_algorithm( )) })?; - let island_edges: Result, _> = edge_lists - .par_iter() - .flat_map(|&el| el.0.par_iter()) - .filter_map(|edge| { - 
match is_component_island_parallel( - edge, - distance_threshold, - distance_threshold_unit, - edge_lists, - vertices, - &forward_adjacency, - ) { - Ok(true) => Some(Ok((edge.edge_list_id, edge.edge_id))), - Ok(false) => None, - Err(e) => Some(Err(e)), + let island_edges: Result, _> = if parallel_execution { + edge_lists + .par_iter() + .flat_map(|&el| el.0.par_iter()) + .filter_map(|edge| { + match is_component_island_parallel( + edge, + distance_threshold, + distance_threshold_unit, + edge_lists, + vertices, + &forward_adjacency, + ) { + Ok(true) => Some(Ok((edge.edge_list_id, edge.edge_id))), + Ok(false) => None, + Err(e) => Some(Err(e)), + } + }) + .collect() + } else { + // Progress bar + let total_edges = edge_lists.iter().map(|el| el.len()).sum::(); + let mut pb = tqdm!( + total = total_edges, + desc = "computing components - scanning edges" + ); + + // Initialization + let mut visited = HashSet::<(EdgeListId, EdgeId)>::new(); + let mut queue = VecDeque::<(EdgeListId, EdgeId)>::new(); + let mut flagged = Vec::<(EdgeListId, EdgeId)>::new(); + + // Main Loop + // Each non-skipped iteration of the large loop is a separate component + for start_edge in edge_lists.iter().flat_map(|el| el.edges()) { + // NOTE: Due to directionality and the fact that I am using only forward connectivity + // it is necessary to have two `if visited contains` checks. 
+ if visited.contains(&(start_edge.edge_list_id, start_edge.edge_id)) { + continue; + } + + // Initialize the component + queue.push_back((start_edge.edge_list_id, start_edge.edge_id)); + let mut max_distance_reached = uom_length::new::(0.0); + let mut component = Vec::<(EdgeListId, EdgeId)>::new(); + let start_midpoint = compute_midpoint(start_edge, vertices); + + // Loop through the queue (explore the component) + loop { + if let Some((current_el_id, current_e_id)) = queue.pop_front() { + if visited.contains(&(current_el_id, current_e_id)) { + continue; + } + component.push((current_el_id, current_e_id)); + let current_edge = edge_lists[current_el_id.0].0[current_e_id.0]; + + let current_distance = is_component_island_sequential( + &current_edge, + start_midpoint, + &mut visited, + &mut queue, + vertices, + &forward_adjacency, + ); + + // Update the max_distance + let current_distance_uom = + uom_length::new::(current_distance as f64); + max_distance_reached = max_distance_reached.max(current_distance_uom); + + // Update bar + if let Err(e) = pb.update(1) { + log::warn!("error during update of progress bar: {e}") + }; + } else { + break; + }; } - }) - .collect(); + + // At the end, flag all the edges in the component for deletion + if max_distance_reached < distance_threshold_unit.to_uom(distance_threshold) { + flagged.append(&mut component); + } + } + + eprintln!(); + Ok(flagged) + }; island_edges } +/// returns the f32 distance in meters from the current edge midpoint to the initial edge midpoint. 
+fn is_component_island_sequential( + edge: &Edge, + initial_midpoint: Point, + visited: &mut HashSet<(EdgeListId, EdgeId)>, + queue: &mut VecDeque<(EdgeListId, EdgeId)>, + vertices: &[Vertex], + adjacency: &DenseAdjacencyList, +) -> f32 { + let (edge_list_id, edge_id) = (edge.edge_list_id, edge.edge_id); + + // Update counter + let current_midpoint = compute_midpoint(edge, vertices); + let current_distance_to_start_meters = + Haversine.length(&line_string![initial_midpoint.0, current_midpoint.0]); + + // get all neighbors, add them to queue + let outward_edges: Vec<&(EdgeListId, EdgeId)> = + adjacency[edge.dst_vertex_id.0].keys().collect(); + for (edge_list_id, edge_id) in outward_edges { + queue.push_back((*edge_list_id, *edge_id)); + } + + // mark as visited + visited.insert((edge_list_id, edge_id)); + + current_distance_to_start_meters +} + /// parallelizable implementation fn is_component_island_parallel( edge: &Edge, diff --git a/rust/bambam-omf/src/graph/omf_graph.rs b/rust/bambam-omf/src/graph/omf_graph.rs index a566ac1..129f3f7 100644 --- a/rust/bambam-omf/src/graph/omf_graph.rs +++ b/rust/bambam-omf/src/graph/omf_graph.rs @@ -172,6 +172,7 @@ impl OmfGraphVectorized { &vertices, algorithm_config.min_distance, algorithm_config.distance_unit, + algorithm_config.parallel_execution, )?; // Refactor Vec into Hashmap