Skip to content

Commit

Permalink
Bootstrapping robustness.
Browse files Browse the repository at this point in the history
  • Loading branch information
Emil Lai committed Aug 30, 2023
1 parent a9000af commit 8a8427f
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased changes
- Do not reset banned peers on startup by default.
- On startup, the node now tries to connect to previously connected peers instead of
relying on the bootstrapper to serve the first set of peers.

## 6.1.1

Expand Down
9 changes: 9 additions & 0 deletions concordium-node/src/bin/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,15 +399,24 @@ fn instantiate_node(
P2PNode::new(Some(node_id), conf, PeerType::Node, stats_export_service, regenesis_arc)
}

/// Establish the node's initial connections to the network, in this order:
/// 1. Try to connect to the configured "given_nodes", if any.
/// 2. Try to connect to the peers the node was last connected to, if any.
/// 3. Unless bootstrapping is disabled in the configuration, contact the
///    bootstrapper in order to advertise ourselves, such that the
///    bootstrapper can provide us to freshly joining peers.
///
/// An error from step 2 is returned to the caller immediately, in which case
/// step 3 is not attempted.
fn establish_connections(conf: &config::Config, node: &Arc<P2PNode>) -> anyhow::Result<()> {
    info!("Starting the P2P layer");

    connect_to_config_nodes(node);
    peers::connect_to_stored_nodes(node)?;

    let bootstrap_enabled = !conf.connection.no_bootstrap_dns;
    if bootstrap_enabled {
        attempt_bootstrap(node);
    }

    Ok(())
}

/// Try to establish connections to the peers that the node is configured with via
/// [`NodeConfig::given_addresses`].
fn connect_to_config_nodes(node: &Arc<P2PNode>) {
// clone the addresses to release the lock before the relatively expensive
// connect calls.
Expand Down
14 changes: 8 additions & 6 deletions concordium-node/src/p2p/connectivity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,25 +157,27 @@ impl P2PNode {

/// Shut down connections with the given poll tokens.
///
/// Each token is first looked up among the connection candidates and, only
/// if it is not found there, among the established connections.
///
/// Returns a pair: a flag that is `true` if any candidate or established
/// connection was removed (and `false` otherwise), and the remote peers of
/// the established connections that were removed.
// NOTE(review): this listing appears to interleave the pre- and post-change
// lines of a diff (e.g. the two signatures directly below). The return
// description above follows the tuple-returning variant — confirm against the
// actual file.
pub fn remove_connections(&self, tokens: &[Token]) -> bool {
pub fn remove_connections(&self, tokens: &[Token]) -> (bool, Vec<RemotePeer>) {
// This is not implemented as a simple iteration using remove_connection because
// that would require more lock acquisitions and calls to bump_last_peer_update.
let conn_candidates = &mut lock_or_die!(self.conn_candidates());
let connections = &mut write_or_die!(self.connections());

let mut removed_peers = false;
let mut has_removed_peers = false;
let mut removed_peers = vec![];
let mut removed_candidates = false;
for token in tokens {
if conn_candidates.remove(token).is_some() {
removed_candidates = true;
} else if connections.remove(token).is_some() {
removed_peers = true;
} else if let Some(removed_peer) = connections.remove(token) {
// Remember which peer the dropped connection belonged to so that it
// can be handed back to the caller.
removed_peers.push(removed_peer.remote_peer);
has_removed_peers = true;
}
}
// Only the removal of an established connection bumps the last peer update;
// dropping candidates alone does not.
if removed_peers {
if has_removed_peers {
self.bump_last_peer_update();
}
removed_candidates || removed_peers
(removed_candidates || has_removed_peers, removed_peers)
}

/// Close connection to the given address, if any.
Expand Down
17 changes: 14 additions & 3 deletions concordium-node/src/p2p/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
p2p::{
bans::BanId,
connectivity::{accept, connect, connection_housekeeping, AcceptFailureReason, SELF_TOKEN},
peers::check_peers,
peers::{check_peers, persist_peer, remove_persisted_peer},
},
plugins::consensus::{check_peer_states, update_peer_list},
read_or_die, spawn_or_die,
Expand Down Expand Up @@ -801,6 +801,9 @@ fn process_conn_change(node: &Arc<P2PNode>, conn_change: ConnChange) {
if !is_connected {
conns.insert(conn.token(), conn);
node.bump_last_peer_update();
// insert the peer in the lmdb store so the node can
// reconnect to the peer if the node restarts.
persist_peer(node, addr);
} else {
warn!("Already connected to a peer on the given address.")
}
Expand Down Expand Up @@ -842,15 +845,23 @@ fn process_conn_change(node: &Arc<P2PNode>, conn_change: ConnChange) {
);
node.stats.soft_banned_peers.inc();
node.stats.soft_banned_peers_total.inc();
remove_persisted_peer(node, remote_peer.addr);
}
}
ConnChange::RemovalByToken(token) => {
trace!("Removing connection with token {:?}", token);
node.remove_connection(token);
if let Some(remote_peer) = node.remove_connection(token) {
remove_persisted_peer(node, remote_peer.addr);
}
}
ConnChange::RemoveAllByTokens(tokens) => {
trace!("Removing connections with tokens {:?}", tokens);
node.remove_connections(&tokens);
let (_, removed_peers) = node.remove_connections(&tokens);
for p in removed_peers {
// If any connections were dropped, remove them
// from the database.
remove_persisted_peer(node, p.addr);
}
}
}
}
Expand Down
142 changes: 140 additions & 2 deletions concordium-node/src/p2p/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,19 @@ use crate::{
connection::Connection,
netmsg,
network::NetworkRequest,
p2p::{maintenance::attempt_bootstrap, P2PNode},
p2p::{connectivity::connect, maintenance::attempt_bootstrap, P2PNode},
read_or_die,
};
use anyhow::ensure;
use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt};
use chrono::Utc;
use concordium_base::common::{Buffer, Deserial, Serial};
use prometheus::core::Atomic;
use std::sync::{atomic::Ordering, Arc};
use rkv::{StoreOptions, Value};
use std::{
net::SocketAddr,
sync::{atomic::Ordering, Arc},
};

