Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ toml = { version = ">=0.6.0,<0.9.0", default-features = false, features = ["pars
timestamped-socket = "0.2.2"
clock-steering = "0.2.1"
pps-time = "0.2.3"
statime = { git = "https://github.com/pendulum-project/statime.git", branch = "csptp-hackathon" }

# TLS
rustls23 = { package = "rustls", version = "0.23.16", features = ["logging", "std"] }
Expand Down
1 change: 1 addition & 0 deletions ntp-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ arbitrary = { workspace = true, optional = true }
aead.workspace = true
aes-siv.workspace = true
zeroize.workspace = true
statime.workspace = true

[dev-dependencies]
serde_json.workspace = true
Expand Down
15 changes: 15 additions & 0 deletions ntp-proto/src/algorithm/kalman/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,21 @@ impl<C: NtpClock, SourceId: Hash + Eq + Copy + Debug + Send + 'static> TimeSyncC
)
}

fn add_two_way_source(
&mut self,
id: Self::SourceId,
source_config: SourceConfig,
) -> Self::NtpSourceController {
self.sources.insert(id, (None, false));
KalmanSourceController::new(
id,
self.algo_config,
None,
source_config,
AveragingBuffer::default(),
)
}

fn remove_source(&mut self, id: SourceId) {
self.sources.remove(&id);
}
Expand Down
6 changes: 6 additions & 0 deletions ntp-proto/src/algorithm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ pub trait TimeSyncController: Sized + Send + 'static {
measurement_noise_estimate: f64,
period: Option<f64>,
) -> Self::OneWaySourceController;
/// Create a new two way source with given identity (used e.g. with CSPTP sources)
fn add_two_way_source(
&mut self,
id: Self::SourceId,
source_config: SourceConfig,
) -> Self::NtpSourceController;
/// Notify the controller that a previous source has gone
fn remove_source(&mut self, id: Self::SourceId);
/// Notify the controller that the status of a source (whether
Expand Down
178 changes: 178 additions & 0 deletions ntp-proto/src/csptp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
use std::io::Cursor;

use statime::datastructures::{WireFormat, common::TlvSetBuilder};

const CSPTP_REQUEST_TLV: u16 = 0xFF00;
const CSPTP_RESPONSE_TLV: u16 = 0xFF01;

pub struct CsptpPacket<'a> {
inner: statime::datastructures::messages::Message<'a>,
}

