Skip to content

Commit

Permalink
bfd: use recvmsg so that multihop sessions actually work
Browse files Browse the repository at this point in the history
For multihop sessions to work, it's necessary to extract both the
source and destination addresses from each received packet. Achieving
this requires using lower-level APIs not directly supported by Tokio
or Rust's standard library. This commit introduces the usage of
the recvmsg() API from the nix crate to get that done. As result,
multihop sessions now work normally.

Additionally, the nix version has been updated to 0.26 to incorporate a
necessary bug fix [1].

[1] nix-rust/nix#1964

Signed-off-by: Renato Westphal <renato@opensourcerouting.org>
  • Loading branch information
rwestphal committed Jun 21, 2023
1 parent 330fdde commit f20bd0c
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ itertools = "0.10"
libc = "0.2"
maplit = "1.0"
md5 = "0.7"
nix = "0.25"
nix = "0.26"
num-derive = "0.3"
num-traits = "0.2"
prost = "0.11"
Expand Down
19 changes: 19 additions & 0 deletions holo-bfd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub enum IoError {
UdpSocketError(std::io::Error),
UdpRecvError(std::io::Error),
UdpSendError(std::io::Error),
UdpRecvMissingSourceAddr,
UdpRecvMissingAncillaryData,
}

// ===== impl Error =====
Expand Down Expand Up @@ -145,6 +147,10 @@ impl IoError {
| IoError::UdpSendError(error) => {
warn!(error = %with_source(error), "{}", self);
}
IoError::UdpRecvMissingSourceAddr
| IoError::UdpRecvMissingAncillaryData => {
warn!("{}", self);
}
}
}
}
Expand All @@ -161,6 +167,18 @@ impl std::fmt::Display for IoError {
IoError::UdpSendError(..) => {
write!(f, "failed to send UDP packet")
}
IoError::UdpRecvMissingSourceAddr => {
write!(
f,
"failed to retrieve source address from received packet"
)
}
IoError::UdpRecvMissingAncillaryData => {
write!(
f,
"failed to retrieve ancillary data from received packet"
)
}
}
}
}
Expand All @@ -171,6 +189,7 @@ impl std::error::Error for IoError {
IoError::UdpSocketError(error)
| IoError::UdpRecvError(error)
| IoError::UdpSendError(error) => Some(error),
_ => None,
}
}
}
Expand Down
154 changes: 118 additions & 36 deletions holo-bfd/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,20 @@
// See LICENSE for license details.
//

use std::net::{IpAddr, SocketAddr};
use std::io::IoSliceMut;
use std::net::{
IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6,
};
use std::ops::Deref;
use std::os::fd::AsRawFd;
use std::sync::atomic::{self, AtomicU64};
use std::sync::Arc;

use holo_utils::bfd::PathType;
use holo_utils::ip::{AddressFamily, IpAddrExt};
use holo_utils::socket::{UdpSocket, UdpSocketExt};
use holo_utils::{capabilities, Sender};
use nix::sys::socket::{self, ControlMessageOwned};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::error::SendError;

Expand Down Expand Up @@ -52,9 +58,11 @@ pub(crate) fn socket_rx(
match path_type {
PathType::IpSingleHop => match af {
AddressFamily::Ipv4 => {
socket.set_ipv4_pktinfo(true)?;
socket.set_ipv4_minttl(255)?;
}
AddressFamily::Ipv6 => {
socket.set_ipv6_pktinfo(true)?;
socket.set_min_hopcount_v6(255)?;
}
},
Expand All @@ -63,6 +71,14 @@ pub(crate) fn socket_rx(
// sessions, incoming TTL checking should be done in the
// userspace given that different peers might have different TTL
// settings.
match af {
AddressFamily::Ipv4 => {
socket.set_ipv4_pktinfo(true)?;
}
AddressFamily::Ipv6 => {
socket.set_ipv6_pktinfo(true)?;
}
}
}
}

Expand Down Expand Up @@ -144,54 +160,120 @@ pub(crate) async fn send_packet(
}
}

#[cfg(not(feature = "testing"))]
fn get_packet_src(sa: Option<&socket::SockaddrStorage>) -> Option<SocketAddr> {
sa.and_then(|sa| {
if let Some(sa) = sa.as_sockaddr_in() {
Some(SocketAddrV4::from(*sa).into())
} else if let Some(sa) = sa.as_sockaddr_in6() {
Some(SocketAddrV6::from(*sa).into())
} else {
None
}
})
}

#[cfg(not(feature = "testing"))]
fn get_packet_dst(cmsgs: socket::CmsgIterator<'_>) -> Option<IpAddr> {
for cmsg in cmsgs {
match cmsg {
ControlMessageOwned::Ipv4PacketInfo(pktinfo) => {
return Some(
Ipv4Addr::from(pktinfo.ipi_spec_dst.s_addr.to_be()).into(),
);
}
ControlMessageOwned::Ipv6PacketInfo(pktinfo) => {
return Some(Ipv6Addr::from(pktinfo.ipi6_addr.s6_addr).into());
}
_ => {}
}
}

None
}