impl P2PNode {
/// Obtain the list of statistics from all the peers, optionally of a
Expand Down Expand Up @@ -169,9 +175,122 @@ fn calculate_average_throughput(
Ok((avg_bps_in, avg_bps_out))
}

/// Name of the store in the node's key-value environment that holds the
/// persisted peers. It is the name passed to `open_single` whenever the
/// functions in this module persist, remove or read back stored peers.
const PEERS_STORE_NAME: &str = "peers";

/// The address of a peer as it is kept in the lmdb database.
///
/// This is a thin newtype around [`SocketAddr`]; it exists so that the
/// required serialization and deserialization traits can be implemented for
/// it.
#[derive(Debug, PartialEq, Eq)]
pub struct StoredPeer(SocketAddr);

impl From<SocketAddr> for StoredPeer {
    /// Wrap the given socket address unchanged.
    fn from(socket_addr: SocketAddr) -> Self { Self(socket_addr) }
}

impl Serial for StoredPeer {
    /// Write the peer as: a one-byte tag (`0` for IPv4, `1` for IPv6), the
    /// port as a big-endian `u16`, and then the IP address as a big-endian
    /// `u32` (IPv4) or `u128` (IPv6).
    ///
    /// Only the IP address and the port are written; any other part of the
    /// socket address is not serialized.
    fn serial<W: Buffer + WriteBytesExt>(&self, target: &mut W) {
        // Shared panic message for the writes below.
        const WRITE_FAILED: &str = "surely we can write to memory";

        match &self.0 {
            SocketAddr::V4(v4) => {
                target.write_u8(0).expect(WRITE_FAILED);
                target.write_u16::<BigEndian>(v4.port()).expect(WRITE_FAILED);
                // The integer form of the address has the octets as its
                // big-endian bytes, so this writes the octets in order.
                target.write_u32::<BigEndian>(u32::from(*v4.ip())).expect(WRITE_FAILED);
            }
            SocketAddr::V6(v6) => {
                target.write_u8(1).expect(WRITE_FAILED);
                target.write_u16::<BigEndian>(v6.port()).expect(WRITE_FAILED);
                target.write_u128::<BigEndian>(u128::from(*v6.ip())).expect(WRITE_FAILED);
            }
        }
    }
}

impl Deserial for StoredPeer {
    /// Read a peer in the layout produced by the `Serial` implementation: a
    /// one-byte tag (`0` for IPv4, `1` for IPv6), the port as a big-endian
    /// `u16`, and then the IP address as a big-endian `u32` or `u128`.
    ///
    /// Fails if the source runs out of data or if the tag is neither `0` nor
    /// `1`.
    fn deserial<R: ReadBytesExt>(source: &mut R) -> anyhow::Result<Self> {
        let tag = source.read_u8()?;
        let socket_addr = match tag {
            0 => {
                let port = source.read_u16::<BigEndian>()?;
                let bits = source.read_u32::<BigEndian>()?;
                SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::from(bits)), port)
            }
            1 => {
                let port = source.read_u16::<BigEndian>()?;
                let bits = source.read_u128::<BigEndian>()?;
                SocketAddr::new(std::net::IpAddr::V6(std::net::Ipv6Addr::from(bits)), port)
            }
            _ => anyhow::bail!("Unsupported type of `StoredPeer`"),
        };
        Ok(Self(socket_addr))
    }
}

