From 53a15d82a4f65c312eb95158d90c84a59903fe9f Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Fri, 13 Oct 2023 10:28:57 +0200 Subject: [PATCH] Improve handling of permissions around clock control. --- CHANGELOG.md | 4 + ntp-proto/src/algorithm/kalman/mod.rs | 35 +++--- ntp-proto/src/algorithm/mod.rs | 4 +- ntp-proto/src/clock.rs | 2 +- ntpd/bin/ntp-daemon.rs | 7 +- ntpd/src/daemon/system.rs | 154 ++++++++++++++++++-------- 6 files changed, 137 insertions(+), 69 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba8702de7..20da42467 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [unreleased] +### Added +- Server can now be run without permission to change the system clock so long + as no time sources are configured. + ### Changed - The sources section can be left out of the configuration now. - When no sources are configured, the daemon will merely state it won't change diff --git a/ntp-proto/src/algorithm/kalman/mod.rs b/ntp-proto/src/algorithm/kalman/mod.rs index a99908249..b486cd901 100644 --- a/ntp-proto/src/algorithm/kalman/mod.rs +++ b/ntp-proto/src/algorithm/kalman/mod.rs @@ -311,19 +311,13 @@ impl TimeSyncController Self { + ) -> Result { // Setup clock - clock - .disable_ntp_algorithm() - .expect("Unable to change system time"); - clock - .status_update(NtpLeapIndicator::Unknown) - .expect("Unable to update clock"); - clock - .set_frequency(0.0) - .expect("Unable to set system clock frequency"); - - KalmanClockController { + clock.disable_ntp_algorithm()?; + clock.status_update(NtpLeapIndicator::Unknown)?; + clock.set_frequency(0.0)?; + + Ok(KalmanClockController { peers: HashMap::new(), clock, synchronization_config, @@ -333,7 +327,7 @@ impl TimeSyncController Default for StateUpdate { } } -pub trait TimeSyncController { +pub trait TimeSyncController: Sized { type AlgorithmConfig: Debug + Copy + DeserializeOwned; /// Create a new clock controller controling the given clock @@ -54,7 +54,7 @@ pub trait TimeSyncController { synchronization_config: SynchronizationConfig, peer_defaults_config: SourceDefaultsConfig, algorithm_config: Self::AlgorithmConfig, - ) -> Self; + ) -> Result; /// Update used system config fn update_config( &mut self, diff --git a/ntp-proto/src/clock.rs b/ntp-proto/src/clock.rs index 88a6e1593..31c7fd80d 100644 --- a/ntp-proto/src/clock.rs +++ b/ntp-proto/src/clock.rs @@ -7,7 +7,7 @@ use crate::{ /// This needs to be a trait as a single system can have multiple clocks /// which need different implementation for steering and/or now. pub trait NtpClock: Clone + Send + 'static { - type Error: std::error::Error; + type Error: std::error::Error + Send + Sync; // Get current time fn now(&self) -> Result; diff --git a/ntpd/bin/ntp-daemon.rs b/ntpd/bin/ntp-daemon.rs index 8d12c563c..0b21dfcdd 100644 --- a/ntpd/bin/ntp-daemon.rs +++ b/ntpd/bin/ntp-daemon.rs @@ -1,6 +1,9 @@ #![forbid(unsafe_code)] +use std::process; + #[tokio::main] -async fn main() -> Result<(), Box> { - ntpd::daemon_main().await +async fn main() { + let result = ntpd::daemon_main().await; + process::exit(if result.is_ok() { 0 } else { 1 }); } diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index 833b2bb55..97ec34d23 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -97,15 +97,35 @@ pub async fn spawn( ); for peer_config in peer_configs { + // Force early clock controller initialization when peers are configured + system.clock_controller().map_err(|e| { + tracing::error!("Could not start clock controller: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; match peer_config { PeerConfig::Standard(cfg) => { - system.add_spawner(StandardSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)); + system + .add_spawner(StandardSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)) + .map_err(|e| { + tracing::error!("Could not spawn peer: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; } PeerConfig::Nts(cfg) => { - system.add_spawner(NtsSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)); + system + .add_spawner(NtsSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)) + .map_err(|e| { + tracing::error!("Could not spawn peer: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; } PeerConfig::Pool(cfg) => { - system.add_spawner(PoolSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)); + system + .add_spawner(PoolSpawner::new(cfg.clone(), NETWORK_WAIT_PERIOD)) + .map_err(|e| { + tracing::error!("Could not spawn peer: {}", e); + std::io::Error::new(std::io::ErrorKind::Other, e) + })?; } } } @@ -153,7 +173,7 @@ struct System { peer_channels: PeerChannels, clock: C, - controller: KalmanClockController, + controller: Option>, // which timestamps to use (this is a hint, OS or hardware may ignore) enable_timestamps: EnableTimestamps, @@ -219,13 +239,8 @@ impl System { synchronization_config_receiver: synchronization_config_receiver.clone(), source_defaults_config_receiver: peer_defaults_config_receiver.clone(), }, - clock: clock.clone(), - controller: KalmanClockController::new( - clock, - synchronization_config, - peer_defaults_config, - synchronization_config.algorithm, - ), + clock, + controller: None, enable_timestamps, interface, }, @@ -242,7 +257,23 @@ impl System { ) } - fn add_spawner(&mut self, spawner: impl Spawner + Send + Sync + 'static) -> SpawnerId { + fn clock_controller(&mut self) -> Result<&mut KalmanClockController, C::Error> { + if self.controller.is_none() { + self.controller = Some(KalmanClockController::new( + self.clock.clone(), + self.synchronization_config, + self.peer_defaults_config, + self.synchronization_config.algorithm, + )?); + } + // Won't panic as the above if ensures controller contains something + Ok(self.controller.as_mut().unwrap()) + } + + fn add_spawner( + &mut self, + spawner: impl Spawner + Send + Sync + 'static, + ) -> Result { let (notify_tx, notify_rx) = mpsc::channel(MESSAGE_BUFFER_SIZE); let id = spawner.get_id(); let spawner_data = SystemSpawnerData { id, notify_tx }; @@ -250,7 +281,7 @@ impl System { self.spawners.push(spawner_data); let spawn_tx = self.spawn_tx.clone(); tokio::spawn(async move { spawner.run(spawn_tx, notify_rx).await }); - id + Ok(id) } async fn run(&mut self, mut wait: Pin<&mut SingleshotSleep>) -> std::io::Result<()> { @@ -275,7 +306,9 @@ impl System { tracing::warn!(msg); } Some(spawn_event) => { - self.handle_spawn_event(spawn_event).await; + if let Err(e) = self.handle_spawn_event(spawn_event).await { + tracing::error!("Could not spawn peer: {}", e); + } } } } @@ -295,11 +328,13 @@ impl System { fn handle_config_update(&mut self) { let synchronization_config = *self.synchronization_config_receiver.borrow_and_update(); let peer_defaults_config = *self.peer_defaults_config_receiver.borrow_and_update(); - self.controller.update_config( - synchronization_config, - peer_defaults_config, - synchronization_config.algorithm, - ); + if let Some(controller) = self.controller.as_mut() { + controller.update_config( + synchronization_config, + peer_defaults_config, + synchronization_config.algorithm, + ); + } self.synchronization_config = synchronization_config; self.peer_defaults_config = peer_defaults_config; } @@ -307,8 +342,10 @@ impl System { fn handle_timer(&mut self, wait: &mut Pin<&mut SingleshotSleep>) { tracing::debug!("Timer expired"); // note: local needed for borrow checker - let update = self.controller.time_update(); - self.handle_algorithm_state_update(update, wait); + if let Some(controller) = self.controller.as_mut() { + let update = controller.time_update(); + self.handle_algorithm_state_update(update, wait); + } } async fn handle_peer_update( @@ -320,13 +357,19 @@ impl System { match msg { MsgForSystem::MustDemobilize(index) => { - self.handle_peer_demobilize(index).await; + if let Err(e) = self.handle_peer_demobilize(index).await { + unreachable!("Could not demobilize peer: {}", e); + }; } MsgForSystem::NewMeasurement(index, snapshot, measurement) => { - self.handle_peer_measurement(index, snapshot, measurement, wait); + if let Err(e) = self.handle_peer_measurement(index, snapshot, measurement, wait) { + unreachable!("Could not process peer measurement: {}", e); + } } MsgForSystem::UpdatedSnapshot(index, snapshot) => { - self.handle_peer_snapshot(index, snapshot); + if let Err(e) = self.handle_peer_snapshot(index, snapshot) { + unreachable!("Could not update peer snapshot: {}", e); + } } MsgForSystem::NetworkIssue(index) => { self.handle_peer_network_issue(index).await?; @@ -346,7 +389,9 @@ impl System { } async fn handle_peer_network_issue(&mut self, index: PeerId) -> std::io::Result<()> { - self.controller.peer_remove(index); + self.clock_controller() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))? + .peer_remove(index); // Restart the peer reusing its configuration. let state = self.peers.remove(&index).unwrap(); @@ -368,7 +413,9 @@ impl System { } async fn handle_peer_unreachable(&mut self, index: PeerId) -> std::io::Result<()> { - self.controller.peer_remove(index); + self.clock_controller() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))? + .peer_remove(index); // Restart the peer reusing its configuration. let state = self.peers.remove(&index).unwrap(); @@ -389,14 +436,17 @@ impl System { Ok(()) } - fn handle_peer_snapshot(&mut self, index: PeerId, snapshot: PeerSnapshot) { - self.controller.peer_update( - index, - snapshot - .accept_synchronization(self.synchronization_config.local_stratum) - .is_ok(), - ); + fn handle_peer_snapshot( + &mut self, + index: PeerId, + snapshot: PeerSnapshot, + ) -> Result<(), C::Error> { + let usable = snapshot + .accept_synchronization(self.synchronization_config.local_stratum) + .is_ok(); + self.clock_controller()?.peer_update(index, usable); self.peers.get_mut(&index).unwrap().snapshot = Some(snapshot); + Ok(()) } fn handle_peer_measurement( @@ -405,11 +455,16 @@ impl System { snapshot: PeerSnapshot, measurement: ntp_proto::Measurement, wait: &mut Pin<&mut SingleshotSleep>, - ) { - self.handle_peer_snapshot(index, snapshot); + ) -> Result<(), C::Error> { + if let Err(e) = self.handle_peer_snapshot(index, snapshot) { + panic!("Could not handle peer snapshot: {}", e); + } // note: local needed for borrow checker - let update = self.controller.peer_measurement(index, measurement); + let update = self + .clock_controller()? + .peer_measurement(index, measurement); self.handle_algorithm_state_update(update, wait); + Ok(()) } fn handle_algorithm_state_update( @@ -440,8 +495,8 @@ impl System { } } - async fn handle_peer_demobilize(&mut self, index: PeerId) { - self.controller.peer_remove(index); + async fn handle_peer_demobilize(&mut self, index: PeerId) -> Result<(), C::Error> { + self.clock_controller()?.peer_remove(index); let state = self.peers.remove(&index).unwrap(); // Restart the peer reusing its configuration. @@ -458,13 +513,14 @@ impl System { .await .expect("Could not notify spawner"); } + Ok(()) } async fn create_peer( &mut self, spawner_id: SpawnerId, mut params: PeerCreateParameters, - ) -> PeerId { + ) -> Result { let source_id = params.id; info!(source_id=?source_id, addr=?params.addr, spawner=?spawner_id, "new peer"); self.peers.insert( @@ -476,7 +532,7 @@ impl System { spawner_id, }, ); - self.controller.peer_add(source_id); + self.clock_controller()?.peer_add(source_id); PeerTask::spawn( source_id, @@ -501,15 +557,16 @@ impl System { let _ = s.notify_tx.send(SystemEvent::PeerRegistered(params)).await; } - source_id + Ok(source_id) } - async fn handle_spawn_event(&mut self, event: SpawnEvent) { + async fn handle_spawn_event(&mut self, event: SpawnEvent) -> Result<(), C::Error> { match event.action { SpawnAction::Create(params) => { - self.create_peer(event.id, params).await; + self.create_peer(event.id, params).await?; } } + Ok(()) } async fn add_server(&mut self, config: ServerConfig) { @@ -534,7 +591,11 @@ impl System { self.peers.iter().map(|(index, data)| { data.snapshot .map(|snapshot| { - if let Some(timedata) = self.controller.peer_snapshot(*index) { + if let Some(timedata) = self + .controller + .as_ref() + .and_then(|c| c.peer_snapshot(*index)) + { ObservablePeerState::Observable(ObservedPeerState { timedata, unanswered_polls: snapshot.reach.unanswered_polls(), @@ -642,7 +703,7 @@ mod tests { SingleshotSleep::new_disabled(tokio::time::sleep(std::time::Duration::from_secs(0))); tokio::pin!(wait); - let id = system.add_spawner(DummySpawner::empty()); + let id = system.add_spawner(DummySpawner::empty()).unwrap(); let mut indices = vec![]; @@ -653,7 +714,8 @@ mod tests { id, PeerCreateParameters::from_new_ip_and_port(format!("127.0.0.{i}"), 123), ) - .await, + .await + .unwrap(), ); }