Skip to content

Commit ea10893

Browse files
committed
wip bmp relay
1 parent 6d7d785 commit ea10893

File tree

8 files changed

+248
-5
lines changed

8 files changed

+248
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ tokio-util = { version = "0.7", features = ["codec"] }
2929
weak-table = "0.3"
3030
nibbletree = { version = "0.2", path = "./nibbletree", features = ["ipnet"] }
3131
autometrics = { version = "0.3", features = ["prometheus-exporter"] }
32-
zettabgp = "0.3.4"
32+
zettabgp = { version = "0.3.4", git = "https://github.com/wobcom/zettabgp" }
3333
hickory-resolver = "0.24"
3434
include_dir = { version = "0.7", optional = true }
3535
mime_guess = { version = "2.0", optional = true }

config.example.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ collectors:
66
collector_type: Bmp
77
bind: "[::]:11019"
88
peers:
9-
"192.0.2.1": {}
9+
"2a0e:b940:0:2:a00e:f9ff:fe1b:b7e9": {}

src/bmp_relay.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
use crate::compressed_attrs::decompress_route_attrs;
2+
use crate::store::make_bgp_withdraw;
3+
use crate::store::TableSelector;
4+
use crate::store_impl::InMemoryStore;
5+
use crate::table_impl::Action;
6+
use crate::table_stream::table_stream;
7+
use futures_util::pin_mut;
8+
use futures_util::StreamExt;
9+
use serde::Deserialize;
10+
use std::net::Ipv4Addr;
11+
use std::net::SocketAddr;
12+
use std::time::Duration;
13+
use tokio::io::AsyncWriteExt;
14+
use zettabgp::bmp::prelude::*;
15+
use zettabgp::prelude::*;
16+
17+
#[derive(Debug, Deserialize)]
18+
pub struct RelayConfig {
19+
table: TableSelector,
20+
monitoring_station: SocketAddr,
21+
router_id: Ipv4Addr,
22+
asn: u32,
23+
}
24+
25+
async fn run_(cfg: RelayConfig, store: InMemoryStore) -> ! {
26+
let table = store.get_table(cfg.table);
27+
let mut buf = [0; 10000];
28+
'outer: loop {
29+
let updates_stream = table_stream(&table);
30+
pin_mut!(updates_stream);
31+
32+
let mut tcp_stream = match tokio::net::TcpStream::connect(cfg.monitoring_station).await {
33+
Err(_) => {
34+
log::info!("trying to connect {}", cfg.monitoring_station);
35+
tokio::time::sleep(Duration::from_secs(5)).await;
36+
continue 'outer;
37+
}
38+
Ok(v) => v,
39+
};
40+
log::info!("connected {}", cfg.monitoring_station);
41+
42+
let fake_open_message = BgpOpenMessage {
43+
as_num: cfg.asn,
44+
caps: vec![
45+
BgpCapability::SafiIPv4u,
46+
BgpCapability::SafiIPv6u,
47+
BgpCapability::SafiVPNv4u,
48+
BgpCapability::SafiVPNv6u,
49+
BgpCapability::CapRR,
50+
BgpCapability::CapASN32(cfg.asn),
51+
],
52+
hold_time: 0,
53+
router_id: cfg.router_id,
54+
};
55+
let peer_hdr = BmpMessagePeerHeader {
56+
peertype: 3,
57+
flags: 0,
58+
peerdistinguisher: BgpRD::new(0, 0),
59+
peeraddress: "::".parse().unwrap(),
60+
asnum: cfg.asn,
61+
routerid: cfg.router_id,
62+
timestamp: 0,
63+
};
64+
let mut bmp_messages = futures_util::stream::iter([
65+
BmpMessage::Initiation(BmpMessageInitiation {
66+
str0: None,
67+
sys_descr: None,
68+
sys_name: None,
69+
}),
70+
BmpMessage::PeerUpNotification(BmpMessagePeerUp {
71+
peer: peer_hdr.clone(),
72+
localaddress: "::".parse().unwrap(),
73+
localport: 0,
74+
remoteport: 0,
75+
msg1: fake_open_message.clone(),
76+
msg2: fake_open_message,
77+
}),
78+
])
79+
.chain(updates_stream.map(|action| {
80+
let update = match action {
81+
(net, num, Action::Withdraw) => {
82+
if num != 0 {
83+
//log::warn!("add-paths table is not yet implemented");
84+
}
85+
make_bgp_withdraw(net)
86+
}
87+
(net, num, Action::Update(attrs)) => {
88+
if num != 0 {
89+
//log::warn!("add-paths table is not yet implemented");
90+
}
91+
decompress_route_attrs(&attrs).to_bgp_update(net)
92+
}
93+
};
94+
95+
BmpMessage::RouteMonitoring(BmpMessageRouteMonitoring {
96+
peer: peer_hdr.clone(),
97+
update,
98+
})
99+
}));
100+
101+
'inner: while let Some(bmp_msg) = bmp_messages.next().await {
102+
log::trace!("sending message {}: {:?}", cfg.monitoring_station, bmp_msg);
103+
let mut len = 0;
104+
match bmp_msg.encode_to(&mut buf[5..]) {
105+
Ok(i) => len += i,
106+
Err(e) => {
107+
log::warn!("error encoding BMP message {:?}: {}", bmp_msg, e);
108+
continue 'inner;
109+
}
110+
}
111+
let msg_hdr = BmpMessageHeader {
112+
version: 3,
113+
msglength: len + 5,
114+
};
115+
len += msg_hdr.encode_to(&mut buf).unwrap();
116+
117+
if let Err(e) = tcp_stream.write_all(&buf[..len]).await {
118+
log::warn!(
119+
"resetting connection {:?}, reason: {}",
120+
cfg.monitoring_station,
121+
e
122+
);
123+
continue 'outer;
124+
}
125+
}
126+
}
127+
}
128+
129+
pub async fn run(
130+
cfg: RelayConfig,
131+
store: InMemoryStore,
132+
mut shutdown: tokio::sync::watch::Receiver<bool>,
133+
) -> anyhow::Result<()> {
134+
tokio::select! {
135+
_ = run_(cfg, store) => unreachable!(),
136+
_ = shutdown.changed() => Ok(()),
137+
}
138+
}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ pub mod api;
22
pub mod bgp_collector;
33
mod bgpdumper;
44
pub mod bmp_collector;
5+
pub mod bmp_relay;
56
mod compressed_attrs;
67
pub mod route_distinguisher;
78
pub mod store;
@@ -42,4 +43,6 @@ pub struct Config {
4243
/// Only check config and exit
4344
#[serde(default)]
4445
pub config_check: bool,
46+
#[serde(default)]
47+
pub bmp_relays: HashMap<String, bmp_relay::RelayConfig>,
4548
}

