From 514544c693404f5dcc41d056875a2dec5e47c5e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Julian=20B=C3=BCttner?= Date: Thu, 29 Feb 2024 00:32:15 +0100 Subject: [PATCH] Get objects and dependencies by callback --- src/reader.rs | 3 + src/reader/get_objs_and_deps_callback.rs | 216 +++++++++++++++++++++++ 2 files changed, 219 insertions(+) create mode 100644 src/reader/get_objs_and_deps_callback.rs diff --git a/src/reader.rs b/src/reader.rs index da3ac6b1..2043b9d4 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -21,6 +21,9 @@ use std::convert::From; use std::io::{self, Read}; use std::iter; +mod get_objs_and_deps_callback; +pub use get_objs_and_deps_callback::GetObjsContinue; + /// Trait to allow generic objects (not just BTreeMap) in some methods. pub trait StoreObjs { /// Insert given object at given key index. diff --git a/src/reader/get_objs_and_deps_callback.rs b/src/reader/get_objs_and_deps_callback.rs new file mode 100644 index 00000000..1c6e4bbf --- /dev/null +++ b/src/reader/get_objs_and_deps_callback.rs @@ -0,0 +1,216 @@ +use std::collections::{HashMap, HashSet}; +use std::ops::Deref; +use std::rc::Rc; + +use super::*; + +struct PendingObj { + obj: Rc, + found_deps: HashSet>, + missing_ids: HashSet, +} +impl PendingObj { + fn new(obj: Rc) -> Self { + Self { + found_deps: HashSet::new(), + missing_ids: obj.dependencies().into_iter().collect(), + obj, + } + } + fn to_obj_and_dependencies(&self) -> ObjAndDependencies { + ObjAndDependencies { + dependencies: self + .found_deps + .iter() + .map(|obj| (obj.id(), obj.deref().clone())) + .collect(), + obj: (*self.obj).clone(), + } + } + fn insert_if_missing(&mut self, other: Rc) { + // TODO: the flamechart shows this method does _horribly_. + // Must be optimized. + if !self.missing_ids.remove(&other.id()) { + // Object was not wanted: noop + return; + } + // Object wanted, insert and update dependencies + for id in other.dependencies() { + self.missing_ids.insert(id); + } + for dep in self.found_deps.iter() { + self.missing_ids.remove(&dep.id()); + } + } + fn complete(&self) -> bool { + self.missing_ids.is_empty() + } +} + +trait ObjDependencyIds { + fn dependencies(&self) -> Vec; +} +impl ObjDependencyIds for OsmObj { + /// Get IDs of first level dependencies + fn dependencies(&self) -> Vec { + let it_way = self + .way() + .into_iter() + .flat_map(|w| &w.nodes) + .cloned() + .map(OsmId::Node); + let it_rel = self + .relation() + .into_iter() + .flat_map(|w| &w.refs) + .map(|r| r.member); + it_way.chain(it_rel).collect() + } +} + +fn insert_into_pending(pending: &mut HashMap, obj: Rc) { + for (_id, pending) in pending.iter_mut() { + pending.insert_if_missing(obj.clone()); + } +} + +fn pop_finish( + pending: &mut HashMap, +) -> impl Iterator { + let result: Vec = pending + .values() + .filter(|p| p.complete()) + .map(PendingObj::to_obj_and_dependencies) + .collect(); + pending.retain(|_key, p| !p.complete()); + result.into_iter() +} + +/// Object fulfilling a search predicate, +/// containing all recursive dependencies. +/// It will be the only argument for the provided callback in [get_objs_and_deps_callback]. +pub struct ObjAndDependencies { + /// The main object fulfilling the search predicate. + pub obj: OsmObj, + /// All recusrive dependencies of obj. + pub dependencies: HashMap, +} + +/// Tell [get_objs_and_deps_callback] whether to continue +/// it's search or not. +#[derive(PartialEq, Eq)] +pub enum GetObjsContinue { + /// Instructs get_objs_and_deps_callback to continue. + Continue, + /// Instructs get_objs_and_deps_callback to stop and return. + Stop, +} + +impl OsmPbfReader { + /// If files get too big and the memory is limited, + /// this method allows to process all objects in an + /// efficient fashion. max_loaded_object is an upper bound, of how many + /// predicate matching objects are to be kept in memory. + /// Keep in mind, that every predicate matching object also keeps + /// their dependencies until they are complete. + /// Higher values require more memory, but less passes over the file. + pub fn get_objs_and_deps_callback( + &mut self, + mut pred: F, + mut callback: C, + max_loaded_objects: usize, + ) -> Result<()> + where + R: io::Seek, + F: FnMut(&OsmObj) -> bool, + C: FnMut(ObjAndDependencies) -> GetObjsContinue, + { + let mut last_id_of_marked_as_pending: Option = None; + // For pending object we want something like a vector, + // but to pop in O(1). We can't take a hash map, because we + // might alter the object. + let mut pending_objects: HashMap = HashMap::new(); + let mut processed_counter: usize = 0; + + // How often to pop the completed objects. + const PROCESSED_MODULO: usize = 128; + + 'outer: loop { + self.rewind()?; + let mut obj_iter = self.iter(); + + // Phase 1: Only fetch dependencies. + // We could, for example, be in the second round, starting from the beginning, + // only completing pending objects, because there were too many in memory. + for obj in obj_iter.by_ref() { + let obj = Rc::new(obj?); + insert_into_pending(&mut pending_objects, obj.clone()); + if processed_counter % PROCESSED_MODULO == 0 { + for obj in pop_finish(&mut pending_objects) { + if callback(obj) == GetObjsContinue::Stop { + break 'outer; + } + } + } + processed_counter += 1; + if last_id_of_marked_as_pending.is_none() { + break; + } + if Some(obj.id()) == last_id_of_marked_as_pending { + break; + } + } + + // Phase 2: Continue collecting new pending objects matching the predicate, until too many pending. + // The first phase is complete and potentially freed space to add more pending objects. + // We continue to complete pending objects. + for obj in obj_iter.by_ref() { + let obj = Rc::new(obj?); + insert_into_pending(&mut pending_objects, obj.clone()); + if processed_counter % PROCESSED_MODULO == 0 { + for obj in pop_finish(&mut pending_objects) { + if callback(obj) == GetObjsContinue::Stop { + break 'outer; + } + } + } + processed_counter += 1; + last_id_of_marked_as_pending = Some(obj.id()); + if pred(&obj) { + pending_objects.insert(obj.id(), PendingObj::new(obj)); + } + if pending_objects.len() >= max_loaded_objects { + break; + } + } + + // Phase 3: Continue while only completing pending objects. + // There are many pending objects again, and we only want to complete them / + // reduce the count. + // TODO: is this phase necessary, or are dependencies _always_ listed before + // there parent? + for obj in obj_iter.by_ref() { + let obj = Rc::new(obj?); + insert_into_pending(&mut pending_objects, obj.clone()); + if processed_counter % PROCESSED_MODULO == 0 { + for obj in pop_finish(&mut pending_objects) { + if callback(obj) == GetObjsContinue::Stop { + break 'outer; + } + } + } + processed_counter += 1; + } + + for obj in pop_finish(&mut pending_objects) { + if callback(obj) == GetObjsContinue::Stop { + break 'outer; + } + } + if pending_objects.is_empty() { + break; + } + } + Ok(()) + } +}