Skip to content

Commit c5c6c49

Browse files
davidv1992rnijveld
authored andcommitted
Reworked controller to simplify logic in system.
1 parent d2bf8c1 commit c5c6c49

File tree

12 files changed

+101
-37
lines changed

12 files changed

+101
-37
lines changed

ntp-proto/src/algorithm/kalman/mod.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,12 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> KalmanClockController<C, S
142142
combined.estimate.frequency_variance().sqrt() * 1e6
143143
);
144144

145+
if self.in_startup {
146+
self.clock
147+
.disable_ntp_algorithm()
148+
.expect("Cannot update clock");
149+
}
150+
145151
let freq_delta = combined.estimate.frequency() - self.desired_freq;
146152
let freq_uncertainty = combined.estimate.frequency_variance().sqrt();
147153
let offset_delta = combined.estimate.offset();
@@ -345,23 +351,27 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> TimeSyncController<C, Sour
345351
algo_config: Self::AlgorithmConfig,
346352
) -> Result<Self, C::Error> {
347353
// Setup clock
348-
clock.disable_ntp_algorithm()?;
349-
clock.status_update(NtpLeapIndicator::Unknown)?;
350-
clock.set_frequency(0.0)?;
354+
let freq_offset = clock.get_frequency()?;
351355

352356
Ok(KalmanClockController {
353357
sources: HashMap::new(),
354358
clock,
355359
synchronization_config,
356360
source_defaults_config,
357361
algo_config,
358-
freq_offset: 0.0,
362+
freq_offset,
359363
desired_freq: 0.0,
360364
timedata: TimeSnapshot::default(),
361365
in_startup: true,
362366
})
363367
}
364368

369+
fn take_control(&mut self) -> Result<(), <C as NtpClock>::Error> {
370+
self.clock.disable_ntp_algorithm()?;
371+
self.clock.status_update(NtpLeapIndicator::Unknown)?;
372+
Ok(())
373+
}
374+
365375
fn add_source(&mut self, id: SourceId) -> Self::SourceController {
366376
self.sources.insert(id, (None, false));
367377
KalmanSourceController::new(id, self.algo_config, self.source_defaults_config)
@@ -426,6 +436,10 @@ mod tests {
426436
Ok(self.current_time)
427437
}
428438

439+
fn get_frequency(&self) -> Result<f64, Self::Error> {
440+
Ok(0.0)
441+
}
442+
429443
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
430444
*self.has_steered.borrow_mut() = true;
431445
Ok(self.current_time)

ntp-proto/src/algorithm/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ pub trait TimeSyncController<C: NtpClock, SourceId: Hash + Eq + Copy + Debug>: S
6868
algorithm_config: Self::AlgorithmConfig,
6969
) -> Result<Self, C::Error>;
7070

71+
/// Take control of the clock (should not be done in new!)
72+
fn take_control(&mut self) -> Result<(), C::Error>;
73+
7174
/// Create a new source with given identity
7275
fn add_source(&mut self, id: SourceId) -> Self::SourceController;
7376
/// Notify the controller that a previous source has gone

ntp-proto/src/clock.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ pub trait NtpClock: Clone + Send + 'static {
1515
// Change the frequency of the clock, returning the time
1616
// at which the change was applied.
1717
fn set_frequency(&self, freq: f64) -> Result<NtpTimestamp, Self::Error>;
18+
19+
// Get the frequency of the clock
20+
fn get_frequency(&self) -> Result<f64, Self::Error>;
21+
1822
// Change the current time of the clock by offset. Returns
1923
// the time at which the change was applied.
2024
fn step_clock(&self, offset: NtpDuration) -> Result<NtpTimestamp, Self::Error>;

ntp-proto/src/packet/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1441,6 +1441,10 @@ mod tests {
14411441
panic!("Unexpected clock steer");
14421442
}
14431443

1444+
fn get_frequency(&self) -> Result<f64, Self::Error> {
1445+
Ok(0.0)
1446+
}
1447+
14441448
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
14451449
panic!("Unexpected clock steer");
14461450
}

ntp-proto/src/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ mod tests {
436436
panic!("Shouldn't be called by server");
437437
}
438438

439+
fn get_frequency(&self) -> Result<f64, Self::Error> {
440+
Ok(0.0)
441+
}
442+
439443
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
440444
panic!("Shouldn't be called by server");
441445
}