/// Persist the [`SocketAddr`] of a peer.
pub fn persist_peer(node: &Arc<P2PNode>, peer_addr: SocketAddr) {
if let Ok(kv) = node.kvs.read() {
let peers_store = kv.open_single(PEERS_STORE_NAME, StoreOptions::create()).expect("foo");
let mut buf = Vec::new();
let stored_peer: StoredPeer = peer_addr.into();
stored_peer.serial(&mut buf);
let mut writer = kv.write().expect("foo");
peers_store.put(&mut writer, buf, &Value::U64(0)).expect("foo");
writer.commit().expect("foo");
} else {
warn!("Could not acqure lock over lmdb");
};
}

/// Remove a peer from the persisted peer database.
pub fn remove_persisted_peer(node: &Arc<P2PNode>, peer_addr: SocketAddr) {
if let Ok(kv) = node.kvs.read() {
let peers_store = kv.open_single(PEERS_STORE_NAME, StoreOptions::create()).expect("foo");
let mut buf = Vec::new();
let stored_peer: StoredPeer = peer_addr.into();
stored_peer.serial(&mut buf);
let mut writer = kv.write().expect("foo");
peers_store.delete(&mut writer, buf).expect("foo");
writer.commit().expect("foo");
} else {
warn!("Could not acqure lock over lmdb");
};
}

/// Try connect to previosly connected peers if
/// anyone is stored.
/// Note that as opposed to [`connect_to_config_nodes`] this function respects
/// the maximum peers configured for the node.
pub fn connect_to_stored_nodes(node: &Arc<P2PNode>) -> anyhow::Result<()> {
if let Ok(kvs_env) = node.kvs.read() {
let peers_store = kvs_env.open_single(PEERS_STORE_NAME, StoreOptions::create())?;
let peers_reader = kvs_env.read()?;
let peers_iter = peers_store.iter_start(&peers_reader)?;
for entry in peers_iter {
let (mut peer_bytes, _expiry) = entry?;
if let Err(e) =
connect(node, PeerType::Node, StoredPeer::deserial(&mut peer_bytes)?.0, None, true)
{
warn!("could not connect to previosly connected peer {}", e);
}
}
Ok(())
} else {
anyhow::bail!("could not read previosly connected peers: cannot obtain lock for the kvs");
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::{
io::Cursor,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
};

#[test]
fn test_average_throughput() {
Expand All @@ -193,4 +312,23 @@ mod tests {
"Calculation should fail since time difference is negative."
);
}

/// A `StoredPeer` must survive a serialize/deserialize round trip unchanged,
/// for both an IPv4 and an IPv6 address.
#[test]
fn test_serial_deserial_stored_peer() -> anyhow::Result<()> {
    let ips = [
        IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
        IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
    ];

    for ip in ips.iter() {
        let original = StoredPeer::from(SocketAddr::new(*ip, 8080));
        let mut bytes = Vec::new();
        original.serial(&mut bytes);
        let decoded = StoredPeer::deserial(&mut Cursor::new(&bytes))?;
        assert_eq!(original, decoded);
    }

    Ok(())
}
}

0 comments on commit 8a8427f

Please sign in to comment.