Skip to content

Commit

Permalink
Add BloomFilter to system and peer state
Browse files Browse the repository at this point in the history
  • Loading branch information
tdittr committed Nov 4, 2023
1 parent fd6586c commit f3b1482
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 25 deletions.
8 changes: 4 additions & 4 deletions ntp-proto/src/packet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1054,6 +1054,10 @@ impl<'a> NtpPacket<'a> {
pub fn untrusted_extension_fields(&self) -> impl Iterator<Item = &ExtensionField> {
self.efdata.untrusted.iter()
}

pub fn push_untrusted(&mut self, ef: ExtensionField<'static>) {
self.efdata.untrusted.push(ef);
}
}

// Returns whether all uid extension fields found match the given uid, or
Expand Down Expand Up @@ -1182,10 +1186,6 @@ impl<'a> NtpPacket<'a> {
NtpHeader::V5(ref mut header) => header.root_dispersion = root_dispersion,
}
}

pub fn push_untrusted(&mut self, ef: ExtensionField<'static>) {
self.efdata.untrusted.push(ef);
}
}

impl<'a> Default for NtpPacket<'a> {
Expand Down
9 changes: 2 additions & 7 deletions ntp-proto/src/packet/v5/extension_fields.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::packet::error::ParsingError;
use crate::packet::v5::server_reference_id::BloomFilter;
use crate::ExtensionField;
use crate::packet::ExtensionField;
use std::borrow::Cow;
use std::convert::Infallible;
use std::io::Write;
Expand Down Expand Up @@ -100,12 +100,7 @@ impl ReferenceIdRequest {
let offset = usize::from(self.offset);
let payload_len = usize::from(self.payload_len);

let bytes = filter
.as_bytes()
.as_slice()
.get(offset..)?
.get(..payload_len)?
.into();
let bytes = filter.as_bytes().get(offset..)?.get(..payload_len)?.into();

Some(ReferenceIdResponse { bytes })
}
Expand Down
10 changes: 7 additions & 3 deletions ntp-proto/src/packet/v5/server_reference_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,17 @@ impl BloomFilter {
}
}

pub fn add(&mut self, other: &BloomFilter) {
for (ours, theirs) in self.0.iter_mut().zip(other.0.iter()) {
*ours |= theirs;
}
}

pub fn union<'a>(others: impl Iterator<Item = &'a BloomFilter>) -> Self {
let mut union = Self::new();

for other in others {
for (ours, theirs) in union.0.iter_mut().zip(other.0.iter()) {
*ours |= theirs;
}
union.add(other);
}

union
Expand Down
28 changes: 23 additions & 5 deletions ntp-proto/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#[cfg(feature = "ntpv5")]
use crate::packet::v5::server_reference_id::{BloomFilter, RemoteBloomFilter};
use crate::packet::NtpHeader;
use crate::packet::{
v5::server_reference_id::{BloomFilter, RemoteBloomFilter},
ExtensionField, NtpHeader,
};
use crate::{
config::SourceDefaultsConfig,
cookiestash::CookieStash,
identifiers::ReferenceId,
packet::{Cipher, NtpAssociationMode, NtpLeapIndicator, NtpPacket, RequestIdentifier},
system::SystemSnapshot,
time_types::{NtpDuration, NtpInstant, NtpTimestamp, PollInterval},
ExtensionField,
};
use serde::{Deserialize, Serialize};
use std::{io::Cursor, net::SocketAddr};
Expand Down Expand Up @@ -221,6 +222,7 @@ impl PeerSnapshot {
pub fn accept_synchronization(
&self,
local_stratum: u8,
system: &SystemSnapshot,
) -> Result<(), AcceptSynchronizationError> {
use AcceptSynchronizationError::*;

Expand All @@ -238,10 +240,19 @@ impl PeerSnapshot {
// Note, this can only ever be an issue if the peer is not using
// hardware as its source, so ignore reference_id if stratum is 1.
if self.stratum != 1 && self.reference_id == self.our_id {
info!("Peer rejected because of detected synchornization loop");
info!("Peer rejected because of detected synchronization loop (ref id)");
return Err(Loop);
}

#[cfg(feature = "ntpv5")]
match self.bloom_filter {
Some(filter) if filter.contains_id(&system.server_id) => {
info!("Peer rejected because of detected synchronization loop (bloom filter)");
return Err(Loop);
}
_ => {}
}

// An unreachable error occurs if the server is unreachable.
if !self.reach.is_reachable() {
info!("Peer is unreachable");
Expand Down Expand Up @@ -865,10 +876,17 @@ mod test {

let mut peer = Peer::test_peer();

#[cfg(feature = "ntpv5")]
let server_id = ServerId::new(&mut rand::thread_rng());

macro_rules! accept {
() => {{
let snapshot = PeerSnapshot::from_peer(&peer);
snapshot.accept_synchronization(16)
snapshot.accept_synchronization(
16,
#[cfg(feature = "ntpv5")]
&server_id,
)
}};
}

Expand Down
22 changes: 17 additions & 5 deletions ntp-proto/src/system.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#[cfg(feature = "ntpv5")]
use rand::thread_rng;
use serde::{Deserialize, Serialize};

#[cfg(feature = "ntpv5")]
Expand Down Expand Up @@ -69,11 +67,25 @@ impl SystemSnapshot {
self.accumulated_steps_threshold = config.accumulated_step_panic_threshold;
}

pub fn update_used_peers(&mut self, mut used_peers: impl Iterator<Item = PeerSnapshot>) {
if let Some(system_peer_snapshot) = used_peers.next() {
pub fn update_used_peers(&mut self, used_peers: impl Iterator<Item = PeerSnapshot>) {
let mut used_peers = used_peers.peekable();
if let Some(system_peer_snapshot) = used_peers.peek() {
self.stratum = system_peer_snapshot.stratum.saturating_add(1);
self.reference_id = system_peer_snapshot.source_id;
}

#[cfg(feature = "ntpv5")]
{
self.bloom_filter = BloomFilter::new();
for peer in used_peers {
if let Some(bf) = &peer.bloom_filter {
self.bloom_filter.add(bf);
} else {
tracing::warn!("Using peer without a bloom filter!");
}
}
self.bloom_filter.add_id(&self.server_id);
}
}
}

Expand All @@ -87,7 +99,7 @@ impl Default for SystemSnapshot {
#[cfg(feature = "ntpv5")]
bloom_filter: BloomFilter::new(),
#[cfg(feature = "ntpv5")]
server_id: ServerId::new(&mut thread_rng()),
server_id: ServerId::new(&mut rand::thread_rng()),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion ntpd/src/daemon/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ impl<C: NtpClock, T: Wait> System<C, T> {
snapshot: PeerSnapshot,
) -> Result<(), C::Error> {
let usable = snapshot
.accept_synchronization(self.synchronization_config.local_stratum)
.accept_synchronization(self.synchronization_config.local_stratum, &self.system)
.is_ok();
self.clock_controller()?.peer_update(index, usable);
self.peers.get_mut(&index).unwrap().snapshot = Some(snapshot);
Expand Down

0 comments on commit f3b1482

Please sign in to comment.