ntp-proto/src/source.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,10 @@ mod test {
847847
panic!("Shouldn't be called by source");
848848
}
849849

850+
fn get_frequency(&self) -> Result<f64, Self::Error> {
851+
Ok(0.0)
852+
}
853+
850854
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
851855
panic!("Shouldn't be called by source");
852856
}

ntp-proto/src/system.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ pub struct System<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> {
187187

188188
sources: HashMap<SourceId, Option<NtpSourceSnapshot>>,
189189

190-
clock: C,
191-
controller: Option<KalmanClockController<C, SourceId>>,
190+
controller: KalmanClockController<C, SourceId>,
191+
controller_took_control: bool,
192192
}
193193

194194
impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
@@ -197,7 +197,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
197197
synchronization_config: SynchronizationConfig,
198198
source_defaults_config: SourceDefaultsConfig,
199199
ip_list: Arc<[IpAddr]>,
200-
) -> Self {
200+
) -> Result<Self, C::Error> {
201201
// Setup system snapshot
202202
let mut system = SystemSnapshot {
203203
stratum: synchronization_config.local_stratum,
@@ -209,32 +209,36 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
209209
system.time_snapshot.leap_indicator = NtpLeapIndicator::NoWarning;
210210
}
211211

212-
System {
212+
Ok(System {
213213
synchronization_config,
214214
source_defaults_config,
215215
system,
216216
ip_list,
217217
sources: Default::default(),
218-
clock,
219-
controller: None,
220-
}
218+
controller: KalmanClockController::new(
219+
clock,
220+
synchronization_config,
221+
source_defaults_config,
222+
synchronization_config.algorithm,
223+
)?,
224+
controller_took_control: false,
225+
})
221226
}
222227

223228
pub fn system_snapshot(&self) -> SystemSnapshot {
224229
self.system
225230
}
226231