src/main.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ async fn main() -> anyhow::Result<()> {
6161
}),
6262
);
6363

64+
futures.extend(cfg.bmp_relays.into_values().map(|relay| {
65+
tokio::task::spawn(bmp_relay::run(relay, store.clone(), shutdown_rx.clone()))
66+
}));
67+
6468
let mut sigint = signal(SignalKind::interrupt())?;
6569
let mut sigterm = signal(SignalKind::terminate())?;
6670
let res = tokio::select! {

src/store.rs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,84 @@ impl Default for QueryLimits {
148148
}
149149
}
150150

151+
pub fn make_bgp_withdraw(net: IpNet) -> zettabgp::prelude::BgpUpdateMessage {
152+
use zettabgp::prelude::*;
153+
154+
BgpUpdateMessage {
155+
attrs: vec![BgpAttrItem::MPWithdraws(BgpMPWithdraws {
156+
addrs: net_to_bgp_addrs(net),
157+
})],
158+
..Default::default()
159+
}
160+
}
161+
162+
impl RouteAttrs {
163+
pub fn to_bgp_update(&self, net: IpNet) -> zettabgp::prelude::BgpUpdateMessage {
164+
use zettabgp::prelude::*;
165+
166+
let mut attrs = vec![];
167+
168+
if let Some(nexthop) = self.nexthop {
169+
attrs.push(BgpAttrItem::MPUpdates(BgpMPUpdates {
170+
nexthop: std_addr_to_bgp_addr(nexthop),
171+
addrs: net_to_bgp_addrs(net),
172+
}));
173+
} else {
174+
warn!("Can not build MPUpdates without nexthop");
175+
}
176+
177+
if let Some(communities) = &self.communities {
178+
attrs.push(BgpAttrItem::CommunityList(BgpCommunityList {
179+
value: communities
180+
.iter()
181+
.map(|(high, low)| BgpCommunity::new(((*high as u32) << 16) + *low as u32))
182+
.collect(),
183+
}));
184+
}
185+
if let Some(large_communities) = &self.large_communities {
186+
attrs.push(BgpAttrItem::LargeCommunityList(BgpLargeCommunityList {
187+
value: large_communities
188+
.iter()
189+
.cloned()
190+
.map(|(ga, ldp1, ldp2)| BgpLargeCommunity { ga, ldp1, ldp2 })
191+
.collect(),
192+
}));
193+
}
194+
195+
if let Some(med) = self.med {
196+
attrs.push(BgpAttrItem::MED(BgpMED { value: med }));
197+
}
198+
if let Some(local_pref) = self.local_pref {
199+
attrs.push(BgpAttrItem::LocalPref(BgpLocalpref { value: local_pref }));
200+
}
201+
202+
if let Some(origin) = &self.origin {
203+
attrs.push(BgpAttrItem::Origin(BgpOrigin {
204+
value: match origin {
205+
RouteOrigin::Igp => BgpAttrOrigin::Igp,
206+
RouteOrigin::Egp => BgpAttrOrigin::Egp,
207+
RouteOrigin::Incomplete => BgpAttrOrigin::Incomplete,
208+
},
209+
}));
210+
}
211+
212+
if let Some(as_path) = &self.as_path {
213+
attrs.push(BgpAttrItem::ASPath(BgpASpath {
214+
value: as_path
215+
.iter()
216+
.cloned()
217+
.map(|value| BgpAS { value })
218+
.collect(),
219+
}));
220+
}
221+
222+
BgpUpdateMessage {
223+
attrs,
224+
..Default::default()
225+
}
226+
}
227+
}
228+
151229
pub trait Store: Clone + Send + Sync + 'static {
152230
fn update_route(
153231
&self,
@@ -362,6 +440,27 @@ fn bgp_addrs_to_nets(
362440
}
363441
}
364442

