Skip to content

Commit

Permalink
Always use BloomFilter with NTPv5 even if we are only a client
Browse files Browse the repository at this point in the history
  • Loading branch information
tdittr committed Nov 4, 2023
1 parent f3b1482 commit 0d2e3e1
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 56 deletions.
17 changes: 7 additions & 10 deletions ntp-proto/src/packet/v5/extension_fields.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct ReferenceIdRequest {
}

impl ReferenceIdRequest {
pub fn new(payload_len: u16, offset: u16) -> Option<Self> {
pub const fn new(payload_len: u16, offset: u16) -> Option<Self> {
if payload_len % 4 != 0 {
return None;
}
Expand All @@ -93,10 +93,7 @@ impl ReferenceIdRequest {
})
}

pub fn to_response<'filter>(
&self,
filter: &'filter BloomFilter,
) -> Option<ReferenceIdResponse<'filter>> {
pub fn to_response(self, filter: &BloomFilter) -> Option<ReferenceIdResponse> {
let offset = usize::from(self.offset);
let payload_len = usize::from(self.payload_len);

Expand Down Expand Up @@ -135,11 +132,11 @@ impl ReferenceIdRequest {
})
}

pub fn offset(&self) -> u16 {
pub const fn offset(&self) -> u16 {
self.offset
}

pub fn payload_len(&self) -> u16 {
pub const fn payload_len(&self) -> u16 {
self.payload_len
}
}
Expand All @@ -150,7 +147,7 @@ pub struct ReferenceIdResponse<'a> {
}

impl<'a> ReferenceIdResponse<'a> {
pub fn new(bytes: &'a [u8]) -> Option<Self> {
pub const fn new(bytes: &'a [u8]) -> Option<Self> {
if bytes.len() % 4 != 0 {
return None;
}
Expand Down Expand Up @@ -182,14 +179,14 @@ impl<'a> ReferenceIdResponse<'a> {
Ok(())
}

pub fn decode(bytes: &'a [u8]) -> Self {
pub const fn decode(bytes: &'a [u8]) -> Self {
Self {
bytes: Cow::Borrowed(bytes),
}
}

pub fn bytes(&self) -> &[u8] {
&*self.bytes
&self.bytes
}
}

Expand Down
80 changes: 34 additions & 46 deletions ntp-proto/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub struct Peer {
protocol_version: ProtocolVersion,

#[cfg(feature = "ntpv5")]
bloom_filter: Option<RemoteBloomFilter>,
// TODO we only need this if we run as a server
bloom_filter: RemoteBloomFilter,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -222,7 +223,7 @@ impl PeerSnapshot {
pub fn accept_synchronization(
&self,
local_stratum: u8,
system: &SystemSnapshot,
#[cfg_attr(not(feature = "ntpv5"), allow(unused_variables))] system: &SystemSnapshot,
) -> Result<(), AcceptSynchronizationError> {
use AcceptSynchronizationError::*;

Expand Down Expand Up @@ -272,10 +273,7 @@ impl PeerSnapshot {
reach: peer.reach,
poll_interval: peer.last_poll_interval,
#[cfg(feature = "ntpv5")]
bloom_filter: peer
.bloom_filter
.as_ref()
.and_then(|bf| bf.full_filter().copied()),
bloom_filter: peer.bloom_filter.full_filter().copied(),
}
}
}
Expand Down Expand Up @@ -389,7 +387,7 @@ impl Peer {
protocol_version: Default::default(), // TODO make this configurable

#[cfg(feature = "ntpv5")]
bloom_filter: None,
bloom_filter: RemoteBloomFilter::new(16).expect("16 is a valid chunk size"),
}
}

Expand Down Expand Up @@ -471,10 +469,8 @@ impl Peer {

#[cfg(feature = "ntpv5")]
if let NtpHeader::V5(header) = packet.header() {
if let Some(ref mut filter) = self.bloom_filter {
let req_ef = filter.next_request(header.client_cookie);
packet.push_untrusted(ExtensionField::ReferenceIdRequest(req_ef));
}
let req_ef = self.bloom_filter.next_request(header.client_cookie);
packet.push_untrusted(ExtensionField::ReferenceIdRequest(req_ef));
}

// Write packet to buffer
Expand Down Expand Up @@ -629,20 +625,19 @@ impl Peer {
}

// Update our bloom filter
if let Some(filter) = &mut self.bloom_filter {
let bloom_responses =
message
.untrusted_extension_fields()
.filter_map(|ef| match ef {
ExtensionField::ReferenceIdResponse(response) => Some(response),
_ => None,
});

for ref_id in bloom_responses {
let result = filter.handle_response(header.client_cookie, ref_id);
if let Err(err) = result {
info!(?err, "Invalid ReferenceIdResponse from peer, ignoring...")
}
let bloom_responses = message
.untrusted_extension_fields()
.filter_map(|ef| match ef {
ExtensionField::ReferenceIdResponse(response) => Some(response),
_ => None,
});

for ref_id in bloom_responses {
let result = self
.bloom_filter
.handle_response(header.client_cookie, ref_id);
if let Err(err) = result {
info!(?err, "Invalid ReferenceIdResponse from peer, ignoring...")
}
}
}
Expand Down Expand Up @@ -701,7 +696,7 @@ impl Peer {
protocol_version: Default::default(),

#[cfg(feature = "ntpv5")]
bloom_filter: Some(RemoteBloomFilter::new(16).unwrap()),
bloom_filter: RemoteBloomFilter::new(16).unwrap(),
}
}
}
Expand Down Expand Up @@ -876,17 +871,18 @@ mod test {

let mut peer = Peer::test_peer();

#[cfg_attr(not(feature = "ntpv5"), allow(unused_mut))]
let mut system = SystemSnapshot::default();

#[cfg(feature = "ntpv5")]
let server_id = ServerId::new(&mut rand::thread_rng());
{
system.server_id = ServerId::new(&mut rand::thread_rng());
}

macro_rules! accept {
() => {{
let snapshot = PeerSnapshot::from_peer(&peer);
snapshot.accept_synchronization(
16,
#[cfg(feature = "ntpv5")]
&server_id,
)
snapshot.accept_synchronization(16, &system)
}};
}

Expand Down Expand Up @@ -1358,19 +1354,14 @@ mod test {
let clock = TestClock::default();
let system = SystemSnapshot::default();

let mut server_system = SystemSnapshot::default();
server_system.bloom_filter = server_filter.clone();
let server_system = SystemSnapshot {
bloom_filter: server_filter,
..Default::default()
};

let mut tries = 0;

while client
.bloom_filter
.as_ref()
.unwrap()
.full_filter()
.is_none()
&& tries < 100
{
while client.bloom_filter.full_filter().is_none() && tries < 100 {
let mut buf = [0; 1024];
let req = client
.generate_poll_message(&mut buf, system, &SourceDefaultsConfig::default())
Expand All @@ -1394,9 +1385,6 @@ mod test {
tries += 1;
}

assert_eq!(
Some(&server_filter),
client.bloom_filter.unwrap().full_filter()
);
assert_eq!(Some(&server_filter), client.bloom_filter.full_filter());
}
}

0 comments on commit 0d2e3e1

Please sign in to comment.