227-
fn clock_controller(&mut self) -> Result<&mut KalmanClockController<C, SourceId>, C::Error> {
228-
let controller = match self.controller.take() {
229-
Some(controller) => controller,
230-
None => KalmanClockController::new(
231-
self.clock.clone(),
232-
self.synchronization_config,
233-
self.source_defaults_config,
234-
self.synchronization_config.algorithm,
235-
)?,
236-
};
237-
Ok(self.controller.insert(controller))
232+
pub fn check_clock_access(&mut self) -> Result<(), C::Error> {
233+
self.ensure_controller_control()
234+
}
235+
236+
fn ensure_controller_control(&mut self) -> Result<(), C::Error> {
237+
if !self.controller_took_control {
238+
self.controller.take_control()?;
239+
self.controller_took_control = true;
240+
}
241+
Ok(())
238242
}
239243

240244
#[allow(clippy::type_complexity)]
@@ -250,7 +254,8 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
250254
),
251255
C::Error,
252256
> {
253-
let controller = self.clock_controller()?.add_source(id);
257+
self.ensure_controller_control()?;
258+
let controller = self.controller.add_source(id);
254259
self.sources.insert(id, None);
255260
Ok(NtpSource::new(
256261
source_addr,
@@ -274,7 +279,8 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
274279
),
275280
C::Error,
276281
> {
277-
let controller = self.clock_controller()?.add_source(id);
282+
self.ensure_controller_control()?;
283+
let controller = self.controller.add_source(id);
278284
self.sources.insert(id, None);
279285
Ok(NtpSource::new_nts(
280286
source_addr,
@@ -287,7 +293,7 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
287293
}
288294

289295
pub fn handle_source_remove(&mut self, id: SourceId) -> Result<(), C::Error> {
290-
self.clock_controller()?.remove_source(id);
296+
self.controller.remove_source(id);
291297
self.sources.remove(&id);
292298
Ok(())
293299
}
@@ -305,10 +311,10 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
305311
&self.system,
306312
)
307313
.is_ok();
308-
self.clock_controller()?.source_update(id, usable);
314+
self.controller.source_update(id, usable);
309315
*self.sources.get_mut(&id).unwrap() = Some(update.snapshot);
310316
if let Some(message) = update.message {
311-
let update = self.clock_controller()?.source_message(id, message);
317+
let update = self.controller.source_message(id, message);
312318
Ok(self.handle_algorithm_state_update(update))
313319
} else {
314320
Ok(actions!())
@@ -343,13 +349,8 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug> System<C, SourceId> {
343349

344350
pub fn handle_timer(&mut self) -> SystemActionIterator<KalmanSourceController<SourceId>> {
345351
tracing::debug!("Timer expired");
346-
// note: local needed for borrow checker
347-
if let Some(controller) = self.controller.as_mut() {
348-
let update = controller.time_update();
349-
self.handle_algorithm_state_update(update)
350-
} else {
351-
actions!()
352-
}
352+
let update = self.controller.time_update();
353+
self.handle_algorithm_state_update(update)
353354
}
354355

355356
pub fn update_ip_list(&mut self, ip_list: Arc<[IpAddr]>) {

ntpd/src/daemon/clock.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ impl NtpClock for NtpClockWrapper {
3131
.map(convert_clock_timestamp)
3232
}
3333

34+
fn get_frequency(&self) -> Result<f64, Self::Error> {
35+
self.0.get_frequency().map(|v| v * 1e-6)
36+
}
37+
3438
fn step_clock(
3539
&self,
3640
offset: ntp_proto::NtpDuration,

ntpd/src/daemon/ntp_source.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,10 @@ mod tests {
549549
//ignore
550550
}
551551

552+
fn get_frequency(&self) -> Result<f64, Self::Error> {
553+
Ok(0.0)
554+
}
555+
552556
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
553557
panic!("Shouldn't be called by source");
554558
}
@@ -597,7 +601,8 @@ mod tests {
597601
SynchronizationConfig::default(),
598602
SourceDefaultsConfig::default(),
599603
Arc::new([]),
600-
);
604+
)
605+
.unwrap();
601606

602607
let Ok((source, _)) = system.create_ntp_source(
603608
index,

ntpd/src/daemon/observer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ mod tests {
213213
Ok(NtpTimestamp::default())
214214
}
215215

216+
fn get_frequency(&self) -> Result<f64, Self::Error> {
217+
Ok(0.0)
218+
}
219+
216220
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
217221
Ok(NtpTimestamp::default())
218222
}

ntpd/src/daemon/server.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,10 @@ mod tests {
254254
panic!("Shouldn't be called by source");
255255
}
256256

257+
fn get_frequency(&self) -> Result<f64, Self::Error> {
258+
Ok(0.0)
259+
}
260+
257261
fn step_clock(&self, _offset: NtpDuration) -> Result<NtpTimestamp, Self::Error> {
258262
panic!("Shouldn't be called by source");
259263
}

ntpd/src/daemon/system.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub async fn spawn(
102102
source_defaults_config,
103103
keyset,
104104
ip_list,
105+
!source_configs.is_empty(),
105106
);
106107

107108
for source_config in source_configs {
@@ -193,6 +194,7 @@ struct SystemTask<C: NtpClock, T: Wait> {
193194
}
194195

195196
impl<C: NtpClock + Sync, T: Wait> SystemTask<C, T> {
197+
#[allow(clippy::too_many_arguments)]
196198
fn new(
197199
clock: C,
198200
interface: Option<InterfaceName>,
@@ -201,13 +203,24 @@ impl<C: NtpClock + Sync, T: Wait> SystemTask<C, T> {
201203
source_defaults_config: SourceDefaultsConfig,
202204
keyset: tokio::sync::watch::Receiver<Arc<KeySet>>,
203205
ip_list: tokio::sync::watch::Receiver<Arc<[IpAddr]>>,
206+
have_sources: bool,
204207
) -> (Self, DaemonChannels) {
205-
let system = System::new(
208+
let Ok(mut system) = System::new(
206209
clock.clone(),
207210
synchronization_config,
208211
source_defaults_config,
209212
ip_list.borrow().clone(),
210-
);
213+
) else {
214+
tracing::error!("Could not start system");
215+
std::process::exit(70);
216+
};
217+
218+
if have_sources {
219+
if let Err(e) = system.check_clock_access() {
220+
tracing::error!("Could not control clock: {}", e);
221+
std::process::exit(70);
222+
}
223+
}
211224

212225
// Create communication channels
213226
let (system_snapshot_sender, system_snapshot_receiver) =

0 commit comments

Comments
 (0)