Skip to content

Commit c8f911c

Browse files
davidv1992rnijveld
authored andcommitted
Moved some of the functionality of system into ntp-proto.
1 parent 5962e69 commit c8f911c

File tree

5 files changed

+238
-170
lines changed

5 files changed

+238
-170
lines changed

ntp-proto/src/algorithm/kalman/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::HashMap, fmt::Debug, hash::Hash};
1+
use std::{collections::HashMap, fmt::Debug, hash::Hash, time::Duration};
22

33
use tracing::{error, info, instrument};
44

@@ -233,7 +233,7 @@ impl<C: NtpClock, PeerID: Hash + Eq + Copy + Debug> KalmanClockController<C, Pee
233233
}
234234
}
235235

236-
fn steer_offset(&mut self, change: f64, freq_delta: f64) -> Option<NtpTimestamp> {
236+
fn steer_offset(&mut self, change: f64, freq_delta: f64) -> Option<Duration> {
237237
if change.abs() > self.algo_config.step_threshold {
238238
// jump
239239
self.check_offset_steer(change);
@@ -251,13 +251,14 @@ impl<C: NtpClock, PeerID: Hash + Eq + Copy + Debug> KalmanClockController<C, Pee
251251
.algo_config
252252
.slew_maximum_frequency_offset
253253
.min(change.abs() / self.algo_config.slew_minimum_duration);
254-
let duration = NtpDuration::from_seconds(change.abs() / freq);
254+
let duration = Duration::from_secs_f64(change.abs() / freq);
255255
info!(
256256
"Slewing by {}ms over {}s",
257257
change * 1e3,
258-
duration.to_seconds()
258+
duration.as_secs_f64(),
259259
);
260-
Some(self.change_desired_frequency(-freq * change.signum(), freq_delta) + duration)
260+
self.change_desired_frequency(-freq * change.signum(), freq_delta);
261+
Some(duration)
261262
}
262263
}
263264

ntp-proto/src/algorithm/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{fmt::Debug, hash::Hash};
1+
use std::{fmt::Debug, hash::Hash, time::Duration};
22

33
use serde::{de::DeserializeOwned, Deserialize, Serialize};
44

@@ -29,7 +29,7 @@ pub struct StateUpdate<PeerID: Eq + Copy + Debug> {
2929
// Update to the used peers, if any
3030
pub used_peers: Option<Vec<PeerID>>,
3131
// Requested timestamp for next non-measurement update
32-
pub next_update: Option<NtpTimestamp>,
32+
pub next_update: Option<Duration>,
3333
}
3434

3535
// Note: this default implementation is neccessary since the

ntp-proto/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ mod exports {
6868
FilterAction, FilterList, IpSubnet, Server, ServerAction, ServerConfig, ServerReason,
6969
ServerResponse, ServerStatHandler, SubnetParseError,
7070
};
71-
pub use super::system::{SystemSnapshot, TimeSnapshot};
71+
pub use super::system::{System, SystemSnapshot, TimeSnapshot};
7272
#[cfg(feature = "__internal-fuzz")]
7373
pub use super::time_types::fuzz_duration_from_seconds;
7474
pub use super::time_types::{

ntp-proto/src/system.rs

Lines changed: 154 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
11
use serde::{Deserialize, Serialize};
2+
use std::collections::HashMap;
3+
use std::net::IpAddr;
4+
use std::sync::Arc;
5+
use std::time::Duration;
6+
use std::{fmt::Debug, hash::Hash};
27

38
#[cfg(feature = "ntpv5")]
49
use crate::packet::v5::server_reference_id::{BloomFilter, ServerId};
510
#[cfg(feature = "ntpv5")]
611
use crate::peer::ProtocolVersion;
712
use crate::{
8-
config::SynchronizationConfig,
13+
algorithm::{KalmanClockController, ObservablePeerTimedata, StateUpdate, TimeSyncController},
14+
clock::NtpClock,
15+
config::{SourceDefaultsConfig, SynchronizationConfig},
916
identifiers::ReferenceId,
1017
packet::NtpLeapIndicator,
11-
peer::PeerSnapshot,
18+
peer::{Measurement, PeerSnapshot},
1219
time_types::{NtpDuration, PollInterval},
1320
};
1421

@@ -106,9 +113,153 @@ impl Default for SystemSnapshot {
106113
}
107114
}
108115

116+
pub struct System<C: NtpClock, PeerId: Hash + Eq + Copy + Debug> {
117+
synchronization_config: SynchronizationConfig,
118+
peer_defaults_config: SourceDefaultsConfig,
119+
system: SystemSnapshot,
120+
ip_list: Arc<[IpAddr]>,
121+
122+
peers: HashMap<PeerId, Option<PeerSnapshot>>,
123+
124+
clock: C,
125+
controller: Option<KalmanClockController<C, PeerId>>,
126+
}
127+
128+
impl<C: NtpClock, PeerId: Hash + Eq + Copy + Debug> System<C, PeerId> {
129+
pub fn new(
130+
clock: C,
131+
synchronization_config: SynchronizationConfig,
132+
peer_defaults_config: SourceDefaultsConfig,
133+
ip_list: Arc<[IpAddr]>,
134+
) -> Self {
135+
// Setup system snapshot
136+
let mut system = SystemSnapshot {
137+
stratum: synchronization_config.local_stratum,
138+
..Default::default()
139+
};
140+
141+
if synchronization_config.local_stratum == 1 {
142+
// We are a stratum 1 server so mark our selves synchronized.
143+
system.time_snapshot.leap_indicator = NtpLeapIndicator::NoWarning;
144+
}
145+
146+
System {
147+
synchronization_config,
148+
peer_defaults_config,
149+
system,
150+
ip_list,
151+
peers: Default::default(),
152+
clock,
153+
controller: None,
154+
}
155+
}
156+
157+
pub fn system_snapshot(&self) -> SystemSnapshot {
158+
self.system
159+
}
160+
161+
fn clock_controller(&mut self) -> Result<&mut KalmanClockController<C, PeerId>, C::Error> {
162+
let controller = match self.controller.take() {
163+
Some(controller) => controller,
164+
None => KalmanClockController::new(
165+
self.clock.clone(),
166+
self.synchronization_config,
167+
self.peer_defaults_config,
168+
self.synchronization_config.algorithm,
169+
)?,
170+
};
171+
Ok(self.controller.insert(controller))
172+
}
173+
174+
pub fn handle_peer_create(&mut self, id: PeerId) -> Result<(), C::Error> {
175+
self.clock_controller()?.peer_add(id);
176+
self.peers.insert(id, None);
177+
Ok(())
178+
}
179+
180+
pub fn handle_peer_remove(&mut self, id: PeerId) -> Result<(), C::Error> {
181+
self.clock_controller()?.peer_remove(id);
182+
self.peers.remove(&id);
183+
Ok(())
184+
}
185+
186+
pub fn handle_peer_snapshot(
187+
&mut self,
188+
id: PeerId,
189+
snapshot: PeerSnapshot,
190+
) -> Result<(), C::Error> {
191+
let usable = snapshot
192+
.accept_synchronization(
193+
self.synchronization_config.local_stratum,
194+
self.ip_list.as_ref(),
195+
&self.system,
196+
)
197+
.is_ok();
198+
self.clock_controller()?.peer_update(id, usable);
199+
*self.peers.get_mut(&id).unwrap() = Some(snapshot);
200+
Ok(())
201+
}
202+
203+
pub fn handle_peer_measurement(
204+
&mut self,
205+
id: PeerId,
206+
snapshot: PeerSnapshot,
207+
measurement: Measurement,
208+
) -> Result<Option<Duration>, C::Error> {
209+
if let Err(e) = self.handle_peer_snapshot(id, snapshot) {
210+
panic!("Could not handle peer snapshot: {}", e);
211+
}
212+
// note: local needed for borrow checker
213+
let update = self.clock_controller()?.peer_measurement(id, measurement);
214+
Ok(self.handle_algorithm_state_update(update))
215+
}
216+
217+
fn handle_algorithm_state_update(&mut self, update: StateUpdate<PeerId>) -> Option<Duration> {
218+
if let Some(ref used_peers) = update.used_peers {
219+
self.system.update_used_peers(used_peers.iter().map(|v| {
220+
self.peers.get(v).and_then(|snapshot| *snapshot).expect(
221+
"Critical error: Peer used for synchronization that is not known to system",
222+
)
223+
}));
224+
}
225+
if let Some(time_snapshot) = update.time_snapshot {
226+
self.system
227+
.update_timedata(time_snapshot, &self.synchronization_config);
228+
}
229+
update.next_update
230+
}
231+
232+
pub fn handle_timer(&mut self) -> Option<Duration> {
233+
tracing::debug!("Timer expired");
234+
// note: local needed for borrow checker
235+
if let Some(controller) = self.controller.as_mut() {
236+
let update = controller.time_update();
237+
self.handle_algorithm_state_update(update)
238+
} else {
239+
None
240+
}
241+
}
242+
243+
pub fn observe_peer(&self, id: PeerId) -> Option<(PeerSnapshot, ObservablePeerTimedata)> {
244+
if let Some(ref controller) = self.controller {
245+
self.peers
246+
.get(&id)
247+
.copied()
248+
.flatten()
249+
.and_then(|v| controller.peer_snapshot(id).map(|s| (v, s)))
250+
} else {
251+
None
252+
}
253+
}
254+
255+
pub fn update_ip_list(&mut self, ip_list: Arc<[IpAddr]>) {
256+
self.ip_list = ip_list;
257+
}
258+
}
259+
109260
#[cfg(test)]
110261
mod tests {
111-
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
262+
use std::net::{Ipv4Addr, SocketAddr};
112263

113264
use crate::time_types::PollIntervalLimits;
114265

0 commit comments

Comments
 (0)