From f3b1482b5597b276a9a537b0b6d592234bebd0f4 Mon Sep 17 00:00:00 2001 From: Tamme Dittrich Date: Thu, 2 Nov 2023 17:28:05 +0100 Subject: [PATCH] Add BloomFilter to system and peer state --- ntp-proto/src/packet/mod.rs | 8 +++--- ntp-proto/src/packet/v5/extension_fields.rs | 9 ++---- .../src/packet/v5/server_reference_id.rs | 10 +++++-- ntp-proto/src/peer.rs | 28 +++++++++++++++---- ntp-proto/src/system.rs | 22 +++++++++++---- ntpd/src/daemon/system.rs | 2 +- 6 files changed, 54 insertions(+), 25 deletions(-) diff --git a/ntp-proto/src/packet/mod.rs b/ntp-proto/src/packet/mod.rs index 28da2056a..deda39743 100644 --- a/ntp-proto/src/packet/mod.rs +++ b/ntp-proto/src/packet/mod.rs @@ -1054,6 +1054,10 @@ impl<'a> NtpPacket<'a> { pub fn untrusted_extension_fields(&self) -> impl Iterator { self.efdata.untrusted.iter() } + + pub fn push_untrusted(&mut self, ef: ExtensionField<'static>) { + self.efdata.untrusted.push(ef); + } } // Returns whether all uid extension fields found match the given uid, or @@ -1182,10 +1186,6 @@ impl<'a> NtpPacket<'a> { NtpHeader::V5(ref mut header) => header.root_dispersion = root_dispersion, } } - - pub fn push_untrusted(&mut self, ef: ExtensionField<'static>) { - self.efdata.untrusted.push(ef); - } } impl<'a> Default for NtpPacket<'a> { diff --git a/ntp-proto/src/packet/v5/extension_fields.rs b/ntp-proto/src/packet/v5/extension_fields.rs index a98ef3547..0e8f416b6 100644 --- a/ntp-proto/src/packet/v5/extension_fields.rs +++ b/ntp-proto/src/packet/v5/extension_fields.rs @@ -1,6 +1,6 @@ use crate::packet::error::ParsingError; use crate::packet::v5::server_reference_id::BloomFilter; -use crate::ExtensionField; +use crate::packet::ExtensionField; use std::borrow::Cow; use std::convert::Infallible; use std::io::Write; @@ -100,12 +100,7 @@ impl ReferenceIdRequest { let offset = usize::from(self.offset); let payload_len = usize::from(self.payload_len); - let bytes = filter - .as_bytes() - .as_slice() - .get(offset..)? - .get(..payload_len)? - .into(); + let bytes = filter.as_bytes().get(offset..)?.get(..payload_len)?.into(); Some(ReferenceIdResponse { bytes }) } diff --git a/ntp-proto/src/packet/v5/server_reference_id.rs b/ntp-proto/src/packet/v5/server_reference_id.rs index 87422f53e..0511aeb7a 100644 --- a/ntp-proto/src/packet/v5/server_reference_id.rs +++ b/ntp-proto/src/packet/v5/server_reference_id.rs @@ -80,13 +80,17 @@ impl BloomFilter { } } + pub fn add(&mut self, other: &BloomFilter) { + for (ours, theirs) in self.0.iter_mut().zip(other.0.iter()) { + *ours |= theirs; + } + } + pub fn union<'a>(others: impl Iterator) -> Self { let mut union = Self::new(); for other in others { - for (ours, theirs) in union.0.iter_mut().zip(other.0.iter()) { - *ours |= theirs; - } + union.add(other); } union diff --git a/ntp-proto/src/peer.rs b/ntp-proto/src/peer.rs index 2f25a28a8..1081daf33 100644 --- a/ntp-proto/src/peer.rs +++ b/ntp-proto/src/peer.rs @@ -1,6 +1,8 @@ #[cfg(feature = "ntpv5")] -use crate::packet::v5::server_reference_id::{BloomFilter, RemoteBloomFilter}; -use crate::packet::NtpHeader; +use crate::packet::{ + v5::server_reference_id::{BloomFilter, RemoteBloomFilter}, + ExtensionField, NtpHeader, +}; use crate::{ config::SourceDefaultsConfig, cookiestash::CookieStash, @@ -8,7 +10,6 @@ use crate::{ packet::{Cipher, NtpAssociationMode, NtpLeapIndicator, NtpPacket, RequestIdentifier}, system::SystemSnapshot, time_types::{NtpDuration, NtpInstant, NtpTimestamp, PollInterval}, - ExtensionField, }; use serde::{Deserialize, Serialize}; use std::{io::Cursor, net::SocketAddr}; @@ -221,6 +222,7 @@ impl PeerSnapshot { pub fn accept_synchronization( &self, local_stratum: u8, + system: &SystemSnapshot, ) -> Result<(), AcceptSynchronizationError> { use AcceptSynchronizationError::*; @@ -238,10 +240,19 @@ impl PeerSnapshot { // Note, this can only ever be an issue if the peer is not using // hardware as its source, so ignore reference_id if stratum is 1. if self.stratum != 1 && self.reference_id == self.our_id { - info!("Peer rejected because of detected synchornization loop"); + info!("Peer rejected because of detected synchronization loop (ref id)"); return Err(Loop); } + #[cfg(feature = "ntpv5")] + match self.bloom_filter { + Some(filter) if filter.contains_id(&system.server_id) => { + info!("Peer rejected because of detected synchronization loop (bloom filter)"); + return Err(Loop); + } + _ => {} + } + // An unreachable error occurs if the server is unreachable. if !self.reach.is_reachable() { info!("Peer is unreachable"); @@ -865,10 +876,17 @@ mod test { let mut peer = Peer::test_peer(); + #[cfg(feature = "ntpv5")] + let server_id = ServerId::new(&mut rand::thread_rng()); + macro_rules! accept { () => {{ let snapshot = PeerSnapshot::from_peer(&peer); - snapshot.accept_synchronization(16) + snapshot.accept_synchronization( + 16, + #[cfg(feature = "ntpv5")] + &server_id, + ) }}; } diff --git a/ntp-proto/src/system.rs b/ntp-proto/src/system.rs index 75cfbad8f..e48cf559e 100644 --- a/ntp-proto/src/system.rs +++ b/ntp-proto/src/system.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "ntpv5")] -use rand::thread_rng; use serde::{Deserialize, Serialize}; #[cfg(feature = "ntpv5")] @@ -69,11 +67,25 @@ impl SystemSnapshot { self.accumulated_steps_threshold = config.accumulated_step_panic_threshold; } - pub fn update_used_peers(&mut self, mut used_peers: impl Iterator) { - if let Some(system_peer_snapshot) = used_peers.next() { + pub fn update_used_peers(&mut self, used_peers: impl Iterator) { + let mut used_peers = used_peers.peekable(); + if let Some(system_peer_snapshot) = used_peers.peek() { self.stratum = system_peer_snapshot.stratum.saturating_add(1); self.reference_id = system_peer_snapshot.source_id; } + + #[cfg(feature = "ntpv5")] + { + self.bloom_filter = BloomFilter::new(); + for peer in used_peers { + if let Some(bf) = &peer.bloom_filter { + self.bloom_filter.add(bf); + } else { + tracing::warn!("Using peer without a bloom filter!"); + } + } + self.bloom_filter.add_id(&self.server_id); + } } } @@ -87,7 +99,7 @@ impl Default for SystemSnapshot { #[cfg(feature = "ntpv5")] bloom_filter: BloomFilter::new(), #[cfg(feature = "ntpv5")] - server_id: ServerId::new(&mut thread_rng()), + server_id: ServerId::new(&mut rand::thread_rng()), } } } diff --git a/ntpd/src/daemon/system.rs b/ntpd/src/daemon/system.rs index 21a8b30ce..4297fe84c 100644 --- a/ntpd/src/daemon/system.rs +++ b/ntpd/src/daemon/system.rs @@ -447,7 +447,7 @@ impl System { snapshot: PeerSnapshot, ) -> Result<(), C::Error> { let usable = snapshot - .accept_synchronization(self.synchronization_config.local_stratum) + .accept_synchronization(self.synchronization_config.local_stratum, &self.system) .is_ok(); self.clock_controller()?.peer_update(index, usable); self.peers.get_mut(&index).unwrap().snapshot = Some(snapshot);