443+
fn std_addr_to_bgp_addr(net: IpAddr) -> zettabgp::prelude::BgpAddr {
444+
use zettabgp::prelude::*;
445+
match net {
446+
IpAddr::V4(v4) => BgpAddr::V4(v4),
447+
IpAddr::V6(v6) => BgpAddr::V6(v6),
448+
}
449+
}
450+
fn net_to_bgp_addrs(net: IpNet) -> zettabgp::prelude::BgpAddrs {
451+
use zettabgp::prelude::*;
452+
match net {
453+
IpNet::V4(v4) => BgpAddrs::IPV4U(vec![BgpAddrV4 {
454+
addr: v4.addr(),
455+
prefixlen: v4.prefix_len(),
456+
}]),
457+
IpNet::V6(v6) => BgpAddrs::IPV6U(vec![BgpAddrV6 {
458+
addr: v6.addr(),
459+
prefixlen: v6.prefix_len(),
460+
}]),
461+
}
462+
}
463+
365464
fn bgpv4addr_to_ipnet(addr: &BgpAddrV4) -> Option<IpNet> {
366465
Ipv4Net::new(addr.addr, addr.prefixlen)
367466
.inspect_err(|_| warn!("invalid BgpAddrs prefixlen"))

src/store_impl.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ impl InMemoryStore {
5555
== query_router_id
5656
}
5757
}
58-
fn get_table(&self, sel: TableSelector) -> InMemoryTable {
58+
pub fn get_table(&self, sel: TableSelector) -> InMemoryTable {
5959
self.tables
6060
.lock()
6161
.unwrap()

0 commit comments

Comments
 (0)