impl<'a> CsptpPacket<'a> {
pub fn deserialize(data: &'a [u8]) -> Result<Self, statime::datastructures::WireFormatError> {
Ok(CsptpPacket {
inner: statime::datastructures::messages::Message::deserialize(data)?,
})
}

pub fn serialize(
&self,
w: &mut Cursor<&mut [u8]>,
) -> Result<(), statime::datastructures::WireFormatError> {
let start = w.position() as usize;
let bytes = self.inner.serialize(&mut w.get_mut()[start..])?;
w.set_position((start + bytes) as u64);
Ok(())
}

pub fn get_csptp_request_flags(&self) -> Option<CsptpRequestFlags> {
for tlv in self.inner.suffix.tlv() {
if tlv.tlv_type.to_primitive() == CSPTP_REQUEST_TLV {
let flags = tlv.value.get(0).copied().unwrap_or_default();
return Some(CsptpRequestFlags {
csptp_status: flags & 1 != 0,
alt_timescale: flags & 2 != 0,
});
}
}

None
}

pub fn get_csptp_response_data(&self) -> Option<CsptpResponseData> {
for tlv in self.inner.suffix.tlv() {
if tlv.tlv_type.to_primitive() == CSPTP_RESPONSE_TLV && tlv.value.len() >= 18 {
return Some(CsptpResponseData {
req_ingress_timestamp:
statime::datastructures::common::WireTimestamp::deserialize(
&tlv.value[0..10],
)
.unwrap(),
req_ingress_correction:
statime::datastructures::common::TimeInterval::deserialize(
&tlv.value[10..18],
)
.unwrap(),
});
}
}

None
}

pub fn get_origin_timestamp(&self) -> Option<statime::datastructures::common::WireTimestamp> {
match self.inner.body {
statime::datastructures::messages::MessageBody::Sync(sync_message) => {
Some(sync_message.origin_timestamp)
}
_ => None,
}
}

pub fn get_correction(&self) -> statime::datastructures::common::TimeInterval {
self.inner.header.correction_field
}

pub fn request(buffer: &'a mut [u8], sequence_id: u16) -> Self {
let flags = [0u8; 4];
let request_tlv = statime::datastructures::common::Tlv {
tlv_type: statime::datastructures::common::TlvType::Reserved(CSPTP_REQUEST_TLV),
value: flags.as_slice().into(),
};
let mut tlvs = statime::datastructures::common::TlvSetBuilder::new(buffer);
tlvs.add(request_tlv).unwrap();
Self {
inner: statime::datastructures::messages::Message {
header: statime::datastructures::messages::Header {
sdo_id: statime::config::SdoId::try_from(0x300).unwrap(),
version: statime::datastructures::messages::PtpVersion::new(2, 0).unwrap(),
domain_number: 0,
alternate_master_flag: false,
two_step_flag: false,
unicast_flag: true,
ptp_profile_specific_1: false,
ptp_profile_specific_2: false,
leap61: false,
leap59: false,
current_utc_offset_valid: false,
ptp_timescale: true,
time_tracable: false,
frequency_tracable: false,
synchronization_uncertain: false,
correction_field: Default::default(),
source_port_identity: Default::default(),
sequence_id,
log_message_interval: 0x7f,
},
body: statime::datastructures::messages::MessageBody::Sync(
statime::datastructures::messages::sync::SyncMessage {
origin_timestamp: Default::default(),
},
),
suffix: tlvs.build(),
},
}
}

pub fn timestamp_response(
buffer: &'a mut [u8],
request: CsptpPacket<'_>,
receive_timestamp: statime::datastructures::common::WireTimestamp,
send_timestamp: statime::datastructures::common::WireTimestamp,
) -> Self {
let mut tlvs = TlvSetBuilder::new(buffer);
let mut innerbuf = [0u8; 18];
receive_timestamp.serialize(&mut innerbuf[0..10]).unwrap();
request
.get_correction()
.serialize(&mut innerbuf[10..18])
.unwrap();
tlvs.add(statime::datastructures::common::Tlv {
tlv_type: statime::datastructures::common::TlvType::Reserved(CSPTP_RESPONSE_TLV),
value: innerbuf.as_slice().into(),
})
.unwrap();
Self {
inner: statime::datastructures::messages::Message {
header: statime::datastructures::messages::Header {
sdo_id: statime::config::SdoId::try_from(0x300).unwrap(),
version: statime::datastructures::messages::PtpVersion::new(2, 0).unwrap(),
domain_number: 0,
alternate_master_flag: false,
two_step_flag: false,
unicast_flag: true,
ptp_profile_specific_1: false,
ptp_profile_specific_2: false,
leap61: false,
leap59: false,
current_utc_offset_valid: false,
ptp_timescale: true,
time_tracable: false,
frequency_tracable: false,
synchronization_uncertain: false,
correction_field: Default::default(),
source_port_identity: Default::default(),
sequence_id: request.inner.header.sequence_id,
log_message_interval: 0x7f,
},
body: statime::datastructures::messages::MessageBody::Sync(
statime::datastructures::messages::sync::SyncMessage {
origin_timestamp: send_timestamp,
},
),
suffix: tlvs.build(),
},
}
}
}

pub struct CsptpRequestFlags {
pub csptp_status: bool,
pub alt_timescale: bool,
}

pub struct CsptpResponseData {
pub req_ingress_timestamp: statime::datastructures::common::WireTimestamp,
pub req_ingress_correction: statime::datastructures::common::TimeInterval,
}
4 changes: 3 additions & 1 deletion ntp-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod algorithm;
mod clock;
mod config;
mod cookiestash;
mod csptp;
mod identifiers;
mod io;
mod ipfilter;
Expand Down Expand Up @@ -118,7 +119,7 @@ mod exports {
AcceptSynchronizationError, Measurement, NtpSource, NtpSourceAction,
NtpSourceActionIterator, NtpSourceSnapshot, NtpSourceUpdate, ObservableSourceState,
OneWaySource, OneWaySourceSnapshot, OneWaySourceUpdate, ProtocolVersion, Reach,
SourceNtsData,
SourceNtsData, TwoWaySource,
};
pub use super::system::{
System, SystemAction, SystemActionIterator, SystemSnapshot, SystemSourceUpdate,
Expand Down Expand Up @@ -146,6 +147,7 @@ mod exports {
pub use crate::packet::v5::server_reference_id::{BloomFilter, ServerId};
}

pub use super::csptp::{CsptpPacket, CsptpRequestFlags, CsptpResponseData};
pub use super::generic::NtpVersion;
}

Expand Down
Loading
Loading