libbpf-rs: add batch lookup
* add Map::lookup_batch and Map::lookup_and_delete_batch
* add lookup batch test to test behavior
* introduce BatchedMapIter to iterate over Map in batches
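
For context, here is a minimal usage sketch of the two new methods (an editorial illustration, not part of this commit). The `dump_map`/`drain_map` helper names, the batch size, and the `u32` key / `u64` value layout are assumptions; decode the raw bytes according to your map's actual key and value types:

```rust
use libbpf_rs::{MapCore, MapFlags};

/// Print every entry of `map`, reading it in kernel batches of up to `batch`
/// elements. Assumes native-endian `u32` keys and `u64` values.
fn dump_map<M: MapCore>(map: &M, batch: u32) -> libbpf_rs::Result<()> {
    for (key, val) in map.lookup_batch(batch, MapFlags::ANY, MapFlags::ANY)? {
        let key = u32::from_ne_bytes(key.as_slice().try_into().unwrap());
        let val = u64::from_ne_bytes(val.as_slice().try_into().unwrap());
        println!("{key} -> {val}");
    }
    Ok(())
}

/// Same idea, but entries are deleted from the kernel map as they are read.
fn drain_map<M: MapCore>(map: &M, batch: u32) -> libbpf_rs::Result<()> {
    for (_key, _val) in map.lookup_and_delete_batch(batch, MapFlags::ANY, MapFlags::ANY)? {
        // Each yielded pair has already been removed from the map.
    }
    Ok(())
}
```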
lbrndnr authored and danielocfb committed Nov 8, 2024
1 parent 5b9ad8d commit 010994b
Showing 3 changed files with 274 additions and 14 deletions.
1 change: 1 addition & 0 deletions libbpf-rs/CHANGELOG.md
@@ -2,6 +2,7 @@
------
- Added `Program::attach_netfilter_with_opts` for attaching to netfilter
hooks
- Added `Map::lookup_batch` and `Map::lookup_and_delete_batch` methods


0.24.5
223 changes: 210 additions & 13 deletions libbpf-rs/src/map.rs
@@ -27,6 +27,7 @@ use bitflags::bitflags;
use libbpf_sys::bpf_map_info;
use libbpf_sys::bpf_obj_get_info_by_fd;

use crate::error;
use crate::util;
use crate::util::parse_ret_i32;
use crate::util::validate_bpf_ret;
@@ -42,7 +43,6 @@ pub type OpenMap<'obj> = OpenMapImpl<'obj>;
/// A mutable parsed but not yet loaded BPF map.
pub type OpenMapMut<'obj> = OpenMapImpl<'obj, Mut>;


/// Represents a parsed but not yet loaded BPF map.
///
/// This object exposes operations that need to happen before the map is created.
@@ -363,6 +363,57 @@ where
util::parse_ret(ret)
}

/// Internal function to batch lookup (and delete) elements from a map.
fn lookup_batch_raw<M>(
map: &M,
count: u32,
elem_flags: MapFlags,
flags: MapFlags,
delete: bool,
) -> BatchedMapIter<'_>
where
M: MapCore + ?Sized,
{
#[allow(clippy::needless_update)]
let opts = libbpf_sys::bpf_map_batch_opts {
sz: mem::size_of::<libbpf_sys::bpf_map_batch_opts>() as _,
elem_flags: elem_flags.bits(),
flags: flags.bits(),
// bpf_map_batch_opts might have padding fields on some platforms
..Default::default()
};

// for maps of type BPF_MAP_TYPE_{HASH, PERCPU_HASH, LRU_HASH, LRU_PERCPU_HASH}
// the key size must be at least 4 bytes
let key_size = if map.map_type().is_hash_map() {
map.key_size().max(4)
} else {
map.key_size()
};

BatchedMapIter::new(map.as_fd(), count, key_size, map.value_size(), opts, delete)
}

/// Internal function that returns an error for per-cpu and bloom filter maps.
fn check_not_bloom_or_percpu<M>(map: &M) -> Result<()>
where
M: MapCore + ?Sized,
{
if map.map_type().is_bloom_filter() {
return Err(Error::with_invalid_data(
"lookup_bloom_filter() must be used for bloom filter maps",
));
}
if map.map_type().is_percpu() {
return Err(Error::with_invalid_data(format!(
"lookup_percpu() must be used for per-cpu maps (type of the map is {:?})",
map.map_type(),
)));
}

Ok(())
}

#[allow(clippy::wildcard_imports)]
mod private {
use super::*;
@@ -410,22 +461,37 @@ pub trait MapCore: Debug + AsFd + private::Sealed {
/// must be used.
/// If the map is of type bloom_filter the function [`Self::lookup_bloom_filter()`] must be used
fn lookup(&self, key: &[u8], flags: MapFlags) -> Result<Option<Vec<u8>>> {
if self.map_type().is_bloom_filter() {
return Err(Error::with_invalid_data(
"lookup_bloom_filter() must be used for bloom filter maps",
));
}
if self.map_type().is_percpu() {
return Err(Error::with_invalid_data(format!(
"lookup_percpu() must be used for per-cpu maps (type of the map is {:?})",
self.map_type(),
)));
}

check_not_bloom_or_percpu(self)?;
let out_size = self.value_size() as usize;
lookup_raw(self, key, flags, out_size)
}

/// Returns many elements in batch mode from the map.
///
/// `count` specifies the batch size.
fn lookup_batch(
&self,
count: u32,
elem_flags: MapFlags,
flags: MapFlags,
) -> Result<BatchedMapIter<'_>> {
check_not_bloom_or_percpu(self)?;
Ok(lookup_batch_raw(self, count, elem_flags, flags, false))
}

/// Returns many elements in batch mode from the map, deleting them after
/// lookup.
///
/// `count` specifies the batch size.
fn lookup_and_delete_batch(
&self,
count: u32,
elem_flags: MapFlags,
flags: MapFlags,
) -> Result<BatchedMapIter<'_>> {
check_not_bloom_or_percpu(self)?;
Ok(lookup_batch_raw(self, count, elem_flags, flags, true))
}

/// Returns if the given value is likely present in bloom_filter as `bool`.
///
/// `value` must have exactly [`Self::value_size()`] elements.
@@ -1169,6 +1235,14 @@ impl MapType {
)
}

/// Returns if the map is of one of the hashmap types.
pub fn is_hash_map(&self) -> bool {
matches!(
self,
MapType::Hash | MapType::PercpuHash | MapType::LruHash | MapType::LruPercpuHash
)
}

/// Returns if the map is keyless map type as per documentation of libbpf
/// Keyless map types are: Queues, Stacks and Bloom Filters
fn is_keyless(&self) -> bool {
@@ -1282,6 +1356,129 @@ impl Iterator for MapKeyIter<'_> {
}
}

/// An iterator over batches of key value pairs of a BPF map.
#[derive(Debug)]
pub struct BatchedMapIter<'map> {
map_fd: BorrowedFd<'map>,
delete: bool,
count: usize,
key_size: usize,
value_size: usize,
keys: Vec<u8>,
values: Vec<u8>,
prev: Option<Vec<u8>>,
next: Vec<u8>,
batch_opts: libbpf_sys::bpf_map_batch_opts,
index: Option<usize>,
}

impl<'map> BatchedMapIter<'map> {
fn new(
map_fd: BorrowedFd<'map>,
count: u32,
key_size: u32,
value_size: u32,
batch_opts: libbpf_sys::bpf_map_batch_opts,
delete: bool,
) -> Self {
Self {
map_fd,
delete,
count: count as usize,
key_size: key_size as usize,
value_size: value_size as usize,
keys: vec![0; (count * key_size) as usize],
values: vec![0; (count * value_size) as usize],
prev: None,
next: vec![0; key_size as usize],
batch_opts,
index: None,
}
}

fn lookup_next_batch(&mut self) {
let prev = self.prev.as_ref().map_or(ptr::null(), |p| p.as_ptr());
let mut count = self.count as u32;

let ret = unsafe {
if self.delete {
libbpf_sys::bpf_map_lookup_and_delete_batch(
self.map_fd.as_raw_fd(),
prev as _,
self.next.as_mut_ptr().cast(),
self.keys.as_mut_ptr().cast(),
self.values.as_mut_ptr().cast(),
(&mut count) as *mut u32,
&self.batch_opts as *const libbpf_sys::bpf_map_batch_opts,
)
} else {
libbpf_sys::bpf_map_lookup_batch(
self.map_fd.as_raw_fd(),
prev as _,
self.next.as_mut_ptr().cast(),
self.keys.as_mut_ptr().cast(),
self.values.as_mut_ptr().cast(),
(&mut count) as *mut u32,
&self.batch_opts as *const libbpf_sys::bpf_map_batch_opts,
)
}
};

if let Err(e) = util::parse_ret(ret) {
match e.kind() {
// in this case we can trust the returned count value
error::ErrorKind::NotFound => {}
// retry with same input arguments
error::ErrorKind::Interrupted => {
return self.lookup_next_batch();
}
_ => {
self.index = None;
return;
}
}
}

self.prev = Some(self.next.clone());
self.index = Some(0);

unsafe {
self.keys.set_len(self.key_size * count as usize);
self.values.set_len(self.value_size * count as usize);
}
}
}

impl Iterator for BatchedMapIter<'_> {
type Item = (Vec<u8>, Vec<u8>);

fn next(&mut self) -> Option<Self::Item> {
let load_next_batch = match self.index {
Some(index) => {
let batch_finished = index * self.key_size >= self.keys.len();
let last_batch = self.keys.len() < self.key_size * self.count;
batch_finished && !last_batch
}
None => true,
};

if load_next_batch {
self.lookup_next_batch();
}

let index = self.index?;
let key = self.keys.chunks_exact(self.key_size).nth(index)?.to_vec();
let val = self
.values
.chunks_exact(self.value_size)
.nth(index)?
.to_vec();

self.index = Some(index + 1);
Some((key, val))
}
}

/// A convenience wrapper for [`bpf_map_info`][libbpf_sys::bpf_map_info]. It
/// provides the ability to retrieve the details of a certain map.
#[derive(Debug)]
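As an editorial aside (not part of the diff above), the `BatchedMapIter` returned by `lookup_batch` fetches further kernel batches transparently as it is consumed, so it can be collected like any other iterator. A small sketch, assuming `u32` keys and `u64` values in native byte order:

```rust
use std::collections::HashMap;

use libbpf_rs::{MapCore, MapFlags};

/// Collect an entire BPF map into a typed `HashMap`. The batch size only
/// bounds how much work each bpf(2) syscall does; iteration covers the
/// whole map regardless.
fn collect_map<M: MapCore>(map: &M, batch: u32) -> libbpf_rs::Result<HashMap<u32, u64>> {
    let iter = map.lookup_batch(batch, MapFlags::ANY, MapFlags::ANY)?;
    Ok(iter
        .map(|(k, v)| {
            (
                u32::from_ne_bytes(k.as_slice().try_into().unwrap()),
                u64::from_ne_bytes(v.as_slice().try_into().unwrap()),
            )
        })
        .collect())
}
```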
64 changes: 63 additions & 1 deletion libbpf-rs/tests/test.rs
@@ -3,6 +3,7 @@

mod common;

use std::collections::HashMap;
use std::collections::HashSet;
use std::env::current_exe;
use std::ffi::c_int;
@@ -58,7 +59,6 @@ use crate::common::get_test_object_path;
use crate::common::open_test_object;
use crate::common::with_ringbuffer;


#[tag(root)]
#[test]
fn test_object_build_and_load() {
@@ -253,6 +253,68 @@ fn test_object_map_update_batch() {
.is_err());
}

#[tag(root)]
#[test]
fn test_object_map_lookup_batch() {
bump_rlimit_mlock();

let mut obj = get_test_object("runqslower.bpf.o");
let start = get_map_mut(&mut obj, "start");
let data = HashMap::from([
(1u32, 9999u64),
(2u32, 42u64),
(3u32, 18u64),
(4u32, 1337u64),
]);

for (key, val) in data.iter() {
assert!(start
.update(&key.to_ne_bytes(), &val.to_ne_bytes(), MapFlags::ANY)
.is_ok());
}

let elems = start
.lookup_batch(2, MapFlags::ANY, MapFlags::ANY)
.expect("failed to lookup batch")
.collect::<Vec<_>>();
assert_eq!(elems.len(), 4);

for (key, val) in elems.into_iter() {
let key = u32::from_ne_bytes(key.try_into().unwrap());
let val = u64::from_ne_bytes(val.try_into().unwrap());
assert_eq!(val, data[&key]);
}

// test lookup with batch size larger than the number of keys
let elems = start
.lookup_batch(5, MapFlags::ANY, MapFlags::ANY)
.expect("failed to lookup batch")
.collect::<Vec<_>>();
assert_eq!(elems.len(), 4);

for (key, val) in elems.into_iter() {
let key = u32::from_ne_bytes(key.try_into().unwrap());
let val = u64::from_ne_bytes(val.try_into().unwrap());
assert_eq!(val, data[&key]);
}

// test lookup and delete with batch size that does not divide total count
let elems = start
.lookup_and_delete_batch(3, MapFlags::ANY, MapFlags::ANY)
.expect("failed to lookup batch")
.collect::<Vec<_>>();
assert_eq!(elems.len(), 4);

for (key, val) in elems.into_iter() {
let key = u32::from_ne_bytes(key.try_into().unwrap());
let val = u64::from_ne_bytes(val.try_into().unwrap());
assert_eq!(val, data[&key]);
}

// Map should be empty now.
assert!(start.keys().collect::<Vec<_>>().is_empty())
}

#[tag(root)]
#[test]
fn test_object_map_delete_batch() {
