From 4433624cb28941fdfc999697cbbe71aa58907b26 Mon Sep 17 00:00:00 2001 From: Yureka Date: Fri, 17 Jan 2025 01:41:59 +0100 Subject: [PATCH] wip relay --- src/bgp_relay.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 ++ src/main.rs | 4 ++ src/store_impl.rs | 4 +- 4 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 src/bgp_relay.rs diff --git a/src/bgp_relay.rs b/src/bgp_relay.rs new file mode 100644 index 0000000..bbd6d88 --- /dev/null +++ b/src/bgp_relay.rs @@ -0,0 +1,111 @@ +use crate::compressed_attrs::*; +use crate::store::*; +use crate::store_impl::*; +use crate::table_impl::*; +use ipnet::IpNet; +use serde::Deserialize; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::sync::mpsc; +use tokio::sync::Notify; + +#[derive(Debug, Deserialize)] +pub struct RelayConfig { + table: TableSelector, +} + +async fn run_( + cfg: RelayConfig, + caches: Arc>, + queue_out: InMemoryTable>, + queue_out_notify: Arc, + mut rx: mpsc::Receiver<(IpNet, u32, Action>)>, +) -> anyhow::Result<()> { + let rib_out = InMemoryTable::::new(caches); + loop { + tokio::select! { + _ = queue_out_notify.notified() => { + // the channel overflowed + let queue_out = std::mem::take(&mut *queue_out.table.lock().unwrap()); + for change in queue_out.iter().flat_map(|(net, entry)| entry.iter().map(move |(num, action)| (net, *num, action.clone()))) { + match change { + (net, num, Action::Update(attrs)) => { + rib_out.update_route_compressed(num, net, attrs); + + // TODO send update + } + (net, num, Action::Withdraw) => { + rib_out.withdraw_route(num, net); + + // TODO send withdraw + } + } + } + } + entry = rx.recv() => { + let (net, num, action) = entry.ok_or(anyhow::anyhow!("channel closed"))?; + let rib_out_entry = rib_out.table.lock().unwrap().exact(&net).and_then(|entry| { + match entry.binary_search_by_key(&num, |(k, _)| *k) { + Ok(index) => Some(entry[index].1.clone()), + Err(_) => None, + } + }); + let actual_action = match (action, rib_out_entry) { + (Action::Update(attrs), None) => Some(Action::Update(attrs)), + (Action::Update(attrs), Some(existing_route)) if attrs != existing_route => Some(Action::Update(attrs)), + (Action::Withdraw, Some(_)) => Some(Action::Withdraw), + _ => None, + }; + match actual_action { + Some(Action::Update(attrs)) => { + rib_out.update_route_compressed(num, net, attrs); + + // TODO send update + } + Some(Action::Withdraw) => { + rib_out.withdraw_route(num, net); + + // TODO send withdraw + } + None => {}, + } + } + } + } +} + +pub async fn run( + cfg: RelayConfig, + store: InMemoryStore, + mut shutdown: tokio::sync::watch::Receiver, +) -> anyhow::Result<()> { + let caches = store.caches.clone(); + let table = store.get_table(cfg.table.clone()); + let queue_out = InMemoryTable::>::new(caches.clone()); + let queue_out_notify = Arc::new(Notify::new()); + let (tx, rx) = mpsc::channel(16); + let subscriber: Arc< + dyn Fn(IpNet, u32, Action>) + std::marker::Send + Sync, + > = { + let queue_out = queue_out.clone(); + let queue_out_notify = queue_out_notify.clone(); + Arc::new(move |net, num, action| { + if let Ok(permit) = tx.try_reserve() { + permit.send((net, num, action)); + } else { + queue_out.update_route_compressed(num, net, action); + queue_out_notify.notify_one(); + } + }) + }; + table.subscribe(Arc::downgrade(&subscriber)); + drop(table); + drop(store); + + let res = tokio::select! { + res = run_(cfg, caches, queue_out, queue_out_notify, rx) => res, + _ = shutdown.changed() => Ok(()), + }; + drop(subscriber); + res +} diff --git a/src/lib.rs b/src/lib.rs index c829180..9efe6bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ pub mod api; pub mod bgp_collector; +pub mod bgp_relay; mod bgpdumper; pub mod bmp_collector; mod compressed_attrs; @@ -42,4 +43,6 @@ pub struct Config { /// Only check config and exit #[serde(default)] pub config_check: bool, + #[serde(default)] + pub relays: Vec, } diff --git a/src/main.rs b/src/main.rs index 69e22f3..76dd9db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,6 +55,10 @@ async fn main() -> anyhow::Result<()> { }), ); + futures.extend(cfg.relays.into_iter().map(|relay| { + tokio::task::spawn(bgp_relay::run(relay, store.clone(), shutdown_rx.clone())) + })); + let mut sigint = signal(SignalKind::interrupt())?; let mut sigterm = signal(SignalKind::terminate())?; let res = tokio::select! { diff --git a/src/store_impl.rs b/src/store_impl.rs index bc32c28..7474d45 100644 --- a/src/store_impl.rs +++ b/src/store_impl.rs @@ -23,7 +23,7 @@ pub struct InMemoryStore { clients: Arc>>, sessions: Arc>>, tables: Arc>>, - caches: Arc>, + pub caches: Arc>, } fn tables_for_client_fn( @@ -54,7 +54,7 @@ impl InMemoryStore { == query_router_id } } - fn get_table(&self, sel: TableSelector) -> InMemoryTable { + pub fn get_table(&self, sel: TableSelector) -> InMemoryTable { self.tables .lock() .unwrap()