From fd6586c0894c8a95ca75694bebc3807dd811a6ba Mon Sep 17 00:00:00 2001 From: Tamme Dittrich <tamme@tweedegolf.com> Date: Thu, 2 Nov 2023 16:15:32 +0100 Subject: [PATCH] Implement peer handling for bloom filters --- ntp-proto/src/lib.rs | 5 + ntp-proto/src/packet/extension_fields.rs | 7 ++ ntp-proto/src/packet/mod.rs | 27 +++- ntp-proto/src/packet/v5/extension_fields.rs | 45 ++++++- .../src/packet/v5/server_reference_id.rs | 57 +++++++-- ntp-proto/src/peer.rs | 118 +++++++++++++++++- ntp-proto/src/system.rs | 21 ++++ ntpd/src/daemon/observer.rs | 12 ++ 8 files changed, 268 insertions(+), 24 deletions(-) diff --git a/ntp-proto/src/lib.rs b/ntp-proto/src/lib.rs index f18ffc209..20f85841c 100644 --- a/ntp-proto/src/lib.rs +++ b/ntp-proto/src/lib.rs @@ -69,6 +69,11 @@ mod exports { KeyExchangeClient, KeyExchangeError, KeyExchangeResult, KeyExchangeServer, NtsRecord, NtsRecordDecoder, WriteError, }; + + #[cfg(feature = "ntpv5")] + pub mod v5 { + pub use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; + } } #[cfg(feature = "__internal-api")] diff --git a/ntp-proto/src/packet/extension_fields.rs b/ntp-proto/src/packet/extension_fields.rs index 62a455e20..beebf2f23 100644 --- a/ntp-proto/src/packet/extension_fields.rs +++ b/ntp-proto/src/packet/extension_fields.rs @@ -5,6 +5,9 @@ use std::{ use crate::keyset::DecodedServerCookie; +#[cfg(feature = "ntpv5")] +use crate::packet::v5::extension_fields::{ReferenceIdRequest, ReferenceIdResponse}; + use super::{crypto::EncryptResult, error::ParsingError, Cipher, CipherProvider, Mac}; #[derive(Debug, Copy, Clone, PartialEq, Eq)] @@ -550,6 +553,10 @@ impl<'a> ExtensionField<'a> { TypeId::DraftIdentification => { EF::decode_draft_identification(message, extension_header_version) } + #[cfg(feature = "ntpv5")] + TypeId::ReferenceIdRequest => Ok(ReferenceIdRequest::decode(message)?.into()), + #[cfg(feature = "ntpv5")] + TypeId::ReferenceIdResponse => Ok(ReferenceIdResponse::decode(message).into()), type_id => EF::decode_unknown(type_id.to_type_id(), message), } } diff --git a/ntp-proto/src/packet/mod.rs b/ntp-proto/src/packet/mod.rs index fd906a827..28da2056a 100644 --- a/ntp-proto/src/packet/mod.rs +++ b/ntp-proto/src/packet/mod.rs @@ -19,7 +19,7 @@ mod extension_fields; mod mac; #[cfg(feature = "ntpv5")] -mod v5; +pub mod v5; pub use crypto::{ AesSivCmac256, AesSivCmac512, Cipher, CipherHolder, CipherProvider, DecryptError, @@ -118,7 +118,7 @@ pub struct NtpPacket<'a> { } #[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum NtpHeader { +pub enum NtpHeader { V3(NtpHeaderV3V4), V4(NtpHeaderV3V4), #[cfg(feature = "ntpv5")] @@ -126,7 +126,7 @@ enum NtpHeader { } #[derive(Debug, Copy, Clone, PartialEq, Eq)] -struct NtpHeaderV3V4 { +pub struct NtpHeaderV3V4 { leap: NtpLeapIndicator, mode: NtpAssociationMode, stratum: u8, @@ -652,7 +652,14 @@ impl<'a> NtpPacket<'a> { .untrusted .into_iter() .chain(input.efdata.authenticated) - .filter(|ef| matches!(ef, ExtensionField::UniqueIdentifier(_))) + .filter_map(|ef| match ef { + uid @ ExtensionField::UniqueIdentifier(_) => Some(uid), + ExtensionField::ReferenceIdRequest(req) => { + let response = req.to_response(&system.bloom_filter)?; + Some(ExtensionField::ReferenceIdResponse(response).into_owned()) + } + _ => None, + }) .chain(std::iter::once(ExtensionField::DraftIdentification( Cow::Borrowed(v5::DRAFT_VERSION), ))) @@ -871,6 +878,10 @@ impl<'a> NtpPacket<'a> { } } + pub fn header(&self) -> NtpHeader { + self.header + } + pub fn leap(&self) -> NtpLeapIndicator { match self.header { NtpHeader::V3(header) => header.leap, @@ -1039,6 +1050,10 @@ impl<'a> NtpPacket<'a> { } } } + + pub fn untrusted_extension_fields(&self) -> impl Iterator<Item = &ExtensionField> { + self.efdata.untrusted.iter() + } } // Returns whether all uid extension fields found match the given uid, or @@ -1167,6 +1182,10 @@ impl<'a> NtpPacket<'a> { NtpHeader::V5(ref mut header) => header.root_dispersion = root_dispersion, } } + + pub fn push_untrusted(&mut self, ef: ExtensionField<'static>) { + self.efdata.untrusted.push(ef); + } } impl<'a> Default for NtpPacket<'a> { diff --git a/ntp-proto/src/packet/v5/extension_fields.rs b/ntp-proto/src/packet/v5/extension_fields.rs index dabfc1fb3..a98ef3547 100644 --- a/ntp-proto/src/packet/v5/extension_fields.rs +++ b/ntp-proto/src/packet/v5/extension_fields.rs @@ -1,5 +1,8 @@ +use crate::packet::error::ParsingError; use crate::packet::v5::server_reference_id::BloomFilter; +use crate::ExtensionField; use std::borrow::Cow; +use std::convert::Infallible; use std::io::Write; #[derive(Debug, Copy, Clone, Eq, PartialEq)] @@ -108,12 +111,17 @@ impl ReferenceIdRequest { } pub fn serialize(&self, mut writer: impl Write) -> std::io::Result<()> { + let payload_len = self.payload_len; + let ef_len: u16 = payload_len + 4; + writer.write_all(&Type::ReferenceIdRequest.to_bits().to_be_bytes())?; + writer.write_all(&ef_len.to_be_bytes())?; writer.write_all(&self.offset.to_be_bytes())?; writer.write_all(&[0; 2])?; - let words = self.payload_len / 4; - assert_eq!(self.payload_len % 4, 0); + let words = payload_len / 4; + assert_eq!(payload_len % 4, 0); + for _ in 1..words { writer.write_all(&[0; 4])?; } @@ -121,11 +129,22 @@ impl ReferenceIdRequest { Ok(()) } - pub(crate) fn offset(&self) -> u16 { + pub fn decode(msg: &[u8]) -> Result<Self, ParsingError<Infallible>> { + let payload_len = + u16::try_from(msg.len()).expect("NTP fields can not be longer than u16::MAX"); + let offset_bytes: [u8; 2] = msg[0..2].try_into().unwrap(); + + Ok(Self { + payload_len, + offset: u16::from_be_bytes(offset_bytes), + }) + } + + pub fn offset(&self) -> u16 { self.offset } - pub(crate) fn payload_len(&self) -> u16 { + pub fn payload_len(&self) -> u16 { self.payload_len } } @@ -168,11 +187,29 @@ impl<'a> ReferenceIdResponse<'a> { Ok(()) } + pub fn decode(bytes: &'a [u8]) -> Self { + Self { + bytes: Cow::Borrowed(bytes), + } + } + pub fn bytes(&self) -> &[u8] { &*self.bytes } } +impl From<ReferenceIdRequest> for ExtensionField<'static> { + fn from(value: ReferenceIdRequest) -> Self { + Self::ReferenceIdRequest(value) + } +} + +impl<'a> From<ReferenceIdResponse<'a>> for ExtensionField<'a> { + fn from(value: ReferenceIdResponse<'a>) -> Self { + Self::ReferenceIdResponse(value) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/ntp-proto/src/packet/v5/server_reference_id.rs b/ntp-proto/src/packet/v5/server_reference_id.rs index 6d5f4d9bd..87422f53e 100644 --- a/ntp-proto/src/packet/v5/server_reference_id.rs +++ b/ntp-proto/src/packet/v5/server_reference_id.rs @@ -1,8 +1,9 @@ use crate::packet::v5::extension_fields::{ReferenceIdRequest, ReferenceIdResponse}; use crate::packet::v5::NtpClientCookie; use rand::distributions::{Distribution, Standard}; -use rand::Rng; +use rand::{thread_rng, Rng}; use std::array::from_fn; +use std::fmt::{Debug, Formatter}; #[derive(Copy, Clone, Debug)] struct U12(u16); @@ -41,7 +42,7 @@ impl TryFrom<u16> for U12 { } } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct ServerId([U12; 10]); impl ServerId { @@ -54,7 +55,13 @@ impl ServerId { } } -#[derive(Clone, Eq, PartialEq, Debug)] +impl Default for ServerId { + fn default() -> Self { + Self::new(&mut thread_rng()) + } +} + +#[derive(Copy, Clone, Eq, PartialEq)] pub struct BloomFilter([u8; Self::BYTES]); impl BloomFilter { pub const BYTES: usize = 512; @@ -110,6 +117,25 @@ impl<'a> FromIterator<&'a BloomFilter> for BloomFilter { } } +impl Default for BloomFilter { + fn default() -> Self { + Self::new() + } +} + +impl Debug for BloomFilter { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str: String = self + .0 + .chunks_exact(32) + .map(|chunk| chunk.iter().fold(0, |acc, b| acc | b)) + .map(|b| char::from_u32(0x2800 + b as u32).unwrap()) + .collect(); + + f.debug_tuple("BloomFilter").field(&str).finish() + } +} + pub struct RemoteBloomFilter { filter: BloomFilter, chunk_size: u16, @@ -167,7 +193,7 @@ impl RemoteBloomFilter { pub fn handle_response( &mut self, cookie: NtpClientCookie, - response: ReferenceIdResponse, + response: &ReferenceIdResponse, ) -> Result<(), ResponseHandlingError> { let Some((offset, expected_cookie)) = self.last_requested else { return Err(ResponseHandlingError::NotAwaitingResponse); @@ -199,6 +225,17 @@ impl RemoteBloomFilter { } } +impl Debug for RemoteBloomFilter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RemoteBloomFilter") + .field("chunk_size", &self.chunk_size) + .field("last_requested", &self.last_requested) + .field("next_to_request", &self.next_to_request) + .field("is_filled", &self.is_filled) + .finish() + } +} + #[derive(Debug, Copy, Clone)] pub enum ResponseHandlingError { NotAwaitingResponse, @@ -285,7 +322,7 @@ mod tests { assert!(matches!( bf.handle_response( NtpClientCookie::new_random(), - ReferenceIdResponse::new(&[0u8; 16]).unwrap() + &ReferenceIdResponse::new(&[0u8; 16]).unwrap() ), Err(NotAwaitingResponse) )); @@ -296,18 +333,18 @@ mod tests { assert_eq!(req.payload_len(), chunk_size); assert!(matches!( - bf.handle_response(cookie, ReferenceIdResponse::new(&[0; 24]).unwrap()), + bf.handle_response(cookie, &ReferenceIdResponse::new(&[0; 24]).unwrap()), Err(MismatchedLength) )); let mut wrong_cookie = cookie; wrong_cookie.0[0] ^= 0xFF; // Flip all bits in first byte assert!(matches!( - bf.handle_response(wrong_cookie, ReferenceIdResponse::new(&[0; 16]).unwrap()), + bf.handle_response(wrong_cookie, &ReferenceIdResponse::new(&[0; 16]).unwrap()), Err(MismatchedCookie) )); - bf.handle_response(cookie, ReferenceIdResponse::new(&[1; 16]).unwrap()) + bf.handle_response(cookie, &ReferenceIdResponse::new(&[1; 16]).unwrap()) .unwrap(); assert_eq!(bf.next_to_request, 16); assert_eq!(bf.last_requested, None); @@ -323,7 +360,7 @@ mod tests { assert!(bf.full_filter().is_none()); let bytes: Vec<_> = (0..req.payload_len()).map(|_| chunk as u8 + 1).collect(); let response = ReferenceIdResponse::new(&bytes).unwrap(); - bf.handle_response(cookie, response).unwrap(); + bf.handle_response(cookie, &response).unwrap(); } assert_eq!(bf.next_to_request, 0); @@ -346,7 +383,7 @@ mod tests { let cookie = NtpClientCookie::new_random(); let request = bf.next_request(cookie); let response = request.to_response(&target_filter).unwrap(); - bf.handle_response(cookie, response).unwrap(); + bf.handle_response(cookie, &response).unwrap(); } let result_filter = bf.full_filter().unwrap(); diff --git a/ntp-proto/src/peer.rs b/ntp-proto/src/peer.rs index cf199b341..2f25a28a8 100644 --- a/ntp-proto/src/peer.rs +++ b/ntp-proto/src/peer.rs @@ -1,5 +1,6 @@ -use std::{io::Cursor, net::SocketAddr}; - +#[cfg(feature = "ntpv5")] +use crate::packet::v5::server_reference_id::{BloomFilter, RemoteBloomFilter}; +use crate::packet::NtpHeader; use crate::{ config::SourceDefaultsConfig, cookiestash::CookieStash, @@ -7,8 +8,10 @@ use crate::{ packet::{Cipher, NtpAssociationMode, NtpLeapIndicator, NtpPacket, RequestIdentifier}, system::SystemSnapshot, time_types::{NtpDuration, NtpInstant, NtpTimestamp, PollInterval}, + ExtensionField, }; use serde::{Deserialize, Serialize}; +use std::{io::Cursor, net::SocketAddr}; use tracing::{debug, info, instrument, trace, warn}; const MAX_STRATUM: u8 = 16; @@ -78,6 +81,9 @@ pub struct Peer { peer_defaults_config: SourceDefaultsConfig, protocol_version: ProtocolVersion, + + #[cfg(feature = "ntpv5")] + bloom_filter: Option<RemoteBloomFilter>, } #[derive(Debug, Copy, Clone)] @@ -206,6 +212,9 @@ pub struct PeerSnapshot { pub stratum: u8, pub reference_id: ReferenceId, + + #[cfg(feature = "ntpv5")] + pub bloom_filter: Option<BloomFilter>, } impl PeerSnapshot { @@ -251,6 +260,11 @@ impl PeerSnapshot { reference_id: peer.reference_id, reach: peer.reach, poll_interval: peer.last_poll_interval, + #[cfg(feature = "ntpv5")] + bloom_filter: peer + .bloom_filter + .as_ref() + .and_then(|bf| bf.full_filter().copied()), } } } @@ -271,6 +285,8 @@ pub fn peer_snapshot() -> PeerSnapshot { our_id: ReferenceId::from_int(1), reach, poll_interval: crate::time_types::PollIntervalLimits::default().min, + #[cfg(feature = "ntpv5")] + bloom_filter: None, } } @@ -360,6 +376,9 @@ impl Peer { peer_defaults_config, protocol_version: Default::default(), // TODO make this configurable + + #[cfg(feature = "ntpv5")] + bloom_filter: None, } } @@ -394,6 +413,7 @@ impl Peer { .max(self.remote_min_poll_interval) } + #[cfg_attr(not(feature = "ntpv5"), allow(unused_mut))] pub fn generate_poll_message<'a>( &mut self, buf: &'a mut [u8], @@ -408,7 +428,7 @@ impl Peer { self.tries = self.tries.saturating_add(1); let poll_interval = self.current_poll_interval(system); - let (packet, identifier) = match &mut self.nts { + let (mut packet, identifier) = match &mut self.nts { Some(nts) => { let cookie = nts.cookies.get().ok_or_else(|| { std::io::Error::new(std::io::ErrorKind::Other, NtsError::OutOfCookies) @@ -438,6 +458,14 @@ impl Peer { // Ensure we don't spam the remote with polls if it is not reachable self.backoff_interval = poll_interval.inc(peer_defaults_config.poll_interval_limits); + #[cfg(feature = "ntpv5")] + if let NtpHeader::V5(header) = packet.header() { + if let Some(ref mut filter) = self.bloom_filter { + let req_ef = filter.next_request(header.client_cookie); + packet.push_untrusted(ExtensionField::ReferenceIdRequest(req_ef)); + } + } + // Write packet to buffer let mut cursor = Cursor::new(buf); packet.serialize( @@ -576,11 +604,10 @@ impl Peer { self.stratum = message.stratum(); self.reference_id = message.reference_id(); - // Handle new requested poll interval #[cfg(feature = "ntpv5")] - if message.version() == 5 { + if let NtpHeader::V5(header) = message.header() { + // Handle new requested poll interval let requested_poll = message.poll(); - if requested_poll > self.remote_min_poll_interval { debug!( ?requested_poll, @@ -589,6 +616,24 @@ impl Peer { ); self.remote_min_poll_interval = requested_poll; } + + // Update our bloom filter + if let Some(filter) = &mut self.bloom_filter { + let bloom_responses = + message + .untrusted_extension_fields() + .filter_map(|ef| match ef { + ExtensionField::ReferenceIdResponse(response) => Some(response), + _ => None, + }); + + for ref_id in bloom_responses { + let result = filter.handle_response(header.client_cookie, ref_id); + if let Err(err) = result { + info!(?err, "Invalid ReferenceIdResponse from peer, ignoring...") + } + } + } } // generate a measurement @@ -643,6 +688,9 @@ impl Peer { peer_defaults_config: SourceDefaultsConfig::default(), protocol_version: Default::default(), + + #[cfg(feature = "ntpv5")] + bloom_filter: Some(RemoteBloomFilter::new(16).unwrap()), } } } @@ -680,6 +728,10 @@ mod test { use crate::{packet::NoCipher, time_types::PollIntervalLimits, NtpClock}; use super::*; + #[cfg(feature = "ntpv5")] + use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; + #[cfg(feature = "ntpv5")] + use rand::thread_rng; use std::time::Duration; #[derive(Debug, Clone, Default)] @@ -1275,4 +1327,58 @@ mod test { let (poll, _) = NtpPacket::deserialize(poll, &NoCipher).unwrap(); assert_eq!(poll.version(), 5); } + + #[cfg(feature = "ntpv5")] + #[test] + fn bloom_filters_will_synchronize_at_some_point() { + let mut server_filter = BloomFilter::new(); + server_filter.add_id(&ServerId::new(&mut thread_rng())); + + let mut client = Peer::test_peer(); + client.protocol_version = ProtocolVersion::V5; + + let clock = TestClock::default(); + let system = SystemSnapshot::default(); + + let mut server_system = SystemSnapshot::default(); + server_system.bloom_filter = server_filter.clone(); + + let mut tries = 0; + + while client + .bloom_filter + .as_ref() + .unwrap() + .full_filter() + .is_none() + && tries < 100 + { + let mut buf = [0; 1024]; + let req = client + .generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default()) + .unwrap(); + + let (req, _) = NtpPacket::deserialize(req, &NoCipher).unwrap(); + let response = + NtpPacket::timestamp_response(&server_system, req, NtpTimestamp::default(), &clock); + let resp_bytes = response.serialize_without_encryption_vec().unwrap(); + + client + .handle_incoming( + system, + &resp_bytes, + NtpInstant::now(), + NtpTimestamp::default(), + NtpTimestamp::default(), + ) + .unwrap(); + + tries += 1; + } + + assert_eq!( + Some(&server_filter), + client.bloom_filter.unwrap().full_filter() + ); + } } diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index e1db76079..75cfbad8f 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -1,5 +1,9 @@ +#[cfg(feature = "ntpv5")] +use rand::thread_rng; use serde::{Deserialize, Serialize}; +#[cfg(feature = "ntpv5")] +use crate::packet::v5::server_reference_id::{BloomFilter, ServerId}; use crate::{ config::SynchronizationConfig, identifiers::ReferenceId, @@ -48,6 +52,15 @@ pub struct SystemSnapshot { /// Timekeeping data #[serde(flatten)] pub time_snapshot: TimeSnapshot, + + #[cfg(feature = "ntpv5")] + /// Bloom filter that contains all currently used time sources + #[serde(skip)] + pub bloom_filter: BloomFilter, + #[cfg(feature = "ntpv5")] + /// NTPv5 reference ID for this instance + #[serde(skip)] + pub server_id: ServerId, } impl SystemSnapshot { @@ -71,6 +84,10 @@ impl Default for SystemSnapshot { reference_id: ReferenceId::NONE, accumulated_steps_threshold: None, time_snapshot: TimeSnapshot::default(), + #[cfg(feature = "ntpv5")] + bloom_filter: BloomFilter::new(), + #[cfg(feature = "ntpv5")] + server_id: ServerId::new(&mut thread_rng()), } } } @@ -108,6 +125,8 @@ mod tests { reach: Default::default(), stratum: 2, reference_id: ReferenceId::NONE, + #[cfg(feature = "ntpv5")] + bloom_filter: None, }, PeerSnapshot { source_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), @@ -117,6 +136,8 @@ mod tests { reach: Default::default(), stratum: 3, reference_id: ReferenceId::NONE, + #[cfg(feature = "ntpv5")] + bloom_filter: None, }, ] .into_iter(), diff --git a/ntpd/src/daemon/observer.rs b/ntpd/src/daemon/observer.rs index cc6b6888d..bbec05a51 100644 --- a/ntpd/src/daemon/observer.rs +++ b/ntpd/src/daemon/observer.rs @@ -132,8 +132,12 @@ async fn observer( #[cfg(test)] mod tests { + #[cfg(feature = "unstable_ntpv5")] + use rand::thread_rng; use std::{borrow::BorrowMut, time::Duration}; + #[cfg(feature = "unstable_ntpv5")] + use ntp_proto::v5::{BloomFilter, ServerId}; use ntp_proto::{ NtpClock, NtpDuration, NtpLeapIndicator, NtpTimestamp, PollInterval, PollIntervalLimits, Reach, ReferenceId, TimeSnapshot, @@ -227,6 +231,10 @@ mod tests { leap_indicator: NtpLeapIndicator::Leap59, accumulated_steps: NtpDuration::ZERO, }, + #[cfg(feature = "unstable_ntpv5")] + bloom_filter: BloomFilter::new(), + #[cfg(feature = "unstable_ntpv5")] + server_id: ServerId::new(&mut thread_rng()), }); let handle = tokio::spawn(async move { @@ -293,6 +301,10 @@ mod tests { leap_indicator: NtpLeapIndicator::Leap59, accumulated_steps: NtpDuration::ZERO, }, + #[cfg(feature = "unstable_ntpv5")] + bloom_filter: BloomFilter::new(), + #[cfg(feature = "unstable_ntpv5")] + server_id: ServerId::new(&mut thread_rng()), }); let handle = tokio::spawn(async move {