#[cfg(not(feature = "testing"))]
pub(crate) async fn read_loop(
socket: Arc<UdpSocket>,
path_type: PathType,
udp_packet_rxp: Sender<UdpRxPacketMsg>,
) -> Result<(), SendError<UdpRxPacketMsg>> {
let mut buf = [0; 1024];
let mut iov = [IoSliceMut::new(&mut buf)];
let mut cmsgspace = nix::cmsg_space!(libc::in6_pktinfo);

loop {
// Receive data from the network.
let (_, src) = match socket.recv_from(&mut buf).await {
Ok((num_bytes, src)) => (num_bytes, src),
match socket
.async_io(tokio::io::Interest::READABLE, || {
match socket::recvmsg::<socket::SockaddrStorage>(
socket.as_raw_fd(),
&mut iov,
Some(&mut cmsgspace),
socket::MsgFlags::empty(),
) {
Ok(msg) => {
// Retrieve source and destination addresses.
let src = get_packet_src(msg.address.as_ref());
let dst = get_packet_dst(msg.cmsgs());
Ok((src, dst, msg.bytes))
}
Err(errno) => Err(errno.into()),
}
})
.await
{
Ok((src, dst, bytes)) => {
let src = match src {
Some(addr) => addr,
None => {
IoError::UdpRecvMissingSourceAddr.log();
return Ok(());
}
};
let dst = match dst {
Some(addr) => addr,
None => {
IoError::UdpRecvMissingAncillaryData.log();
return Ok(());
}
};

// Validate packet's source address.
if !src.ip().is_usable() {
Error::UdpInvalidSourceAddr(src.ip()).log();
continue;
}

// Decode packet, discarding malformed ones.
let packet = match Packet::decode(&iov[0].deref()[0..bytes]) {
Ok(packet) => packet,
Err(_) => continue,
};

// Notify the BFD main task about the received packet.
let packet_info = match path_type {
PathType::IpSingleHop => PacketInfo::IpSingleHop { src },
PathType::IpMultihop => {
let src = src.ip();
// TODO: get packet's TTL using IP_RECVTTL/IPV6_HOPLIMIT
let ttl = 255;
PacketInfo::IpMultihop { src, dst, ttl }
}
};
let msg = UdpRxPacketMsg {
packet_info,
packet,
};
udp_packet_rxp.send(msg).await?;
}
Err(error) if error.kind() == std::io::ErrorKind::Interrupted => {
// Retry if the syscall was interrupted (EINTR).
continue;
}
Err(error) => {
IoError::UdpRecvError(error).log();
continue;
}
};

// Validate packet's source address.
if !src.ip().is_usable() {
Error::UdpInvalidSourceAddr(src.ip()).log();
continue;
}

// Get packet's ancillary data.
let packet_info = match path_type {
PathType::IpSingleHop => PacketInfo::IpSingleHop { src },
PathType::IpMultihop => {
let src = src.ip();
// TODO: get packet's destination using IP_PKTINFO/IPV6_PKTINFO.
let dst = src;
// TODO: get packet's TTL using IP_RECVTTL/IPV6_HOPLIMIT.
let ttl = 255;
PacketInfo::IpMultihop { src, dst, ttl }
}
};

// Decode packet, dropping malformed ones.
let packet = match Packet::decode(&buf) {
Ok(packet) => packet,
Err(_) => continue,
};

// Notify the BFD main task about the received packet.
let msg = UdpRxPacketMsg {
packet_info,
packet,
};
udp_packet_rxp.send(msg).await?;
}
}
30 changes: 30 additions & 0 deletions holo-utils/src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ pub trait UdpSocketExt {

// Sets the value of the IPV6_MINHOPCOUNT option for this socket.
fn set_min_hopcount_v6(&self, hopcount: u8) -> Result<()>;

// Sets the value of the IP_PKTINFO option for this socket.
fn set_ipv4_pktinfo(&self, value: bool) -> Result<()>;

// Sets the value of the IPV6_RECVPKTINFO option for this socket.
fn set_ipv6_pktinfo(&self, value: bool) -> Result<()>;
}

// Extension methods for TcpSocket.
Expand Down Expand Up @@ -331,6 +337,30 @@ impl UdpSocketExt for UdpSocket {
std::mem::size_of::<i32>() as libc::socklen_t,
)
}

fn set_ipv4_pktinfo(&self, value: bool) -> Result<()> {
let optval = value as c_int;

setsockopt(
self,
libc::IPPROTO_IP,
libc::IP_PKTINFO,
&optval as *const _ as *const libc::c_void,
std::mem::size_of::<i32>() as libc::socklen_t,
)
}

fn set_ipv6_pktinfo(&self, value: bool) -> Result<()> {
let optval = value as c_int;

setsockopt(
self,
libc::IPPROTO_IPV6,
libc::IPV6_RECVPKTINFO,
&optval as *const _ as *const libc::c_void,
std::mem::size_of::<i32>() as libc::socklen_t,
)
}
}

// ===== impl TcpSocket =====
Expand Down

0 comments on commit f20bd0c

Please sign in to comment.