diff --git a/ntpd/src/daemon/server.rs b/ntpd/src/daemon/server.rs index 71624332b..7204883ef 100644 --- a/ntpd/src/daemon/server.rs +++ b/ntpd/src/daemon/server.rs @@ -11,8 +11,8 @@ use std::{ }; use ntp_proto::{ - DecodedServerCookie, KeySet, NoCipher, NtpAssociationMode, NtpClock, NtpPacket, NtpTimestamp, - PacketParsingError, SystemSnapshot, + Cipher, DecodedServerCookie, KeySet, NoCipher, NtpAssociationMode, NtpClock, NtpPacket, + NtpTimestamp, PacketParsingError, SystemSnapshot, }; use ntp_udp::{InterfaceName, UdpSocket}; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -39,14 +39,40 @@ pub struct ServerStats { pub nts_nak_packets: Counter, } +impl ServerStats { + fn update_from(&self, accept_result: &AcceptResult<'_>) { + use AcceptResult::{Accept, CryptoNak, Deny, Ignore, RateLimit}; + + self.received_packets.inc(); + + match accept_result { + Accept { .. } => self.accepted_packets.inc(), + Ignore => self.ignored_packets.inc(), + Deny { .. } => self.denied_packets.inc(), + RateLimit { .. } => self.rate_limited_packets.inc(), + CryptoNak { .. } => self.nts_nak_packets.inc(), + }; + + if accept_result.is_nts() { + self.nts_received_packets.inc(); + match accept_result { + Accept { .. } => self.nts_accepted_packets.inc(), + Deny { .. } => self.nts_denied_packets.inc(), + RateLimit { .. } => self.nts_rate_limited_packets.inc(), + CryptoNak { .. } | Ignore => { /* counted above */ } + }; + } + } +} + #[derive(Debug, Clone, Default)] pub struct Counter { value: Arc, } impl Counter { - fn inc(&self) -> u64 { - self.value.fetch_add(1, Ordering::Relaxed) + fn inc(&self) { + self.value.fetch_add(1, Ordering::Relaxed); } pub fn get(&self) -> u64 { @@ -89,30 +115,87 @@ pub struct ServerTask { enum AcceptResult<'a> { Accept { packet: NtpPacket<'a>, - max_response_size: usize, decoded_cookie: Option, - peer_addr: SocketAddr, recv_timestamp: NtpTimestamp, }, Ignore, Deny { packet: NtpPacket<'a>, - max_response_size: usize, decoded_cookie: Option, - peer_addr: SocketAddr, }, RateLimit { packet: NtpPacket<'a>, - max_response_size: usize, decoded_cookie: Option, - peer_addr: SocketAddr, }, CryptoNak { packet: NtpPacket<'a>, - max_response_size: usize, - peer_addr: SocketAddr, }, - NetworkGone, +} + +impl AcceptResult<'_> { + fn apply_deny(self) -> Self { + // We should send deny messages only to reasonable requests + // otherwise two servers could end up in a loop of sending + // deny's to each other. + match self { + AcceptResult::Accept { + packet, + decoded_cookie, + .. + } => AcceptResult::Deny { + packet, + decoded_cookie, + }, + other => other, + } + } + + fn apply_rate_limit(self) -> Self { + match self { + AcceptResult::Accept { + packet, + decoded_cookie, + .. + } => AcceptResult::RateLimit { + packet, + decoded_cookie, + }, + other => other, + } + } + + fn is_nts(&self) -> bool { + match self { + AcceptResult::Accept { decoded_cookie, .. } + | AcceptResult::Deny { decoded_cookie, .. } + | AcceptResult::RateLimit { decoded_cookie, .. } => decoded_cookie.is_some(), + AcceptResult::CryptoNak { .. } => true, + AcceptResult::Ignore => false, + } + } + + fn kind_name(&self) -> &'static str { + match self { + AcceptResult::Accept { .. } => "Accept", + AcceptResult::Ignore => "Ignore", + AcceptResult::Deny { .. } => "Deny", + AcceptResult::RateLimit { .. } => "RateLimit", + AcceptResult::CryptoNak { .. } => "CryptoNak", + } + } +} + +#[must_use] +#[derive(Debug, Clone, Copy)] +enum SocketConnection { + KeepAlive, + Reconnect, +} + +enum FilterReason { + Deny, + Ignore, + RateLimit, } impl ServerTask { @@ -164,29 +247,33 @@ impl ServerTask { async fn serve(&mut self, rate_limiting_cutoff: Duration) { let mut cur_socket = None; loop { - let socket = if let Some(ref socket) = cur_socket { - socket - } else { - cur_socket = Some(loop { - match UdpSocket::server(self.config.listen, self.interface).await { - Ok(socket) => break socket, - Err(error) => { - warn!(?error, ?self.config.listen, ?self.interface, "Could not open server socket"); - tokio::time::sleep(self.network_wait_period).await; + // open socket if it is not already open + let socket = match &cur_socket { + Some(socket) => socket, + None => { + let new_socket = loop { + match UdpSocket::server(self.config.listen, self.interface).await { + Ok(socket) => break socket, + Err(error) => { + warn!(?error, ?self.config.listen, ?self.interface, "Could not open server socket"); + tokio::time::sleep(self.network_wait_period).await; + } } - } - }); - // system may now be wildly out of date, ensure it is always updated. - self.system = *self.system_receiver.borrow_and_update(); + }; - cur_socket.as_ref().unwrap() + // system may now be wildly out of date, ensure it is always updated. + self.system = *self.system_receiver.borrow_and_update(); + + cur_socket.insert(new_socket) + } }; let mut buf = [0_u8; MAX_PACKET_SIZE]; tokio::select! { recv_res = socket.recv(&mut buf) => { - if !self.serve_packet(socket, &buf, recv_res, rate_limiting_cutoff).await { - cur_socket = None; + match self.handle_receive(socket, &buf, recv_res, rate_limiting_cutoff).await { + SocketConnection::KeepAlive => { /* do nothing */ } + SocketConnection::Reconnect => { cur_socket = None } } }, _ = self.system_receiver.changed(), if self.system_receiver.has_changed().is_ok() => { @@ -196,321 +283,259 @@ impl ServerTask { } } - async fn serve_packet( + /// Checks if the address of the sender is on the allow or deny list and if the address already + /// did perform a request within the last `cutoff` interval + fn check_and_update_filters( + &mut self, + peer_addr: SocketAddr, + cutoff: Duration, + ) -> Result<(), FilterReason> { + match self.filter(&peer_addr.ip()) { + Some(FilterAction::Deny) => Err(FilterReason::Deny), + Some(FilterAction::Ignore) => Err(FilterReason::Ignore), + None => { + let now = Instant::now(); + if self.client_cache.is_allowed(peer_addr.ip(), now, cutoff) { + Ok(()) + } else { + Err(FilterReason::RateLimit) + } + } + } + } + + /// Handle the result of the `recv` call + /// - decide if an IO error warrants reopening the socket + /// - check if the sender address matches any of the allow, deny, or rate-limit filters + /// + /// -> call [`Self::generate_response`] to further process the packet + async fn handle_receive( &mut self, - socket: &UdpSocket, + socket: &ntp_udp::UdpSocket, buf: &[u8], recv_res: std::io::Result<(usize, SocketAddr, Option)>, rate_limiting_cutoff: Duration, - ) -> bool { - self.stats.received_packets.inc(); - let accept_result = self.accept_packet(rate_limiting_cutoff, recv_res, buf); + ) -> SocketConnection { + match recv_res { + Err(receive_error) => { + warn!(?receive_error, "could not receive packet"); - match accept_result { - AcceptResult::Accept { - packet, - max_response_size, - decoded_cookie, - peer_addr, - recv_timestamp, - } => { - self.stats.accepted_packets.inc(); - - let keyset = self.keyset.borrow().clone(); - let mut buf = [0; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buf.as_mut_slice()); - let serialize_result = match decoded_cookie { - Some(decoded_cookie) => { - self.stats.nts_received_packets.inc(); - self.stats.nts_accepted_packets.inc(); - let response = NtpPacket::nts_timestamp_response( - &self.system, - packet, - recv_timestamp, - &self.clock, - &decoded_cookie, - &keyset, - ); - response.serialize(&mut cursor, decoded_cookie.s2c.as_ref()) - } - None => { - let response = NtpPacket::timestamp_response( - &self.system, - packet, - recv_timestamp, - &self.clock, - ); - response.serialize(&mut cursor, &NoCipher) + // For a server, we only trigger NetworkGone restarts + // on ENETDOWN. ENETUNREACH, EHOSTDOWN and EHOSTUNREACH + // do not signal restart-worthy conditions for the a + // server (they essentially indicate problems with the + // remote network/host, which is not relevant for a server). + // Furthermore, they can conceivably be triggered by a + // malicious third party, and triggering restart on them + // would then result in a denial-of-service. + match receive_error.raw_os_error() { + Some(libc::ENETDOWN) => SocketConnection::Reconnect, + _ => { + self.stats.ignored_packets.inc(); + SocketConnection::KeepAlive } + } + } + Ok((length, peer_addr, opt_timestamp)) => { + let Some(request_buf) = buf.get(..length) else { + warn!("length from socket is out of bounds. This is a bug!"); + return SocketConnection::Reconnect; }; - if let Err(serialize_err) = serialize_result { - warn!(error=?serialize_err, "Could not serialize response"); - return true; - } + // The response buffer gets the same size as the request was so we can never send + // a response that is longer than the request + let mut response_buf = [0; MAX_PACKET_SIZE]; + let response_buf = &mut response_buf[..length]; - if cursor.position() as usize > max_response_size { - warn!("Generated response that was larger than the request. This is a bug!"); - return true; - } + let Some(response) = self.handle_packet( + request_buf, + response_buf, + peer_addr, + opt_timestamp, + rate_limiting_cutoff, + ) else { + return SocketConnection::KeepAlive; + }; - if let Err(send_err) = socket - .send_to(&cursor.get_ref()[0..cursor.position() as usize], peer_addr) - .await - { + if let Err(send_err) = socket.send_to(response, peer_addr).await { self.stats.response_send_errors.inc(); debug!(error=?send_err, "Could not send response packet"); } + + SocketConnection::KeepAlive } - AcceptResult::Deny { - packet, - max_response_size, - decoded_cookie, - peer_addr, - } => { - self.stats.denied_packets.inc(); - - let mut buf = [0; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buf.as_mut_slice()); - let serialize_result = match decoded_cookie { - Some(decoded_cookie) => { - self.stats.nts_received_packets.inc(); - self.stats.nts_denied_packets.inc(); - let response = NtpPacket::nts_deny_response(packet); - response.serialize(&mut cursor, decoded_cookie.s2c.as_ref()) - } - None => { - let response = NtpPacket::deny_response(packet); - response.serialize(&mut cursor, &NoCipher) - } - }; + } + } - if let Err(serialize_err) = serialize_result { - self.stats.response_send_errors.inc(); - warn!(error=?serialize_err, "Could not serialize response"); - return true; - } + #[instrument(level = "debug", skip_all, fields(peer_addr, size = request_buf.len(), opt_timestamp))] + fn handle_packet<'buf>( + &mut self, + request_buf: &[u8], + response_buf: &'buf mut [u8], + peer_addr: SocketAddr, + opt_timestamp: Option, + rate_limiting_cutoff: Duration, + ) -> Option<&'buf [u8]> { + let Some(timestamp) = opt_timestamp else { + debug!("received a packet without a timestamp"); + self.stats.update_from(&AcceptResult::Ignore); + return None; + }; - if cursor.position() as usize > max_response_size { - warn!("Generated response that was larger than the request. This is a bug!"); - return true; - } + // Note: packets are allowed to be bigger when including extensions. + // we don't expect many, but the client may still send them. We try + // to see if the message still makes sense with some bytes dropped. + // Messages of fewer than 48 bytes are skipped entirely + if request_buf.len() < 48 { + debug!("received packet is too small"); + self.stats.update_from(&AcceptResult::Ignore); + return None; + } - if let Err(send_err) = socket - .send_to(&cursor.get_ref()[0..cursor.position() as usize], peer_addr) - .await - { - self.stats.response_send_errors.inc(); - warn!(error=?send_err, "Could not send deny packet"); - } + let filter_result = self.check_and_update_filters(peer_addr, rate_limiting_cutoff); + if let Err(FilterReason::Ignore) = filter_result { + debug!("filters decided to ignore"); + self.stats.update_from(&AcceptResult::Ignore); + return None; + } + + // actually parse the packet. KeySet is cloned to not take a lock + let keyset = self.keyset.borrow().clone(); + let accept_result = Self::accept_data(request_buf, keyset.as_ref(), timestamp); + + // apply filters + let accept_result = match filter_result { + Ok(_) => accept_result, + Err(FilterReason::Ignore) => AcceptResult::Ignore, + Err(FilterReason::Deny) => accept_result.apply_deny(), + Err(FilterReason::RateLimit) => accept_result.apply_rate_limit(), + }; + + // update statistics + self.stats.update_from(&accept_result); + debug!(kind = accept_result.kind_name(), "Decided response"); + + let (packet, opt_cipher) = self.generate_response(accept_result)?; + let response_buf = Self::serialize_response(response_buf, packet, opt_cipher)?; + + debug!(response_size = response_buf.len(), "Generated response"); + + Some(response_buf) + } + + fn generate_response<'a>( + &self, + accept_result: AcceptResult<'a>, + ) -> Option<(NtpPacket<'a>, Option>)> { + let (packet, cipher) = match accept_result { + AcceptResult::Ignore => { + return None; } - AcceptResult::CryptoNak { + + AcceptResult::Accept { packet, - max_response_size, - peer_addr, - } => { - self.stats.nts_received_packets.inc(); - self.stats.nts_nak_packets.inc(); - - let mut buf = [0; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buf.as_mut_slice()); - let response = NtpPacket::nts_nak_response(packet); - if let Err(serialize_err) = response.serialize(&mut cursor, &NoCipher) { - self.stats.response_send_errors.inc(); - warn!(error=?serialize_err, "Could not serialize response"); - return true; + decoded_cookie, + recv_timestamp, + } => match decoded_cookie { + Some(cookie) => { + let keyset = self.keyset.borrow().clone(); + let response = NtpPacket::nts_timestamp_response( + &self.system, + packet, + recv_timestamp, + &self.clock, + &cookie, + &keyset, + ); + (response, Some(cookie.s2c)) } + None => ( + NtpPacket::timestamp_response( + &self.system, + packet, + recv_timestamp, + &self.clock, + ), + None, + ), + }, - if cursor.position() as usize > max_response_size { - warn!("Generated response that was larger than the request. This is a bug!"); - return true; - } + AcceptResult::CryptoNak { packet } => (NtpPacket::nts_nak_response(packet), None), - if let Err(send_err) = socket - .send_to(&cursor.get_ref()[0..cursor.position() as usize], peer_addr) - .await - { - self.stats.response_send_errors.inc(); - warn!(error=?send_err, "Could not send nts nak packet"); - } - } - AcceptResult::NetworkGone => { - warn!("Server connection gone"); - return false; - } - AcceptResult::RateLimit { + AcceptResult::Deny { packet, - max_response_size, decoded_cookie, - peer_addr, - } => { - self.stats.rate_limited_packets.inc(); - - let mut buf = [0; MAX_PACKET_SIZE]; - let mut cursor = Cursor::new(buf.as_mut_slice()); - let serialize_result = match decoded_cookie { - Some(decoded_cookie) => { - self.stats.nts_received_packets.inc(); - self.stats.nts_rate_limited_packets.inc(); - let response = NtpPacket::nts_rate_limit_response(packet); - response.serialize(&mut cursor, decoded_cookie.s2c.as_ref()) - } - None => { - let response = NtpPacket::rate_limit_response(packet); - response.serialize(&mut cursor, &NoCipher) - } - }; - - if let Err(serialize_err) = serialize_result { - self.stats.response_send_errors.inc(); - warn!(error=?serialize_err, "Could not serialize response"); - return true; - } + } => match decoded_cookie { + Some(cookie) => (NtpPacket::nts_deny_response(packet), Some(cookie.s2c)), + None => (NtpPacket::deny_response(packet), None), + }, - if cursor.position() as usize > max_response_size { - warn!("Generated response that was larger than the request. This is a bug!"); - return true; - } + AcceptResult::RateLimit { + packet, + decoded_cookie, + } => match decoded_cookie { + Some(cookie) => (NtpPacket::nts_rate_limit_response(packet), Some(cookie.s2c)), + None => (NtpPacket::rate_limit_response(packet), None), + }, + }; - if let Err(send_err) = socket - .send_to(&cursor.get_ref()[0..cursor.position() as usize], peer_addr) - .await - { - self.stats.response_send_errors.inc(); - debug!(error=?send_err, "Could not send response packet"); - } - } - AcceptResult::Ignore => { - self.stats.ignored_packets.inc(); - } - } - true + Some((packet, cipher)) } - fn accept_packet<'a>( - &mut self, - rate_limiting_cutoff: Duration, - result: Result<(usize, SocketAddr, Option), std::io::Error>, - buf: &'a [u8], - ) -> AcceptResult<'a> { - match result { - Ok((size, peer_addr, Some(recv_timestamp))) if size >= 48 => { - // Note: packets are allowed to be bigger when including extensions. - // we don't expect many, but the client may still send them. We try - // to see if the message still makes sense with some bytes dropped. - // Messages of fewer than 48 bytes are skipped entirely - match self.filter(&peer_addr.ip()) { - Some(FilterAction::Deny) => { - match self.accept_data(&buf[..size], peer_addr, recv_timestamp) { - // We should send deny messages only to reasonable requests - // otherwise two servers could end up in a loop of sending - // deny's to each other. - AcceptResult::Accept { - packet, - max_response_size, - decoded_cookie, - peer_addr, - .. - } => AcceptResult::Deny { - packet, - max_response_size, - decoded_cookie, - peer_addr, - }, - v => v, - } - } - Some(FilterAction::Ignore) => AcceptResult::Ignore, - None => { - let timestamp = Instant::now(); - let cutoff = rate_limiting_cutoff; - let too_soon = - !self - .client_cache - .is_allowed(peer_addr.ip(), timestamp, cutoff); - - match self.accept_data(&buf[..size], peer_addr, recv_timestamp) { - AcceptResult::Accept { - packet, - max_response_size, - decoded_cookie, - peer_addr, - .. - } if too_soon => AcceptResult::RateLimit { - packet, - max_response_size, - decoded_cookie, - peer_addr, - }, - accept_result => accept_result, - } - } - } - } - Ok((size, _, Some(_))) => { - debug!(expected = 48, actual = size, "received packet is too small"); + /// Build a response to the given packet + fn serialize_response<'buf>( + response_buf: &'buf mut [u8], + packet: NtpPacket<'_>, + opt_cipher: Option>, + ) -> Option<&'buf [u8]> { + let mut cursor = Cursor::new(response_buf); + + let serialize_result = match opt_cipher { + Some(cipher) => packet.serialize(&mut cursor, cipher.as_ref()), + None => packet.serialize(&mut cursor, &NoCipher), + }; - AcceptResult::Ignore - } - Ok((size, _, None)) => { - debug!(?size, "received a packet without a timestamp"); + if let Err(serialize_err) = serialize_result { + warn!(error=?serialize_err, "Could not serialize response"); + return None; + } - AcceptResult::Ignore - } - Err(receive_error) => { - warn!(?receive_error, "could not receive packet"); + let end = usize::try_from(cursor.position()).expect(concat!( + "cursor.position() is always less then usize::MAX, ", + "since &[u8] can be at most usize::MAX bytes", + )); + let response_buf = cursor.into_inner(); - match receive_error.raw_os_error() { - // For a server, we only trigger NetworkGone restarts - // on ENETDOWN. ENETUNREACH, EHOSTDOWN and EHOSTUNREACH - // do not signal restart-worthy conditions for the a - // server (they essentially indicate problems with the - // remote network/host, which is not relevant for a server). - // Furthermore, they can conceivably be triggered by a - // malicious third party, and triggering restart on them - // would then result in a denial-of-service. - Some(libc::ENETDOWN) => AcceptResult::NetworkGone, - _ => AcceptResult::Ignore, - } - } - } + Some(&response_buf[..end]) } + /// Deserialize the packet and decide what our response should be + /// - check if the packet can even be deserialized + /// - check if it was successfully decrypted (and authenticated) + /// - check if it was a request packet (`Client` prior to NTPv5) fn accept_data<'a>( - &self, buf: &'a [u8], - peer_addr: SocketAddr, + keyset: &KeySet, recv_timestamp: NtpTimestamp, ) -> AcceptResult<'a> { - let keyset = self.keyset.borrow().clone(); - match NtpPacket::deserialize(buf, keyset.as_ref()) { + match NtpPacket::deserialize(buf, keyset) { Ok((packet, decoded_cookie)) => match packet.mode() { NtpAssociationMode::Client => { - trace!("NTP client request accepted from {}", peer_addr); + trace!("NTP client request accepted"); AcceptResult::Accept { packet, - max_response_size: buf.len(), decoded_cookie, - peer_addr, recv_timestamp, } } _ => { - trace!( - "NTP packet with unkown mode {:?} ignored from {}", - packet.mode(), - peer_addr - ); + trace!("NTP packet with unknown mode {:?} ignored", packet.mode()); AcceptResult::Ignore } }, Err(PacketParsingError::DecryptError(packet)) => { debug!("received packet with invalid nts cookie"); - AcceptResult::CryptoNak { - packet, - max_response_size: buf.len(), - peer_addr, - } + AcceptResult::CryptoNak { packet } } Err(e) => { debug!("received invalid packet: {e}"); @@ -591,7 +616,7 @@ mod tests { use std::time::Duration; use ntp_proto::{ - KeySetProvider, NtpDuration, NtpLeapIndicator, PollInterval, PollIntervalLimits, + KeySetProvider, NoCipher, NtpDuration, NtpLeapIndicator, PollInterval, PollIntervalLimits, ReferenceId, }; @@ -1056,6 +1081,89 @@ mod tests { server.abort(); } + + fn test_server() -> ServerTask { + let (_, system_receiver) = tokio::sync::watch::channel(SystemSnapshot::default()); + let (_, keyset) = tokio::sync::watch::channel(KeySetProvider::new(1).get()); + + ServerTask { + config: ServerConfig { + listen: "0.0.0.0:123".parse().unwrap(), + denylist: FilterList::default_denylist(), + allowlist: FilterList::default_allowlist(), + rate_limiting_cache_size: 0, + rate_limiting_cutoff: Default::default(), + }, + network_wait_period: Default::default(), + system_receiver, + keyset, + system: Default::default(), + client_cache: TimestampedCache::new(10), + clock: TestClock {}, + interface: None, + stats: Default::default(), + } + } + + #[test] + fn early_fails() { + let mut s = test_server(); + let mut resp_buf = [0; MAX_PACKET_SIZE]; + + let (req, _) = NtpPacket::poll_message(PollInterval::default()); + let req = serialize_packet_unencryped(&req); + + // No timestamp + s.stats = ServerStats::default(); + assert_eq!( + s.handle_packet( + req.as_slice(), + &mut resp_buf, + "127.0.0.1:1337".parse().unwrap(), + None, + s.config.rate_limiting_cutoff, + ), + None + ); + assert_eq!(s.stats.ignored_packets.get(), 1); + assert_eq!(s.stats.received_packets.get(), 1); + + // Too short + s.stats = ServerStats::default(); + assert!(s + .handle_packet( + &[0; 23], + &mut resp_buf, + "127.0.0.1:1337".parse().unwrap(), + Some(NtpTimestamp::default()), + s.config.rate_limiting_cutoff, + ) + .is_none()); + assert_eq!(s.stats.ignored_packets.get(), 1); + assert_eq!(s.stats.received_packets.get(), 1); + } + + #[test] + fn invalid_packet() { + let mut s = test_server(); + let mut resp_buf = [0; MAX_PACKET_SIZE]; + + let (req, _) = NtpPacket::poll_message(PollInterval::default()); + let mut req = serialize_packet_unencryped(&req); + req[0] = 0; + + assert!(s + .handle_packet( + &req, + &mut resp_buf, + "127.0.0.1:1337".parse().unwrap(), + Some(NtpTimestamp::default()), + s.config.rate_limiting_cutoff, + ) + .is_none()); + assert_eq!(s.stats.ignored_packets.get(), 1); + assert_eq!(s.stats.received_packets.get(), 1); + } } #[cfg(test)]