From f1bce2c60456c6850b06967d52f07556ee78655f Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 11:29:17 +0200 Subject: [PATCH 01/10] Initial setup for csptp experimentation. --- Cargo.lock | 70 +++++++++++++++++++ Cargo.toml | 1 + ntp-proto/Cargo.toml | 1 + ntp-proto/src/csptp/mod.rs | 23 ++++++ ntp-proto/src/lib.rs | 1 + .../src/packet/v5/server_reference_id.rs | 2 +- ntp.toml | 8 +-- 7 files changed, 101 insertions(+), 5 deletions(-) create mode 100644 ntp-proto/src/csptp/mod.rs diff --git a/Cargo.lock b/Cargo.lock index a4b047a38..4fcf5af5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,6 +69,15 @@ version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +dependencies = [ + "serde", +] + [[package]] name = "aws-lc-rs" version = "1.14.1" @@ -93,6 +102,12 @@ dependencies = [ "libloading", ] +[[package]] +name = "az" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b7e4c2464d97fe331d41de9d5db0def0a96f4d823b8b32a2efd503578988973" + [[package]] name = "backtrace" version = "0.3.76" @@ -143,6 +158,12 @@ dependencies = [ "generic-array", ] +[[package]] +name = "bytemuck" +version = "1.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" + [[package]] name = "bytes" version = "1.10.1" @@ -267,6 +288,12 @@ dependencies = [ "libc", ] +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.6" @@ -331,6 +358,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" +[[package]] +name = "fixed" +version = "1.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "707070ccf8c4173548210893a0186e29c266901b71ed20cd9e2ca0193dfe95c3" +dependencies = [ + "az", + "bytemuck", + "half", + "typenum", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -382,6 +421,16 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280" +[[package]] +name = "half" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hashbrown" version = "0.16.0" @@ -487,6 +536,12 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" + [[package]] name = "log" version = "0.4.28" @@ -558,6 +613,7 @@ dependencies = [ "rustls-platform-verifier", "serde", "serde_json", + "statime", "tokio", "tokio-rustls", "tracing", @@ -962,6 +1018,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "statime" +version = "0.4.0" +source = "git+https://github.com/pendulum-project/statime.git?branch=csptp-hackathon#d2350db7bb53649729a9890eb6e90359f7be2a0b" +dependencies = [ + "arrayvec", + "az", + "fixed", + "libm", + "log", + "rand", + "serde", +] + [[package]] name = "subtle" version = "2.6.1" diff --git a/Cargo.toml b/Cargo.toml index 138095064..fcf40df23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ toml = { version = ">=0.6.0,<0.9.0", default-features = false, features = ["pars timestamped-socket = "0.2.2" clock-steering = "0.2.1" pps-time = "0.2.3" +statime = { git = "https://github.com/pendulum-project/statime.git", branch = "csptp-hackathon" } # TLS rustls23 = { package = "rustls", version = "0.23.16", features = ["logging", "std"] } diff --git a/ntp-proto/Cargo.toml b/ntp-proto/Cargo.toml index efd267c55..8d6a1edb4 100644 --- a/ntp-proto/Cargo.toml +++ b/ntp-proto/Cargo.toml @@ -32,6 +32,7 @@ arbitrary = { workspace = true, optional = true } aead.workspace = true aes-siv.workspace = true zeroize.workspace = true +statime.workspace = true [dev-dependencies] serde_json.workspace = true diff --git a/ntp-proto/src/csptp/mod.rs b/ntp-proto/src/csptp/mod.rs new file mode 100644 index 000000000..71a64b513 --- /dev/null +++ b/ntp-proto/src/csptp/mod.rs @@ -0,0 +1,23 @@ +use std::io::Cursor; + +pub struct CsptpPacket<'a> { + inner: statime::datastructures::messages::Message<'a>, +} + +impl<'a> CsptpPacket<'a> { + fn deserialize(data: &'a [u8]) -> Result { + Ok(CsptpPacket { + inner: statime::datastructures::messages::Message::deserialize(data)?, + }) + } + + fn serialize( + &self, + w: &mut Cursor<&mut [u8]>, + ) -> Result<(), statime::datastructures::WireFormatError> { + let start = w.position() as usize; + let bytes = self.inner.serialize(&mut w.get_mut()[start..])?; + w.set_position((start + bytes) as u64); + Ok(()) + } +} diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index 3ffc9fb68..af405a8d1 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -12,6 +12,7 @@ mod algorithm; mod clock; mod config; mod cookiestash; +mod csptp; mod identifiers; mod io; mod ipfilter; diff --git a/ntp-proto/src/packet/v5/server_reference_id.rs b/ntp-proto/src/packet/v5/server_reference_id.rs index 11d395449..342876edc 100644 --- a/ntp-proto/src/packet/v5/server_reference_id.rs +++ b/ntp-proto/src/packet/v5/server_reference_id.rs @@ -34,7 +34,7 @@ impl TryFrom for U12 { type Error = (); fn try_from(value: u16) -> Result { - if value > Self::MAX.into() { + if value > >::into(Self::MAX) { Err(()) } else { Ok(Self(value)) diff --git a/ntp.toml b/ntp.toml index 7dd28f298..28c683e46 100644 --- a/ntp.toml +++ b/ntp.toml @@ -6,9 +6,9 @@ observation-path = "/var/run/ntpd-rs/observe" # Pool servers from ntppool.org. See http://www.pool.ntp.org/join.html # for more information [[source]] -mode = "pool" -address = "ntpd-rs.pool.ntp.org" -count = 4 +mode = "nts" +address = "ke.experimental.ntspool.trifectatech.org" +ntp-version = 4 # Alternative configuration for IPv6 only machines #[[source]] @@ -23,4 +23,4 @@ count = 4 single-step-panic-threshold = 1800 startup-step-panic-threshold = { forward="inf", backward = 1800 } #accumulated-step-panic-threshold = 1800 -#minimum-agreeing-sources = 3 +minimum-agreeing-sources = 1 From a4f7d9dc3e00c6d691da4d92d2a2b4bb29f683d9 Mon Sep 17 00:00:00 2001 From: Ruben Nijveld Date: Sat, 19 Jul 2025 11:49:14 +0200 Subject: [PATCH 02/10] Add server setup for CSPTP --- ntpd/src/daemon/config/mod.rs | 2 + ntpd/src/daemon/config/server.rs | 6 ++ ntpd/src/daemon/csptp_server.rs | 159 +++++++++++++++++++++++++++++++ ntpd/src/daemon/mod.rs | 14 ++- ntpd/src/daemon/system.rs | 28 ++++++ ntpd/src/force_sync/mod.rs | 1 + 6 files changed, 205 insertions(+), 5 deletions(-) create mode 100644 ntpd/src/daemon/csptp_server.rs diff --git a/ntpd/src/daemon/config/mod.rs b/ntpd/src/daemon/config/mod.rs index 7f0522901..9575a7d35 100644 --- a/ntpd/src/daemon/config/mod.rs +++ b/ntpd/src/daemon/config/mod.rs @@ -356,6 +356,8 @@ pub struct Config { pub sources: Vec, #[serde(rename = "server", default)] pub servers: Vec, + #[serde(rename = "csptp-server", default)] + pub csptp_servers: Vec, #[serde(rename = "nts-ke-server", default)] pub nts_ke: Vec, #[serde(default)] diff --git a/ntpd/src/daemon/config/server.rs b/ntpd/src/daemon/config/server.rs index 087f9bdc4..b1c669be4 100644 --- a/ntpd/src/daemon/config/server.rs +++ b/ntpd/src/daemon/config/server.rs @@ -41,6 +41,12 @@ fn default_stale_key_count() -> usize { 7 } +#[derive(Debug, PartialEq, Eq, Clone, Deserialize)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +pub struct CsptpServerConfig { + pub listen: SocketAddr, +} + #[derive(Debug, PartialEq, Eq, Clone, Deserialize)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] pub struct ServerConfig { diff --git a/ntpd/src/daemon/csptp_server.rs b/ntpd/src/daemon/csptp_server.rs new file mode 100644 index 000000000..c7cec625b --- /dev/null +++ b/ntpd/src/daemon/csptp_server.rs @@ -0,0 +1,159 @@ +use std::{sync::Arc, time::Duration}; + +use ntp_proto::{KeySet, NtpClock, SystemSnapshot}; +use timestamped_socket::socket::{RecvResult, open_ip}; +use tokio::task::JoinHandle; +use tracing::{Instrument, Span, debug, instrument, warn}; + +use crate::daemon::config::CsptpServerConfig; + +const MAX_PACKET_SIZE: usize = 1024; + +pub struct CsptpServerTask { + config: CsptpServerConfig, + network_wait_period: std::time::Duration, + system_receiver: tokio::sync::watch::Receiver, + keyset: tokio::sync::watch::Receiver>, + server: CsptpServer, +} + +impl CsptpServerTask { + #[instrument(level = tracing::Level::ERROR, name = "CSPTP Server", skip_all, fields(address = debug(config.listen)))] + pub fn spawn( + config: CsptpServerConfig, + mut system_receiver: tokio::sync::watch::Receiver, + mut keyset: tokio::sync::watch::Receiver>, + clock: C, + network_wait_period: Duration, + ) -> JoinHandle<()> { + tokio::spawn( + (async move { + let server = CsptpServer::new( + config.clone().into(), + clock, + *system_receiver.borrow_and_update(), + keyset.borrow_and_update().clone(), + ); + + let mut process = CsptpServerTask { + config, + network_wait_period, + system_receiver, + keyset, + server, + }; + + process.serve().await; + }) + .instrument(Span::current()), + ) + } + + async fn serve(&mut self) { + let mut cur_socket = None; + loop { + // open socket if it is not already open + let socket = match &mut cur_socket { + Some(socket) => socket, + None => { + let new_socket = loop { + let socket_res = open_ip( + self.config.listen, + timestamped_socket::socket::GeneralTimestampMode::SoftwareRecv, + ); + + match socket_res { + Ok(socket) => break socket, + Err(error) => { + warn!(?error, ?self.config.listen, "Could not open server socket"); + tokio::time::sleep(self.network_wait_period).await; + } + } + }; + + // system and keyset may now be wildly out of date, ensure they are always updated. + self.server + .update_system(*self.system_receiver.borrow_and_update()); + self.server + .update_keyset(self.keyset.borrow_and_update().clone()); + + cur_socket.insert(new_socket) + } + }; + + let mut buf = [0_u8; MAX_PACKET_SIZE]; + tokio::select! { + recv_res = socket.recv(&mut buf) => { + match recv_res { + Ok(RecvResult { + bytes_read: _length, + remote_addr: _source_addr, + timestamp: Some(_timestamp), + }) => { + let mut _send_buf = [0u8; MAX_PACKET_SIZE]; + // TODO: parse and respond + } + Ok(_) => { + debug!("received a packet without a timestamp"); + } + Err(receive_error) => { + warn!(?receive_error, "could not receive packet"); + + // For a server, we only trigger NetworkGone restarts + // on ENETDOWN. ENETUNREACH, EHOSTDOWN and EHOSTUNREACH + // do not signal restart-worthy conditions for the a + // server (they essentially indicate problems with the + // remote network/host, which is not relevant for a server). + // Furthermore, they can conceivably be triggered by a + // malicious third party, and triggering restart on them + // would then result in a denial-of-service. + if matches!(receive_error.raw_os_error(), Some(libc::ENETDOWN)) { + cur_socket = None; + } + } + } + }, + _ = self.system_receiver.changed(), if self.system_receiver.has_changed().is_ok() => { + self.server.update_system(*self.system_receiver.borrow_and_update()); + } + _ = self.keyset.changed(), if self.keyset.has_changed().is_ok() => { + self.server.update_keyset(self.keyset.borrow_and_update().clone()); + } + } + } + } +} + +pub struct CsptpServer { + config: CsptpServerConfig, + clock: C, + system: SystemSnapshot, + keyset: Arc, +} + +impl CsptpServer { + /// Create a new server + pub fn new( + config: CsptpServerConfig, + clock: C, + system: SystemSnapshot, + keyset: Arc, + ) -> Self { + Self { + config, + clock, + system, + keyset, + } + } + + /// Provide the server with the latest [`SystemSnapshot`] + pub fn update_system(&mut self, system: SystemSnapshot) { + self.system = system; + } + + /// Provide the server with a new [`KeySet`] + pub fn update_keyset(&mut self, keyset: Arc) { + self.keyset = keyset; + } +} diff --git a/ntpd/src/daemon/mod.rs b/ntpd/src/daemon/mod.rs index 0a434ee31..8f3ee1044 100644 --- a/ntpd/src/daemon/mod.rs +++ b/ntpd/src/daemon/mod.rs @@ -1,5 +1,6 @@ mod clock; pub mod config; +mod csptp_server; pub mod keyexchange; mod local_ip_provider; mod ntp_source; @@ -83,11 +84,13 @@ pub(crate) fn initialize_logging_parse_config( fn run(options: NtpDaemonOptions) -> Result<(), Box> { let config = initialize_logging_parse_config(options.log_level, options.config); - let runtime = if config.servers.is_empty() && config.nts_ke.is_empty() { - Builder::new_current_thread().enable_all().build()? - } else { - Builder::new_multi_thread().enable_all().build()? - }; + let runtime = + if config.servers.is_empty() && config.nts_ke.is_empty() && config.csptp_servers.is_empty() + { + Builder::new_current_thread().enable_all().build()? + } else { + Builder::new_multi_thread().enable_all().build()? + }; runtime.block_on(async { // give the user a warning that we use the command line option @@ -117,6 +120,7 @@ fn run(options: NtpDaemonOptions) -> Result<(), Box> { clock_config, &config.sources, &config.servers, + &config.csptp_servers, keyset.clone(), ) .await?; diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index 8f5f3e04b..89d03cdac 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -1,6 +1,8 @@ #[cfg(feature = "pps")] use crate::daemon::pps_source::PpsSourceTask; use crate::daemon::{ + config::CsptpServerConfig, + csptp_server::CsptpServerTask, sock_source::SockSourceTask, spawn::{SourceCreateParameters, spawner_task}, }; @@ -99,6 +101,7 @@ pub async fn spawn>, ) -> std::io::Result<(JoinHandle>, DaemonChannels)> { let ip_list = super::local_ip_provider::spawn()?; @@ -164,6 +167,11 @@ pub async fn spawn, servers: Vec, + csptp_servers: Vec, spawners: Vec, clock: C, @@ -273,6 +282,7 @@ impl) -> std::io::Result { clock_config, &config.sources, &[], // No serving when operating in force sync mode + &[], // No serving when operating in force sync mode keyset.clone(), ) .await?; From 4f2b1898532161e606ee73e39df222f519522954 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 13:00:28 +0200 Subject: [PATCH 03/10] Added extension field parsing for csptp --- ntp-proto/src/csptp/mod.rs | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/ntp-proto/src/csptp/mod.rs b/ntp-proto/src/csptp/mod.rs index 71a64b513..908bd4cc8 100644 --- a/ntp-proto/src/csptp/mod.rs +++ b/ntp-proto/src/csptp/mod.rs @@ -1,5 +1,10 @@ use std::io::Cursor; +use statime::datastructures::WireFormat; + +const CSPTP_REQUEST_TLV: u16 = 0xFF00; +const CSPTP_RESPONSE_TLV: u16 = 0xFF01; + pub struct CsptpPacket<'a> { inner: statime::datastructures::messages::Message<'a>, } @@ -20,4 +25,60 @@ impl<'a> CsptpPacket<'a> { w.set_position((start + bytes) as u64); Ok(()) } + + pub fn get_csptp_request_flags(&self) -> Option { + for tlv in self.inner.suffix.tlv() { + if tlv.tlv_type.to_primitive() == CSPTP_REQUEST_TLV { + let flags = tlv.value.get(0).copied().unwrap_or_default(); + return Some(CsptpRequestFlags { + csptp_status: flags & 1 != 0, + alt_timescale: flags & 2 != 0, + }); + } + } + + None + } + + pub fn get_csptp_response_data(&self) -> Option { + for tlv in self.inner.suffix.tlv() { + if tlv.tlv_type.to_primitive() == CSPTP_RESPONSE_TLV && tlv.value.len() >= 18 { + return Some(CsptpResponseData { + req_ingress_timestamp: + statime::datastructures::common::WireTimestamp::deserialize( + &tlv.value[0..10], + ) + .unwrap(), + req_ingress_correction: + statime::datastructures::common::TimeInterval::deserialize( + &tlv.value[10..18], + ) + .unwrap(), + }); + } + } + + None + } + + pub fn get_origin_timestamp(&self) -> Option { + match self.inner.body { + statime::datastructures::messages::MessageBody::Sync(sync_message) => Some(sync_message.origin_timestamp), + _ => None + } + } + + pub fn get_correction(&self) -> statime::datastructures::common::TimeInterval { + self.inner.header.correction_field + } +} + +pub struct CsptpRequestFlags { + pub csptp_status: bool, + pub alt_timescale: bool, +} + +pub struct CsptpResponseData { + pub req_ingress_timestamp: statime::datastructures::common::WireTimestamp, + pub req_ingress_correction: statime::datastructures::common::TimeInterval, } From 4d93e0fad5f92ee23bef327f9a0b37adc7f9ef92 Mon Sep 17 00:00:00 2001 From: Ruben Nijveld Date: Sat, 19 Jul 2025 13:18:44 +0200 Subject: [PATCH 04/10] Add conversions from statime types --- ntp-proto/src/time_types.rs | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/ntp-proto/src/time_types.rs b/ntp-proto/src/time_types.rs index 4e10c7201..d747a1480 100644 --- a/ntp-proto/src/time_types.rs +++ b/ntp-proto/src/time_types.rs @@ -3,6 +3,7 @@ use rand::{ distributions::{Distribution, Standard}, }; use serde::{Deserialize, Serialize, de::Unexpected}; +use statime::datastructures::common::{TimeInterval, WireTimestamp}; use std::ops::{Add, AddAssign, Div, DivAssign, Mul, MulAssign, Neg, Sub, SubAssign}; use std::time::{Duration, Instant}; @@ -76,6 +77,21 @@ impl NtpTimestamp { } } + pub fn from_statime(statime: &WireTimestamp) -> Self { + const EPOCH_OFFSET: u64 = (70 * 365 + 17) * 86400; + + // TODO: this is definitely not a constant + const TAI_OFFSET: u64 = 37; + + // add epoch and tai offset to get to NTP timescale + let seconds = statime + .seconds + .wrapping_add(EPOCH_OFFSET) + .wrapping_add(TAI_OFFSET) as u32; + + NtpTimestamp::from_seconds_nanos_since_ntp_era(seconds, statime.nanos) + } + pub(crate) const fn to_bits(self) -> [u8; 8] { self.timestamp.to_be_bytes() } @@ -217,6 +233,10 @@ impl NtpDuration { pub const ZERO: Self = Self { duration: 0 }; pub const MAX: Self = Self { duration: i64::MAX }; + pub fn from_statime(statime: &TimeInterval) -> Self { + Self::from_seconds(statime.to_nanos() / 1_000_000f64) + } + pub(crate) const fn from_bits(bits: [u8; 8]) -> Self { Self { duration: i64::from_be_bytes(bits), From 61a1580689ca1f0ef0f39c48a7401bf5d31dcb09 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 13:50:21 +0200 Subject: [PATCH 05/10] Added packet generation. --- ntp-proto/src/csptp/mod.rs | 96 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/ntp-proto/src/csptp/mod.rs b/ntp-proto/src/csptp/mod.rs index 908bd4cc8..cdf1645ec 100644 --- a/ntp-proto/src/csptp/mod.rs +++ b/ntp-proto/src/csptp/mod.rs @@ -1,6 +1,6 @@ use std::io::Cursor; -use statime::datastructures::WireFormat; +use statime::datastructures::{common::TlvSetBuilder, WireFormat}; const CSPTP_REQUEST_TLV: u16 = 0xFF00; const CSPTP_RESPONSE_TLV: u16 = 0xFF01; @@ -63,14 +63,104 @@ impl<'a> CsptpPacket<'a> { pub fn get_origin_timestamp(&self) -> Option { match self.inner.body { - statime::datastructures::messages::MessageBody::Sync(sync_message) => Some(sync_message.origin_timestamp), - _ => None + statime::datastructures::messages::MessageBody::Sync(sync_message) => { + Some(sync_message.origin_timestamp) + } + _ => None, } } pub fn get_correction(&self) -> statime::datastructures::common::TimeInterval { self.inner.header.correction_field } + + pub fn request(buffer: &'a mut [u8], sequence_id: u16) -> Self { + let flags = [0u8; 4]; + let request_tlv = statime::datastructures::common::Tlv { + tlv_type: statime::datastructures::common::TlvType::Reserved(CSPTP_REQUEST_TLV), + value: flags.as_slice().into(), + }; + let mut tlvs = statime::datastructures::common::TlvSetBuilder::new(buffer); + tlvs.add(request_tlv).unwrap(); + Self { + inner: statime::datastructures::messages::Message { + header: statime::datastructures::messages::Header { + sdo_id: statime::config::SdoId::try_from(0x300).unwrap(), + version: statime::datastructures::messages::PtpVersion::new(2, 0).unwrap(), + domain_number: 0, + alternate_master_flag: false, + two_step_flag: false, + unicast_flag: true, + ptp_profile_specific_1: false, + ptp_profile_specific_2: false, + leap61: false, + leap59: false, + current_utc_offset_valid: false, + ptp_timescale: true, + time_tracable: false, + frequency_tracable: false, + synchronization_uncertain: false, + correction_field: Default::default(), + source_port_identity: Default::default(), + sequence_id, + log_message_interval: 0x7f, + }, + body: statime::datastructures::messages::MessageBody::Sync( + statime::datastructures::messages::sync::SyncMessage { + origin_timestamp: Default::default(), + }, + ), + suffix: tlvs.build(), + }, + } + } + + pub fn timestamp_response( + buffer: &'a mut [u8], + request: CsptpPacket<'_>, + receive_timestamp: statime::datastructures::common::WireTimestamp, + send_timestamp: statime::datastructures::common::WireTimestamp, + ) -> Self { + let mut tlvs = TlvSetBuilder::new(buffer); + let mut innerbuf = [0u8;18]; + receive_timestamp.serialize(&mut innerbuf[0..10]).unwrap(); + request.get_correction().serialize(&mut innerbuf[10..18]).unwrap(); + tlvs.add(statime::datastructures::common::Tlv { + tlv_type: statime::datastructures::common::TlvType::Reserved(CSPTP_RESPONSE_TLV), + value: innerbuf.as_slice().into(), + }).unwrap(); + Self { + inner: statime::datastructures::messages::Message { + header: statime::datastructures::messages::Header { + sdo_id: statime::config::SdoId::try_from(0x300).unwrap(), + version: statime::datastructures::messages::PtpVersion::new(2, 0).unwrap(), + domain_number: 0, + alternate_master_flag: false, + two_step_flag: false, + unicast_flag: true, + ptp_profile_specific_1: false, + ptp_profile_specific_2: false, + leap61: false, + leap59: false, + current_utc_offset_valid: false, + ptp_timescale: true, + time_tracable: false, + frequency_tracable: false, + synchronization_uncertain: false, + correction_field: Default::default(), + source_port_identity: Default::default(), + sequence_id: request.inner.header.sequence_id, + log_message_interval: 0x7f, + }, + body: statime::datastructures::messages::MessageBody::Sync( + statime::datastructures::messages::sync::SyncMessage { + origin_timestamp: send_timestamp, + }, + ), + suffix: tlvs.build(), + }, + } + } } pub struct CsptpRequestFlags { From 3cee988305157a118d425fe82ea801a05e4a1fe6 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 14:16:58 +0200 Subject: [PATCH 06/10] Implemented responding for csptp server. --- Cargo.lock | 1 + ntp-proto/src/csptp/mod.rs | 16 +++++++---- ntp-proto/src/lib.rs | 1 + ntp-proto/src/time_types.rs | 14 +++++++++ ntpd/Cargo.toml | 1 + ntpd/src/daemon/csptp_server.rs | 50 ++++++++++++++++++++++++++++----- 6 files changed, 70 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4fcf5af5e..baaac0cf5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -632,6 +632,7 @@ dependencies = [ "rustls", "serde", "serde_json", + "statime", "timestamped-socket", "tokio", "tokio-rustls", diff --git a/ntp-proto/src/csptp/mod.rs b/ntp-proto/src/csptp/mod.rs index cdf1645ec..234ef84b9 100644 --- a/ntp-proto/src/csptp/mod.rs +++ b/ntp-proto/src/csptp/mod.rs @@ -1,6 +1,6 @@ use std::io::Cursor; -use statime::datastructures::{common::TlvSetBuilder, WireFormat}; +use statime::datastructures::{WireFormat, common::TlvSetBuilder}; const CSPTP_REQUEST_TLV: u16 = 0xFF00; const CSPTP_RESPONSE_TLV: u16 = 0xFF01; @@ -10,13 +10,13 @@ pub struct CsptpPacket<'a> { } impl<'a> CsptpPacket<'a> { - fn deserialize(data: &'a [u8]) -> Result { + pub fn deserialize(data: &'a [u8]) -> Result { Ok(CsptpPacket { inner: statime::datastructures::messages::Message::deserialize(data)?, }) } - fn serialize( + pub fn serialize( &self, w: &mut Cursor<&mut [u8]>, ) -> Result<(), statime::datastructures::WireFormatError> { @@ -122,13 +122,17 @@ impl<'a> CsptpPacket<'a> { send_timestamp: statime::datastructures::common::WireTimestamp, ) -> Self { let mut tlvs = TlvSetBuilder::new(buffer); - let mut innerbuf = [0u8;18]; + let mut innerbuf = [0u8; 18]; receive_timestamp.serialize(&mut innerbuf[0..10]).unwrap(); - request.get_correction().serialize(&mut innerbuf[10..18]).unwrap(); + request + .get_correction() + .serialize(&mut innerbuf[10..18]) + .unwrap(); tlvs.add(statime::datastructures::common::Tlv { tlv_type: statime::datastructures::common::TlvType::Reserved(CSPTP_RESPONSE_TLV), value: innerbuf.as_slice().into(), - }).unwrap(); + }) + .unwrap(); Self { inner: statime::datastructures::messages::Message { header: statime::datastructures::messages::Header { diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index af405a8d1..79da664e0 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -147,6 +147,7 @@ mod exports { pub use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; } + pub use super::csptp::{CsptpPacket, CsptpRequestFlags, CsptpResponseData}; pub use super::generic::NtpVersion; } diff --git a/ntp-proto/src/time_types.rs b/ntp-proto/src/time_types.rs index d747a1480..6185d4281 100644 --- a/ntp-proto/src/time_types.rs +++ b/ntp-proto/src/time_types.rs @@ -92,6 +92,20 @@ impl NtpTimestamp { NtpTimestamp::from_seconds_nanos_since_ntp_era(seconds, statime.nanos) } + pub fn to_statime(&self) -> WireTimestamp { + const EPOCH_OFFSET: u64 = (70 * 365 + 17) * 86400; + + // TODO: this is definitely not a constant + const TAI_OFFSET: u64 = 37; + + let seconds = (self.timestamp >> 32) + .wrapping_sub(TAI_OFFSET) + .wrapping_sub(EPOCH_OFFSET); + let nanos = (((self.timestamp & 0xFFFFFFFF) * 1000000000) >> 32) as u32; + + WireTimestamp { seconds, nanos } + } + pub(crate) const fn to_bits(self) -> [u8; 8] { self.timestamp.to_be_bytes() } diff --git a/ntpd/Cargo.toml b/ntpd/Cargo.toml index 60615dab2..6f6bc4a7d 100644 --- a/ntpd/Cargo.toml +++ b/ntpd/Cargo.toml @@ -23,6 +23,7 @@ libc.workspace = true timestamped-socket.workspace = true clock-steering.workspace = true pps-time = { workspace = true, optional = true } +statime.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/ntpd/src/daemon/csptp_server.rs b/ntpd/src/daemon/csptp_server.rs index c7cec625b..ad34a46f2 100644 --- a/ntpd/src/daemon/csptp_server.rs +++ b/ntpd/src/daemon/csptp_server.rs @@ -1,6 +1,7 @@ -use std::{sync::Arc, time::Duration}; +use std::{io::Cursor, sync::Arc, time::Duration}; -use ntp_proto::{KeySet, NtpClock, SystemSnapshot}; +use ntp_proto::{CsptpPacket, KeySet, NtpClock, SystemSnapshot}; +use statime::datastructures::common::WireTimestamp; use timestamped_socket::socket::{RecvResult, open_ip}; use tokio::task::JoinHandle; use tracing::{Instrument, Span, debug, instrument, warn}; @@ -86,12 +87,14 @@ impl CsptpServerTask { recv_res = socket.recv(&mut buf) => { match recv_res { Ok(RecvResult { - bytes_read: _length, - remote_addr: _source_addr, - timestamp: Some(_timestamp), + bytes_read: length, + remote_addr: source_addr, + timestamp: Some(timestamp), }) => { - let mut _send_buf = [0u8; MAX_PACKET_SIZE]; - // TODO: parse and respond + let mut send_buf = [0u8; MAX_PACKET_SIZE]; + if let Some(buf) = self.server.respond(&mut send_buf, &buf[..length], timestamp) { + let _ = socket.send_to(buf, source_addr).await; + } } Ok(_) => { debug!("received a packet without a timestamp"); @@ -157,3 +160,36 @@ impl CsptpServer { self.keyset = keyset; } } + +impl CsptpServer { + fn respond<'a>( + &self, + buffer: &'a mut [u8], + request: &[u8], + timestamp: timestamped_socket::socket::Timestamp, + ) -> Option<&'a [u8]> { + let packet = CsptpPacket::deserialize(request).ok()?; + if packet.get_origin_timestamp().is_none() || packet.get_csptp_request_flags().is_none() { + return None; + } + + let mut tlvbuffer = [0u8; MAX_PACKET_SIZE]; + let receive_timestamp = WireTimestamp { + seconds: timestamp.seconds as _, + nanos: timestamp.nanos, + }; + let send_time = self.clock.now().ok()?; + let send_timestamp = send_time.to_statime(); + let response = CsptpPacket::timestamp_response( + &mut tlvbuffer, + packet, + receive_timestamp, + send_timestamp, + ); + + let mut cursor = Cursor::new(buffer); + response.serialize(&mut cursor).ok()?; + let size = cursor.position() as usize; + Some(&cursor.into_inner()[..size]) + } +} From 3a3891c4ff9331d89458aad6c5d6f55c4ba02a27 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 15:02:27 +0200 Subject: [PATCH 07/10] Added two way source controller. --- ntp-proto/src/source.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/ntp-proto/src/source.rs b/ntp-proto/src/source.rs index d0fcd5bfc..9f438701f 100644 --- a/ntp-proto/src/source.rs +++ b/ntp-proto/src/source.rs @@ -96,6 +96,44 @@ pub struct NtpSource> { + controller: Controller, +} + +impl> TwoWaySource { + pub(crate) fn new(controller: Controller) -> TwoWaySource { + TwoWaySource { controller } + } + + pub fn handle_measurement( + &mut self, + measurement: Measurement, + ) -> Option { + self.controller.handle_measurement(measurement) + } + + pub fn handle_message(&mut self, message: Controller::ControllerMessage) { + self.controller.handle_message(message) + } + + pub fn observe( + &self, + name: String, + address: String, + id: SourceId, + ) -> ObservableSourceState { + ObservableSourceState { + timedata: self.controller.observe(), + unanswered_polls: 0, + poll_interval: crate::time_types::PollInterval::from_byte(0), + nts_cookies: None, + name, + address, + id, + } + } +} + pub struct OneWaySource> { controller: Controller, } From 82b3dbdf3cb4ead6c6ab887008abc54345baa188 Mon Sep 17 00:00:00 2001 From: Ruben Nijveld Date: Sat, 19 Jul 2025 17:10:28 +0200 Subject: [PATCH 08/10] CSPTP source setup --- ntp-proto/src/algorithm/kalman/mod.rs | 15 +++ ntp-proto/src/algorithm/mod.rs | 6 ++ ntp-proto/src/lib.rs | 2 +- ntp-proto/src/system.rs | 15 ++- ntpd/src/daemon/config/mod.rs | 2 + ntpd/src/daemon/config/ntp_source.rs | 52 ++++++++++ ntpd/src/daemon/csptp_source.rs | 60 ++++++++++++ ntpd/src/daemon/mod.rs | 1 + ntpd/src/daemon/spawn/csptp.rs | 133 ++++++++++++++++++++++++++ ntpd/src/daemon/spawn/mod.rs | 13 +++ ntpd/src/daemon/system.rs | 24 ++++- ntpd/src/force_sync/algorithm.rs | 13 +++ ntpd/src/force_sync/mod.rs | 1 + 13 files changed, 334 insertions(+), 3 deletions(-) create mode 100644 ntpd/src/daemon/csptp_source.rs create mode 100644 ntpd/src/daemon/spawn/csptp.rs diff --git a/ntp-proto/src/algorithm/kalman/mod.rs b/ntp-proto/src/algorithm/kalman/mod.rs index 8f437798c..de8518219 100644 --- a/ntp-proto/src/algorithm/kalman/mod.rs +++ b/ntp-proto/src/algorithm/kalman/mod.rs @@ -435,6 +435,21 @@ impl TimeSyncC ) } + fn add_two_way_source( + &mut self, + id: Self::SourceId, + source_config: SourceConfig, + ) -> Self::NtpSourceController { + self.sources.insert(id, (None, false)); + KalmanSourceController::new( + id, + self.algo_config, + None, + source_config, + AveragingBuffer::default(), + ) + } + fn remove_source(&mut self, id: SourceId) { self.sources.remove(&id); } diff --git a/ntp-proto/src/algorithm/mod.rs b/ntp-proto/src/algorithm/mod.rs index f4f27d023..741ebf230 100644 --- a/ntp-proto/src/algorithm/mod.rs +++ b/ntp-proto/src/algorithm/mod.rs @@ -90,6 +90,12 @@ pub trait TimeSyncController: Sized + Send + 'static { measurement_noise_estimate: f64, period: Option, ) -> Self::OneWaySourceController; + /// Create a new two way source with given identity (used e.g. with CSPTP sources) + fn add_two_way_source( + &mut self, + id: Self::SourceId, + source_config: SourceConfig, + ) -> Self::NtpSourceController; /// Notify the controller that a previous source has gone fn remove_source(&mut self, id: Self::SourceId); /// Notify the controller that the status of a source (whether diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index 79da664e0..b144acb6a 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -119,7 +119,7 @@ mod exports { AcceptSynchronizationError, Measurement, NtpSource, NtpSourceAction, NtpSourceActionIterator, NtpSourceSnapshot, NtpSourceUpdate, ObservableSourceState, OneWaySource, OneWaySourceSnapshot, OneWaySourceUpdate, ProtocolVersion, Reach, - SourceNtsData, + SourceNtsData, TwoWaySource, }; pub use super::system::{ System, SystemAction, SystemActionIterator, SystemSnapshot, SystemSourceUpdate, diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index 5b82c3f55..bf481e724 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -6,7 +6,7 @@ use std::time::Duration; use std::{fmt::Debug, hash::Hash}; use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; -use crate::source::{NtpSourceUpdate, SourceSnapshot}; +use crate::source::{NtpSourceUpdate, SourceSnapshot, TwoWaySource}; use crate::{NtpTimestamp, OneWaySource, OneWaySourceUpdate}; use crate::{ algorithm::{StateUpdate, TimeSyncController}, @@ -324,6 +324,19 @@ impl Result, ::Error> + { + self.ensure_controller_control()?; + let controller = self.controller.add_two_way_source(id, source_config); + self.sources.insert(id, None); + Ok(TwoWaySource::new(controller)) + } + pub fn handle_source_remove( &mut self, id: SourceId, diff --git a/ntpd/src/daemon/config/mod.rs b/ntpd/src/daemon/config/mod.rs index 9575a7d35..6eb8e2b9a 100644 --- a/ntpd/src/daemon/config/mod.rs +++ b/ntpd/src/daemon/config/mod.rs @@ -447,6 +447,7 @@ impl Config { NtpSourceConfig::Sock(_) => count += 1, #[cfg(feature = "pps")] NtpSourceConfig::Pps(_) => {} // PPS sources don't count + NtpSourceConfig::Csptp(_) => count += 1, } } count @@ -482,6 +483,7 @@ impl Config { NtpSourceConfig::Sock(_) => false, #[cfg(feature = "pps")] NtpSourceConfig::Pps(_) => false, + NtpSourceConfig::Csptp(_) => false, NtpSourceConfig::Standard(config) => { matches!(config.first.ntp_version, ProtocolVersion::V5) } diff --git a/ntpd/src/daemon/config/ntp_source.rs b/ntpd/src/daemon/config/ntp_source.rs index 88a938616..06450815d 100644 --- a/ntpd/src/daemon/config/ntp_source.rs +++ b/ntpd/src/daemon/config/ntp_source.rs @@ -399,6 +399,12 @@ impl<'de> Deserialize<'de> for PpsSourceConfig { } } +#[derive(Deserialize, Debug, PartialEq, Clone)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +pub struct CsptpSourceConfig { + pub address: CsptpAddress, +} + #[derive(Debug, Deserialize, PartialEq, Clone)] #[serde(tag = "mode")] pub enum NtpSourceConfig { @@ -415,6 +421,8 @@ pub enum NtpSourceConfig { #[cfg(feature = "pps")] #[serde(rename = "pps")] Pps(PpsSourceConfig), + #[serde(rename = "csptp")] + Csptp(CsptpSourceConfig), } /// A normalized address has a host and a port part. However, the host may be @@ -461,6 +469,9 @@ pub struct NtpAddress(pub NormalizedAddress); #[derive(Debug, Clone, PartialEq, Eq)] pub struct NtsKeAddress(pub NormalizedAddress); +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CsptpAddress(pub NormalizedAddress); + impl<'de> Deserialize<'de> for NtpAddress { fn deserialize(deserializer: D) -> Result where @@ -485,6 +496,18 @@ impl<'de> Deserialize<'de> for NtsKeAddress { } } +impl<'de> Deserialize<'de> for CsptpAddress { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(NormalizedAddress::from_string_csptp(s) + .map_err(serde::de::Error::custom)? + .into()) + } +} + impl From for NtpAddress { fn from(addr: NormalizedAddress) -> Self { Self(addr) @@ -497,6 +520,12 @@ impl From for NtsKeAddress { } } +impl From for CsptpAddress { + fn from(addr: NormalizedAddress) -> Self { + Self(addr) + } +} + impl Deref for NtsKeAddress { type Target = NormalizedAddress; @@ -513,9 +542,18 @@ impl Deref for NtpAddress { } } +impl Deref for CsptpAddress { + type Target = NormalizedAddress; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + impl NormalizedAddress { const NTP_DEFAULT_PORT: u16 = 123; const NTS_KE_DEFAULT_PORT: u16 = 4460; + const CSPTP_DEFAULT_PORT: u16 = 319; /// Specifically, this adds the `:123` port if no port is specified pub(crate) fn from_string_ntp(address: String) -> std::io::Result { @@ -543,6 +581,19 @@ impl NormalizedAddress { }) } + /// Specifically, this adds the `:319` port if no port is specified + pub(crate) fn from_string_csptp(address: String) -> std::io::Result { + let (server_name, port) = Self::from_string_help(address, Self::CSPTP_DEFAULT_PORT)?; + + Ok(Self { + server_name, + port, + + #[cfg(test)] + hardcoded_dns_resolve: HardcodedDnsResolve::default(), + }) + } + fn from_string_help(address: String, default_port: u16) -> std::io::Result<(String, u16)> { if address.split(':').count() > 2 { // IPv6, try to parse it as such @@ -670,6 +721,7 @@ mod tests { NtpSourceConfig::Sock(_c) => "".to_string(), #[cfg(feature = "pps")] NtpSourceConfig::Pps(_c) => "".to_string(), + NtpSourceConfig::Csptp(c) => c.address.to_string(), } } diff --git a/ntpd/src/daemon/csptp_source.rs b/ntpd/src/daemon/csptp_source.rs new file mode 100644 index 000000000..ab61ca5ad --- /dev/null +++ b/ntpd/src/daemon/csptp_source.rs @@ -0,0 +1,60 @@ +use std::net::SocketAddr; + +use ntp_proto::{NtpClock, NtpDuration, SourceController, TwoWaySource}; +use timestamped_socket::{ + interface::InterfaceName, + socket::{Connected, Socket}, +}; +use tracing::{Instrument, Span, instrument}; + +use crate::daemon::{config::TimestampMode, ntp_source::SourceChannels, spawn::SourceId}; + +pub(crate) struct CsptpSourceTask< + C: 'static + NtpClock + Send, + Controller: SourceController, +> { + index: SourceId, + clock: C, + interface: Option, + timestamp_mode: TimestampMode, + name: String, + source_addr: SocketAddr, + socket: Option>, + channels: SourceChannels, + + source: TwoWaySource, +} + +impl> + CsptpSourceTask +{ + #[allow(clippy::too_many_arguments)] + #[instrument(level = tracing::Level::ERROR, name = "CSPTP Source", skip(timestamp_mode, clock, channels, source))] + pub fn spawn( + index: SourceId, + name: String, + source_addr: SocketAddr, + interface: Option, + clock: C, + timestamp_mode: TimestampMode, + channels: SourceChannels, + source: TwoWaySource, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn( + (async move { + let mut process = CsptpSourceTask { + index, + name, + clock, + channels, + interface, + timestamp_mode, + source_addr, + socket: None, + source, + }; + }) + .instrument(Span::current()), + ) + } +} diff --git a/ntpd/src/daemon/mod.rs b/ntpd/src/daemon/mod.rs index 8f3ee1044..875a888b7 100644 --- a/ntpd/src/daemon/mod.rs +++ b/ntpd/src/daemon/mod.rs @@ -1,6 +1,7 @@ mod clock; pub mod config; mod csptp_server; +mod csptp_source; pub mod keyexchange; mod local_ip_provider; mod ntp_source; diff --git a/ntpd/src/daemon/spawn/csptp.rs b/ntpd/src/daemon/spawn/csptp.rs new file mode 100644 index 000000000..475cfb2b0 --- /dev/null +++ b/ntpd/src/daemon/spawn/csptp.rs @@ -0,0 +1,133 @@ +use std::fmt::Display; +use std::{net::SocketAddr, ops::Deref}; + +use ntp_proto::SourceConfig; +use tokio::sync::mpsc; +use tracing::warn; + +use crate::daemon::config::CsptpSourceConfig; +use crate::daemon::spawn::{ + CsptpSourceCreateParameters, SourceCreateParameters, SourceId, SourceRemovalReason, + SourceRemovedEvent, SpawnAction, SpawnEvent, Spawner, SpawnerId, +}; + +pub struct CsptpSpawner { + id: SpawnerId, + config: CsptpSourceConfig, + source_config: SourceConfig, + resolved: Option, + has_spawned: bool, +} + +#[derive(Debug)] +pub enum CsptpSpawnError { + SendError(mpsc::error::SendError), +} + +impl Display for CsptpSpawnError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SendError(e) => write!(f, "Channel send error: {e}"), + } + } +} + +impl From> for CsptpSpawnError { + fn from(value: mpsc::error::SendError) -> Self { + Self::SendError(value) + } +} + +impl std::error::Error for CsptpSpawnError {} + +impl CsptpSpawner { + pub fn new(config: CsptpSourceConfig, source_config: SourceConfig) -> Self { + Self { + id: Default::default(), + config, + source_config, + resolved: None, + has_spawned: false, + } + } + + async fn do_resolve(&mut self, force_resolve: bool) -> Option { + if let (false, Some(addr)) = (force_resolve, self.resolved) { + Some(addr) + } else { + match self.config.address.lookup_host().await { + Ok(mut addresses) => match addresses.next() { + None => { + warn!("Could not resolve source address, retrying"); + None + } + Some(first) => { + self.resolved = Some(first); + self.resolved + } + }, + Err(e) => { + warn!(error = ?e, "error while resolving source address, retrying"); + None + } + } + } + } +} + +impl Spawner for CsptpSpawner { + type Error = CsptpSpawnError; + + async fn try_spawn( + &mut self, + action_tx: &mpsc::Sender, + ) -> Result<(), CsptpSpawnError> { + let Some(addr) = self.do_resolve(false).await else { + return Ok(()); + }; + action_tx + .send(SpawnEvent::new( + self.id, + SpawnAction::Create(SourceCreateParameters::Csptp(CsptpSourceCreateParameters { + id: SourceId::new(), + addr, + normalized_addr: self.config.address.deref().clone(), + config: self.source_config.clone(), + nts: None, + })), + )) + .await?; + self.has_spawned = true; + Ok(()) + } + + fn is_complete(&self) -> bool { + self.has_spawned + } + + async fn handle_source_removed( + &mut self, + removed_source: SourceRemovedEvent, + ) -> Result<(), CsptpSpawnError> { + if removed_source.reason == SourceRemovalReason::Unreachable { + // force new resolution + self.resolved = None; + } + if removed_source.reason != SourceRemovalReason::Demobilized { + self.has_spawned = false; + } + Ok(()) + } + + fn get_id(&self) -> SpawnerId { + self.id + } + + fn get_addr_description(&self) -> String { + self.config.address.to_string() + } + + fn get_description(&self) -> &str { + "csptp" + } +} diff --git a/ntpd/src/daemon/spawn/mod.rs b/ntpd/src/daemon/spawn/mod.rs index df4622802..e962260e9 100644 --- a/ntpd/src/daemon/spawn/mod.rs +++ b/ntpd/src/daemon/spawn/mod.rs @@ -9,6 +9,7 @@ use tokio::{ use super::{config::NormalizedAddress, system::NETWORK_WAIT_PERIOD}; +pub mod csptp; pub mod nts; pub mod nts_pool; pub mod pool; @@ -141,6 +142,7 @@ pub enum SourceCreateParameters { Sock(SockSourceCreateParameters), #[cfg(feature = "pps")] Pps(PpsSourceCreateParameters), + Csptp(CsptpSourceCreateParameters), } impl SourceCreateParameters { @@ -150,6 +152,7 @@ impl SourceCreateParameters { Self::Sock(params) => params.id, #[cfg(feature = "pps")] Self::Pps(params) => params.id, + Self::Csptp(params) => params.id, } } @@ -159,6 +162,7 @@ impl SourceCreateParameters { Self::Sock(params) => params.path.display().to_string(), #[cfg(feature = "pps")] Self::Pps(params) => params.path.display().to_string(), + Self::Csptp(params) => params.addr.to_string(), } } } @@ -191,6 +195,15 @@ pub struct PpsSourceCreateParameters { pub period: f64, } +#[derive(Debug)] +pub struct CsptpSourceCreateParameters { + pub id: SourceId, + pub addr: SocketAddr, + pub normalized_addr: NormalizedAddress, + pub config: SourceConfig, + pub nts: Option>, +} + pub trait Spawner { type Error: std::error::Error + Send; diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index 89d03cdac..29b2822ff 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -3,8 +3,9 @@ use crate::daemon::pps_source::PpsSourceTask; use crate::daemon::{ config::CsptpServerConfig, csptp_server::CsptpServerTask, + csptp_source::CsptpSourceTask, sock_source::SockSourceTask, - spawn::{SourceCreateParameters, spawner_task}, + spawn::{SourceCreateParameters, csptp::CsptpSpawner, spawner_task}, }; use super::spawn::nts_pool::NtsPoolSpawner; @@ -160,6 +161,9 @@ pub async fn spawn { system.add_spawner(PpsSpawner::new(cfg.clone(), source_defaults_config)); } + NtpSourceConfig::Csptp(cfg) => { + system.add_spawner(CsptpSpawner::new(cfg.clone(), source_defaults_config)); + } } } @@ -559,6 +563,24 @@ impl { + let source = self.system.create_csptp_source(source_id, params.config)?; + + CsptpSourceTask::spawn( + source_id, + params.normalized_addr.to_string(), + params.addr, + self.interface, + self.clock.clone(), + self.timestamp_mode, + SourceChannels { + msg_for_system_sender: self.msg_for_system_tx.clone(), + system_update_receiver: self.system_update_sender.subscribe(), + source_snapshots: self.source_snapshots.clone(), + }, + source, + ); + } }; // Try and find a related spawner and notify that spawner. diff --git a/ntpd/src/force_sync/algorithm.rs b/ntpd/src/force_sync/algorithm.rs index 6f84e8d00..c2d9deb7e 100644 --- a/ntpd/src/force_sync/algorithm.rs +++ b/ntpd/src/force_sync/algorithm.rs @@ -177,6 +177,19 @@ impl TimeSyncController for SingleShotController { } } + fn add_two_way_source( + &mut self, + _id: Self::SourceId, + config: SourceConfig, + ) -> Self::NtpSourceController { + SingleShotSourceController:: { + delay_type: PhantomData, + min_poll_interval: config.poll_interval_limits.min, + done: false, + ignore: false, + } + } + fn remove_source(&mut self, id: Self::SourceId) { self.sources.remove(&id); } diff --git a/ntpd/src/force_sync/mod.rs b/ntpd/src/force_sync/mod.rs index 4c67928ff..bc3c84910 100644 --- a/ntpd/src/force_sync/mod.rs +++ b/ntpd/src/force_sync/mod.rs @@ -133,6 +133,7 @@ pub(crate) fn force_sync(config: Option) -> std::io::Result { match source { config::NtpSourceConfig::Standard(_) | config::NtpSourceConfig::Nts(_) + | config::NtpSourceConfig::Csptp(_) | config::NtpSourceConfig::Sock(_) => total_sources += 1, #[cfg(feature = "pps")] config::NtpSourceConfig::Pps(_) => {} // PPS sources don't count From 075ee3be31b8046d3370d4c8a0eb2556713d01b7 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 18:16:19 +0200 Subject: [PATCH 09/10] Implemented csptp prototype. --- ntp-proto/src/time_types.rs | 4 +- ntpd/src/daemon/csptp_server.rs | 2 +- ntpd/src/daemon/csptp_source.rs | 267 +++++++++++++++++++++++++++++++- 3 files changed, 265 insertions(+), 8 deletions(-) diff --git a/ntp-proto/src/time_types.rs b/ntp-proto/src/time_types.rs index 6185d4281..32f47da5d 100644 --- a/ntp-proto/src/time_types.rs +++ b/ntp-proto/src/time_types.rs @@ -87,7 +87,7 @@ impl NtpTimestamp { let seconds = statime .seconds .wrapping_add(EPOCH_OFFSET) - .wrapping_add(TAI_OFFSET) as u32; + .wrapping_sub(TAI_OFFSET) as u32; NtpTimestamp::from_seconds_nanos_since_ntp_era(seconds, statime.nanos) } @@ -99,7 +99,7 @@ impl NtpTimestamp { const TAI_OFFSET: u64 = 37; let seconds = (self.timestamp >> 32) - .wrapping_sub(TAI_OFFSET) + .wrapping_add(TAI_OFFSET) .wrapping_sub(EPOCH_OFFSET); let nanos = (((self.timestamp & 0xFFFFFFFF) * 1000000000) >> 32) as u32; diff --git a/ntpd/src/daemon/csptp_server.rs b/ntpd/src/daemon/csptp_server.rs index ad34a46f2..76dc70a20 100644 --- a/ntpd/src/daemon/csptp_server.rs +++ b/ntpd/src/daemon/csptp_server.rs @@ -175,7 +175,7 @@ impl CsptpServer { let mut tlvbuffer = [0u8; MAX_PACKET_SIZE]; let receive_timestamp = WireTimestamp { - seconds: timestamp.seconds as _, + seconds: (timestamp.seconds + 37) as _, nanos: timestamp.nanos, }; let send_time = self.clock.now().ok()?; diff --git a/ntpd/src/daemon/csptp_source.rs b/ntpd/src/daemon/csptp_source.rs index ab61ca5ad..abc5a1a96 100644 --- a/ntpd/src/daemon/csptp_source.rs +++ b/ntpd/src/daemon/csptp_source.rs @@ -1,13 +1,32 @@ -use std::net::SocketAddr; +use std::{ + io::Cursor, + net::{Ipv4Addr, SocketAddr}, +}; -use ntp_proto::{NtpClock, NtpDuration, SourceController, TwoWaySource}; +use ntp_proto::{ + CsptpPacket, Measurement, NtpClock, NtpDuration, NtpInstant, NtpTimestamp, + OneWaySourceSnapshot, OneWaySourceUpdate, ReferenceId, SourceController, SystemSourceUpdate, + TwoWaySource, +}; use timestamped_socket::{ interface::InterfaceName, - socket::{Connected, Socket}, + socket::{Connected, RecvResult, Socket, open_ip}, }; -use tracing::{Instrument, Span, instrument}; +use tracing::{Instrument, Span, error, instrument, warn}; -use crate::daemon::{config::TimestampMode, ntp_source::SourceChannels, spawn::SourceId}; +use crate::daemon::{ + config::TimestampMode, + exitcode, + ntp_source::{MsgForSystem, SourceChannels}, + spawn::SourceId, + util::convert_net_timestamp, +}; + +#[derive(Debug)] +enum SocketResult { + Ok, + Abort, +} pub(crate) struct CsptpSourceTask< C: 'static + NtpClock + Send, @@ -23,6 +42,9 @@ pub(crate) struct CsptpSourceTask< channels: SourceChannels, source: TwoWaySource, + + last_send_timestamp: Option, + seqid: u16, } impl> @@ -52,9 +74,244 @@ impl SocketResult { + if self.socket.is_some() { + return SocketResult::Ok; + } + + let socket_res = match self.interface { + #[cfg(target_os = "linux")] + Some(interface) => { + use timestamped_socket::socket::open_interface_udp; + + open_interface_udp( + interface, + 319, /*lets os choose*/ + self.timestamp_mode.as_interface_mode(), + None, + ) + .and_then(|socket| socket.connect(self.source_addr)) + } + _ => open_ip( + SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 319), + self.timestamp_mode.as_general_mode(), + ) + .and_then(|socket| socket.connect(self.source_addr)), + }; + + self.socket = match socket_res { + Ok(socket) => Some(socket), + Err(error) => { + warn!(?error, "Could not open socket"); + return SocketResult::Abort; + } + }; + + SocketResult::Ok + } + + async fn run(&mut self) { + let mut buf = [0u8; 1024]; + + let poll_wait = tokio::time::sleep(std::time::Duration::default()); + tokio::pin!(poll_wait); + let mut poll_wait = poll_wait; + + #[allow(clippy::large_enum_variant)] + enum SelectResult { + Timer, + Recv(Result, std::io::Error>), + SystemUpdate( + Result< + SystemSourceUpdate, + tokio::sync::broadcast::error::RecvError, + >, + ), + } + + loop { + let selected: SelectResult = tokio::select! { + () = &mut poll_wait => { + SelectResult::Timer + }, + result = self.channels.system_update_receiver.recv() => { + SelectResult::SystemUpdate(result) + }, + result = async { if let Some(ref mut socket) = self.socket { socket.recv(&mut buf).await } else { std::future::pending().await }} => { + SelectResult::Recv(result) + }, + }; + + match selected { + SelectResult::Timer => { + if matches!(self.setup_socket().await, SocketResult::Abort) { + self.channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await + .ok(); + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .remove(&self.index); + return; + } + + let mut cursor = Cursor::new(buf.as_mut_slice()); + self.seqid = self.seqid.wrapping_add(1); + let mut tlvbuffer = [0u8; 1024]; + CsptpPacket::request(&mut tlvbuffer, self.seqid) + .serialize(&mut cursor) + .unwrap(); + let packet_size = cursor.position() as usize; + let packet = &buf[..packet_size]; + + match self.clock.now() { + Err(e) => { + // we cannot determine the origin_timestamp + error!(error = ?e, "There was an error retrieving the current time"); + + // report as no permissions, since this seems the most likely + std::process::exit(exitcode::NOPERM); + } + Ok(ts) => { + self.last_send_timestamp = Some(ts); + } + } + + match self.socket.as_mut().unwrap().send(&packet).await { + Err(error) => { + warn!(?error, "poll message could not be sent"); + + match error.raw_os_error() { + Some(libc::EHOSTDOWN) + | Some(libc::EHOSTUNREACH) + | Some(libc::ENETDOWN) + | Some(libc::ENETUNREACH) => { + self.channels + .msg_for_system_sender + .send(MsgForSystem::NetworkIssue(self.index)) + .await + .ok(); + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .remove(&self.index); + return; + } + _ => {} + } + } + Ok(opt_send_timestamp) => { + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .insert( + self.index, + self.source.observe( + self.name.clone(), + self.source_addr.to_string(), + self.index, + ), + ); + + // update the last_send_timestamp with the one given by the kernel, if available + self.last_send_timestamp = opt_send_timestamp + .map(convert_net_timestamp) + .or(self.last_send_timestamp); + } + } + } + SelectResult::Recv(Ok(RecvResult { + bytes_read, + timestamp, + .. + })) => { + let packet = &buf[..bytes_read]; + let timestamp = timestamp + .map(convert_net_timestamp) + .unwrap_or_else(|| self.clock.now().unwrap()); + + let Ok(packet) = CsptpPacket::deserialize(&packet) else { + break; + }; + + let Some(response_data) = packet.get_csptp_response_data() else { + break; + }; + let Some(remote_send_timestamp) = packet.get_origin_timestamp() else { + break; + }; + + let Some(t1) = self.last_send_timestamp.take() else { + break; + }; + let t2 = NtpTimestamp::from_statime(&response_data.req_ingress_timestamp); + let t3 = NtpTimestamp::from_statime(&remote_send_timestamp); + let t4 = timestamp; + + let measurement = Measurement { + delay: (t4 - t1) - (t3 - t2), + offset: ((t2 - t1) + (t3 - t4)) / 2, + localtime: t1 + (t4 - t1) / 2, + monotime: NtpInstant::now(), + stratum: 1, + root_delay: NtpDuration::ZERO, + root_dispersion: NtpDuration::ZERO, + leap: ntp_proto::NtpLeapIndicator::NoWarning, + precision: 0, + }; + + let update = OneWaySourceUpdate { + snapshot: OneWaySourceSnapshot { + source_id: ReferenceId::PPS, + stratum: 0, + }, + message: self.source.handle_measurement(std::dbg!(measurement)), + }; + + self.channels + .msg_for_system_sender + .send(MsgForSystem::OneWaySourceUpdate(self.index, update)) + .await + .ok(); + + self.channels + .source_snapshots + .write() + .expect("Unexpected poisoned mutex") + .insert( + self.index, + self.source.observe( + self.name.clone(), + self.source_addr.to_string(), + self.index, + ), + ); + + poll_wait + .as_mut() + .reset(tokio::time::Instant::now() + std::time::Duration::from_secs(1)); + } + SelectResult::Recv(Err(_)) => { /* ignore for now */ } + SelectResult::SystemUpdate(Ok(update)) => { + self.source.handle_message(update.message); + } + SelectResult::SystemUpdate(Err(_)) => { /* ignore for now */ } + } + } + } } From c8b8408b4a1fb781b054dd886ef073124c4afce8 Mon Sep 17 00:00:00 2001 From: David Venhoek Date: Sat, 19 Jul 2025 18:29:11 +0200 Subject: [PATCH 10/10] Fix csptp timer not properly resetting. --- ntpd/src/daemon/csptp_source.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ntpd/src/daemon/csptp_source.rs b/ntpd/src/daemon/csptp_source.rs index abc5a1a96..0f0757ed2 100644 --- a/ntpd/src/daemon/csptp_source.rs +++ b/ntpd/src/daemon/csptp_source.rs @@ -12,7 +12,7 @@ use timestamped_socket::{ interface::InterfaceName, socket::{Connected, RecvResult, Socket, open_ip}, }; -use tracing::{Instrument, Span, error, instrument, warn}; +use tracing::{error, info, instrument, warn, Instrument, Span}; use crate::daemon::{ config::TimestampMode, @@ -154,6 +154,10 @@ impl { + poll_wait + .as_mut() + .reset(tokio::time::Instant::now() + std::time::Duration::from_secs(1)); + if matches!(self.setup_socket().await, SocketResult::Abort) { self.channels .msg_for_system_sender @@ -301,10 +305,6 @@ impl { /* ignore for now */ } SelectResult::SystemUpdate(Ok(update)) => {