Skip to content

Commit

Permalink
wip relay
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyuyureka committed Jan 19, 2025
1 parent 508c7ee commit 4433624
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
111 changes: 111 additions & 0 deletions src/bgp_relay.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use crate::compressed_attrs::*;
use crate::store::*;
use crate::store_impl::*;
use crate::table_impl::*;
use ipnet::IpNet;
use serde::Deserialize;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::Notify;

#[derive(Debug, Deserialize)]
pub struct RelayConfig {
table: TableSelector,
}

async fn run_(
cfg: RelayConfig,
caches: Arc<Mutex<Caches>>,
queue_out: InMemoryTable<Action<RouteAttrs>>,
queue_out_notify: Arc<Notify>,
mut rx: mpsc::Receiver<(IpNet, u32, Action<Arc<CompressedRouteAttrs>>)>,
) -> anyhow::Result<()> {
let rib_out = InMemoryTable::<RouteAttrs>::new(caches);
loop {
tokio::select! {
_ = queue_out_notify.notified() => {
// the channel overflowed
let queue_out = std::mem::take(&mut *queue_out.table.lock().unwrap());
for change in queue_out.iter().flat_map(|(net, entry)| entry.iter().map(move |(num, action)| (net, *num, action.clone()))) {
match change {
(net, num, Action::Update(attrs)) => {
rib_out.update_route_compressed(num, net, attrs);

// TODO send update
}
(net, num, Action::Withdraw) => {
rib_out.withdraw_route(num, net);

// TODO send withdraw
}
}
}
}
entry = rx.recv() => {
let (net, num, action) = entry.ok_or(anyhow::anyhow!("channel closed"))?;
let rib_out_entry = rib_out.table.lock().unwrap().exact(&net).and_then(|entry| {
match entry.binary_search_by_key(&num, |(k, _)| *k) {
Ok(index) => Some(entry[index].1.clone()),
Err(_) => None,
}
});
let actual_action = match (action, rib_out_entry) {
(Action::Update(attrs), None) => Some(Action::Update(attrs)),
(Action::Update(attrs), Some(existing_route)) if attrs != existing_route => Some(Action::Update(attrs)),
(Action::Withdraw, Some(_)) => Some(Action::Withdraw),
_ => None,
};
match actual_action {
Some(Action::Update(attrs)) => {
rib_out.update_route_compressed(num, net, attrs);

// TODO send update
}
Some(Action::Withdraw) => {
rib_out.withdraw_route(num, net);

// TODO send withdraw
}
None => {},
}
}
}
}
}

pub async fn run(
cfg: RelayConfig,
store: InMemoryStore,
mut shutdown: tokio::sync::watch::Receiver<bool>,
) -> anyhow::Result<()> {
let caches = store.caches.clone();
let table = store.get_table(cfg.table.clone());
let queue_out = InMemoryTable::<Action<RouteAttrs>>::new(caches.clone());
let queue_out_notify = Arc::new(Notify::new());
let (tx, rx) = mpsc::channel(16);
let subscriber: Arc<
dyn Fn(IpNet, u32, Action<Arc<CompressedRouteAttrs>>) + std::marker::Send + Sync,
> = {
let queue_out = queue_out.clone();
let queue_out_notify = queue_out_notify.clone();
Arc::new(move |net, num, action| {
if let Ok(permit) = tx.try_reserve() {
permit.send((net, num, action));
} else {
queue_out.update_route_compressed(num, net, action);
queue_out_notify.notify_one();
}
})
};
table.subscribe(Arc::downgrade(&subscriber));
drop(table);
drop(store);

let res = tokio::select! {
res = run_(cfg, caches, queue_out, queue_out_notify, rx) => res,
_ = shutdown.changed() => Ok(()),
};
drop(subscriber);
res
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod bgp_collector;
pub mod bgp_relay;
mod bgpdumper;
pub mod bmp_collector;
mod compressed_attrs;
Expand Down Expand Up @@ -42,4 +43,6 @@ pub struct Config {
/// Only check config and exit
#[serde(default)]
pub config_check: bool,
#[serde(default)]
pub relays: Vec<bgp_relay::RelayConfig>,
}
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ async fn main() -> anyhow::Result<()> {
}),
);

futures.extend(cfg.relays.into_iter().map(|relay| {
tokio::task::spawn(bgp_relay::run(relay, store.clone(), shutdown_rx.clone()))
}));

let mut sigint = signal(SignalKind::interrupt())?;
let mut sigterm = signal(SignalKind::terminate())?;
let res = tokio::select! {
Expand Down
4 changes: 2 additions & 2 deletions src/store_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct InMemoryStore {
clients: Arc<Mutex<HashMap<SocketAddr, Client>>>,
sessions: Arc<Mutex<HashMap<SessionId, Session>>>,
tables: Arc<Mutex<HashMap<TableSelector, InMemoryTable>>>,
caches: Arc<Mutex<Caches>>,
pub caches: Arc<Mutex<Caches>>,
}

fn tables_for_client_fn(
Expand Down Expand Up @@ -54,7 +54,7 @@ impl InMemoryStore {
== query_router_id
}
}
fn get_table(&self, sel: TableSelector) -> InMemoryTable {
pub fn get_table(&self, sel: TableSelector) -> InMemoryTable {
self.tables
.lock()
.unwrap()
Expand Down

0 comments on commit 4433624

Please sign in to comment.