Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Introduce IP-level bans for connection spam targeting validators #3366

Open
wants to merge 3 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions node/bft/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ edition = "2021"
[features]
default = [ ]
metrics = [ "dep:metrics", "snarkos-node-bft-events/metrics", "snarkos-node-bft-ledger-service/metrics" ]
test = [ ]

[dependencies.aleo-std]
workspace = true
Expand Down Expand Up @@ -152,6 +153,10 @@ version = "0.4"
[dev-dependencies.rayon]
version = "1"

[dev-dependencies.snarkos-node-bft]
path = "."
features = [ "test" ]

[dev-dependencies.snarkos-node-bft-ledger-service]
path = "./ledger-service"
default-features = false
Expand Down
60 changes: 58 additions & 2 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,14 @@ use futures::SinkExt;
use indexmap::{IndexMap, IndexSet};
use parking_lot::{Mutex, RwLock};
use rand::seq::{IteratorRandom, SliceRandom};
use std::{collections::HashSet, future::Future, io, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
future::Future,
io,
net::{IpAddr, SocketAddr},
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
net::TcpStream,
sync::{oneshot, OnceCell},
Expand All @@ -88,6 +95,12 @@ const MIN_CONNECTED_VALIDATORS: usize = 175;
/// The maximum number of validators to send in a validators response event.
const MAX_VALIDATORS_TO_SEND: usize = 200;

/// The minimum permitted interval between connection attempts for an IP; anything shorter is considered malicious.
#[cfg(not(any(test, feature = "test")))]
const MIN_CONNECTION_INTERVAL_IN_SECS: u64 = 10;
/// The amount of time an IP address is prohibited from connecting.
const IP_BAN_TIME_IN_SECS: u64 = 30;

/// Part of the Gateway API that deals with networking.
/// This is a separate trait to allow for easier testing/mocking.
#[async_trait]
Expand Down Expand Up @@ -119,6 +132,8 @@ pub struct Gateway<N: Network> {
/// prevent simultaneous "two-way" connections between two peers (i.e. both nodes simultaneously
/// attempt to connect to each other). This set is used to prevent this from happening.
connecting_peers: Arc<Mutex<IndexSet<SocketAddr>>>,
/// The set of banned IP addresses.
banned_ips: Arc<RwLock<HashMap<IpAddr, Instant>>>,
/// The primary sender.
primary_sender: Arc<OnceCell<PrimarySender<N>>>,
/// The worker senders.
Expand Down Expand Up @@ -160,6 +175,7 @@ impl<N: Network> Gateway<N> {
trusted_validators: trusted_validators.iter().copied().collect(),
connected_peers: Default::default(),
connecting_peers: Default::default(),
banned_ips: Default::default(),
primary_sender: Default::default(),
worker_senders: Default::default(),
sync_sender: Default::default(),
Expand Down Expand Up @@ -459,6 +475,19 @@ impl<N: Network> Gateway<N> {
Ok(())
}

/// Returns `true` if the given IP address currently has an active ban entry.
/// Expired entries are pruned separately by `handle_banned_ips`, so a stale
/// entry may still read as banned until the next heartbeat sweep.
#[cfg(not(any(test, feature = "test")))]
fn is_ip_banned(&self, ip: IpAddr) -> bool {
    let banned_ips = self.banned_ips.read();
    banned_ips.get(&ip).is_some()
}

/// Bans the given IP address, or refreshes the ban timestamp if it is
/// already banned. Note: refreshing on every rejected attempt means a peer
/// that keeps retrying extends its own ban window.
#[cfg(not(any(test, feature = "test")))]
fn update_ip_ban(&self, ip: IpAddr) {
    self.banned_ips.write().insert(ip, Instant::now());
}

#[cfg(feature = "metrics")]
fn update_metrics(&self) {
metrics::gauge(metrics::bft::CONNECTED, self.connected_peers.read().len() as f64);
Expand Down Expand Up @@ -875,6 +904,8 @@ impl<N: Network> Gateway<N> {
self.handle_unauthorized_validators();
// If the number of connected validators is less than the minimum, send a `ValidatorsRequest`.
self.handle_min_connected_validators();
// Unban any addresses whose ban time has expired.
self.handle_banned_ips();
}

/// Logs the connected validators.
Expand Down Expand Up @@ -955,6 +986,11 @@ impl<N: Network> Gateway<N> {
}
}
}

/// Prunes every banned IP whose ban interval (`IP_BAN_TIME_IN_SECS`) has elapsed.
fn handle_banned_ips(&self) {
    let mut banned_ips = self.banned_ips.write();
    banned_ips.retain(|_, ban_time| ban_time.elapsed().as_secs() < IP_BAN_TIME_IN_SECS);
}
}

#[async_trait]
Expand Down Expand Up @@ -1105,9 +1141,29 @@ impl<N: Network> OnConnect for Gateway<N> {
impl<N: Network> Handshake for Gateway<N> {
/// Performs the handshake protocol.
async fn perform_handshake(&self, mut connection: Connection) -> io::Result<Connection> {
// Perform the handshake.
let peer_addr = connection.addr();
let peer_side = connection.side();

// Check (or impose) IP-level bans.
#[cfg(not(any(test, feature = "test")))]
if self.dev().is_none() && peer_side == ConnectionSide::Initiator {
// If the IP is already banned, update the ban timestamp and reject the connection.
if self.is_ip_banned(peer_addr.ip()) {
self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a connection request from banned IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' is a banned IP address", peer_addr.ip())));
}

// Check the previous low-level connection timestamp.
if let Some(peer_stats) = self.tcp.known_peers().get(peer_addr.ip()) {
if peer_stats.timestamp().elapsed().as_secs() <= MIN_CONNECTION_INTERVAL_IN_SECS {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Meshiest wrote: "in my test environment this is permanently banning my local validators (they're connecting to each other while booting up)"

@Meshiest can you describe your test in more detail? Most importantly how many validators, from genesis or not, and what rough machine specs? That would be useful for @ljedrz to reproduce and chime in on an alternative design or constants.

Copy link
Contributor

@Meshiest Meshiest Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an entirely local 4 validator, 4 bootstrap+core clients, and 4 fringe client network for testing another change & made a few snarkos changes to support 127.x.y.z IPs so the peers would have different addresses while still being local and having default ports (can probably push these somewhere). I also updated the bootstrap peers to be the core clients

When booting the nodes all at the same time, validators are starved for peers and are the initiators for all their requests. This results in the validators getting banned both by other validators as well as their own core clients in the non-gateway concurrent con check.

I'm not certain my IP changes to support other local addresses are contributing but they are part of the equation and it could be related to the other testing I was doing at the time

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Meshiest just to rule out other possible issues, could you try starting the nodes one-by-one and see if this avoids the bans? I agree that we should also support concurrent launch, this is just to double-check that it is this edge case.

Copy link
Contributor

@Meshiest Meshiest Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rebuilding via:

  1. checkout [Feature] Introduce IP-level bans for connection spam targeting validators #3366
  2. cherry-pick Testing features for custom loopback isonets #3397 (I'm running all my nodes locally)
  3. merge mainnet

And I wasn't able to reproduce the parallel booting ban, though after more testing and rapidly restarting nodes I wasn't able to achieve any bans. Not entirely sure what's going on. Out of time until later today (at least 12 hours)

self.update_ip_ban(peer_addr.ip());
trace!("{CONTEXT} Gateway rejected a consecutive connection request from IP '{}'", peer_addr.ip());
return Err(error(format!("'{}' appears to be spamming connections", peer_addr.ip())));
}
}
}

let stream = self.borrow_stream(&mut connection);

// If this is an inbound connection, we log it, but don't know the listening address yet.
Expand Down
33 changes: 23 additions & 10 deletions node/tcp/src/helpers/known_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,66 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{collections::HashMap, net::SocketAddr, sync::Arc};
use std::{
collections::{hash_map::Entry, HashMap},
net::IpAddr,
sync::Arc,
time::Instant,
};

use parking_lot::RwLock;

use crate::Stats;

/// Contains statistics related to Tcp's peers, currently connected or not.
#[derive(Default)]
pub struct KnownPeers(RwLock<HashMap<SocketAddr, Arc<Stats>>>);
pub struct KnownPeers(RwLock<HashMap<IpAddr, Arc<Stats>>>);

impl KnownPeers {
/// Adds an address to the list of known peers.
pub fn add(&self, addr: SocketAddr) {
self.0.write().entry(addr).or_default();
/// Adds an address to the list of known peers; if the address is already
/// known, only its connection timestamp is refreshed.
pub fn add(&self, addr: IpAddr) {
    let now = Instant::now();
    self.0
        .write()
        .entry(addr)
        .and_modify(|stats| *stats.timestamp.write() = now)
        .or_insert_with(|| Arc::new(Stats::new(now)));
}

/// Returns the stats for the given peer.
pub fn get(&self, addr: SocketAddr) -> Option<Arc<Stats>> {
/// Returns the stats for the given peer, if it is known.
pub fn get(&self, addr: IpAddr) -> Option<Arc<Stats>> {
    // Cloning the `Arc` is a cheap refcount bump, not a copy of the stats.
    self.0.read().get(&addr).cloned()
}

/// Removes an address from the list of known peers.
pub fn remove(&self, addr: SocketAddr) -> Option<Arc<Stats>> {
/// Removes an address from the list of known peers, returning its stats if it was present.
pub fn remove(&self, addr: IpAddr) -> Option<Arc<Stats>> {
    let mut peers = self.0.write();
    peers.remove(&addr)
}

/// Returns the list of all known peers and their stats.
pub fn snapshot(&self) -> HashMap<SocketAddr, Arc<Stats>> {
/// Returns a point-in-time copy of all known peers and their stats.
pub fn snapshot(&self) -> HashMap<IpAddr, Arc<Stats>> {
    // `IpAddr` is `Copy`; each `Arc` clone is only a refcount bump.
    self.0.read().iter().map(|(addr, stats)| (*addr, Arc::clone(stats))).collect()
}

/// Registers a submission of a message to the given address.
pub fn register_sent_message(&self, to: SocketAddr, size: usize) {
/// Records that a message of `size` bytes was sent to the given address.
/// Unknown addresses are ignored.
pub fn register_sent_message(&self, to: IpAddr, size: usize) {
    let peers = self.0.read();
    match peers.get(&to) {
        Some(stats) => stats.register_sent_message(size),
        None => (),
    }
}

/// Registers a receipt of a message to the given address.
pub fn register_received_message(&self, from: SocketAddr, size: usize) {
/// Records that a message of `size` bytes was received from the given address.
/// Unknown addresses are ignored.
pub fn register_received_message(&self, from: IpAddr, size: usize) {
    let peers = self.0.read();
    match peers.get(&from) {
        Some(stats) => stats.register_received_message(size),
        None => (),
    }
}

/// Registers a failure associated with the given address.
pub fn register_failure(&self, addr: SocketAddr) {
pub fn register_failure(&self, addr: IpAddr) {
if let Some(stats) = self.0.read().get(&addr) {
stats.register_failure();
}
Expand Down
26 changes: 24 additions & 2 deletions node/tcp/src/helpers/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use parking_lot::RwLock;
use std::{
sync::atomic::{AtomicU64, Ordering::Relaxed},
time::Instant,
};

/// Contains statistics related to Tcp.
#[derive(Default)]
pub struct Stats {
/// The timestamp of the creation (for the node) or connection (to a peer).
pub(crate) timestamp: RwLock<Instant>,
/// The number of all messages sent.
msgs_sent: AtomicU64,
/// The number of all messages received.
Expand All @@ -30,6 +35,23 @@ pub struct Stats {
}

impl Stats {
/// Creates a fresh `Stats` with all counters zeroed and the given
/// creation/connection timestamp.
pub fn new(timestamp: Instant) -> Self {
    let timestamp = RwLock::new(timestamp);
    Self {
        timestamp,
        msgs_sent: Default::default(),
        msgs_received: Default::default(),
        bytes_sent: Default::default(),
        bytes_received: Default::default(),
        failures: Default::default(),
    }
}

/// Returns the creation or connection timestamp.
pub fn timestamp(&self) -> Instant {
    // `Instant` is `Copy`, so dereferencing the guard yields an owned value.
    let guard = self.timestamp.read();
    *guard
}

/// Returns the number of sent messages and their collective size in bytes.
pub fn sent(&self) -> (u64, u64) {
let msgs = self.msgs_sent.load(Relaxed);
Expand Down
6 changes: 3 additions & 3 deletions node/tcp/src/protocols/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl<R: Reading> ReadingInternal for R {
while let Some(msg) = inbound_message_receiver.recv().await {
if let Err(e) = self_clone.process_message(addr, msg).await {
error!(parent: node.span(), "can't process a message from {addr}: {e}");
node.known_peers().register_failure(addr);
node.known_peers().register_failure(addr.ip());
}
#[cfg(feature = "metrics")]
metrics::decrement_gauge(metrics::tcp::TCP_TASKS, 1f64);
Expand Down Expand Up @@ -178,7 +178,7 @@ impl<R: Reading> ReadingInternal for R {
}
Err(e) => {
error!(parent: node.span(), "can't read from {addr}: {e}");
node.known_peers().register_failure(addr);
node.known_peers().register_failure(addr.ip());
if node.config().fatal_io_errors.contains(&e.kind()) {
break;
}
Expand Down Expand Up @@ -229,7 +229,7 @@ impl<D: Decoder> Decoder for CountingCodec<D> {

if ret.is_some() {
self.acc = 0;
self.node.known_peers().register_received_message(self.addr, read_len);
self.node.known_peers().register_received_message(self.addr.ip(), read_len);
self.node.stats().register_received_message(read_len);
} else {
self.acc = read_len;
Expand Down
4 changes: 2 additions & 2 deletions node/tcp/src/protocols/writing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,12 +219,12 @@ impl<W: Writing> WritingInternal for W {
match self_clone.write_to_stream(*msg, &mut framed).await {
Ok(len) => {
let _ = wrapped_msg.delivery_notification.send(Ok(()));
node.known_peers().register_sent_message(addr, len);
node.known_peers().register_sent_message(addr.ip(), len);
node.stats().register_sent_message(len);
trace!(parent: node.span(), "sent {}B to {}", len, addr);
}
Err(e) => {
node.known_peers().register_failure(addr);
node.known_peers().register_failure(addr.ip());
error!(parent: node.span(), "couldn't send a message to {}: {}", addr, e);
let is_fatal = node.config().fatal_io_errors.contains(&e.kind());
let _ = wrapped_msg.delivery_notification.send(Err(e));
Expand Down
17 changes: 5 additions & 12 deletions node/tcp/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
atomic::{AtomicUsize, Ordering::*},
Arc,
},
time::Duration,
time::{Duration, Instant},
};

use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -101,7 +101,7 @@ impl Tcp {
connecting: Default::default(),
connections: Default::default(),
known_peers: Default::default(),
stats: Default::default(),
stats: Stats::new(Instant::now()),
tasks: Default::default(),
}));

Expand Down Expand Up @@ -255,7 +255,7 @@ impl Tcp {

if let Err(ref e) = ret {
self.connecting.lock().remove(&addr);
self.known_peers().register_failure(addr);
self.known_peers().register_failure(addr.ip());
error!(parent: self.span(), "Unable to initiate a connection with {addr}: {e}");
}

Expand All @@ -282,13 +282,6 @@ impl Tcp {
task.abort();
}

// If the (owning) Tcp was not the initiator of the connection, it doesn't know the listening address
// of the associated peer, so the related stats are unreliable; the next connection initiated by the
// peer could be bound to an entirely different port number
if conn.side() == ConnectionSide::Initiator {
self.known_peers().remove(conn.addr());
}

debug!(parent: self.span(), "Disconnected from {}", conn.addr());
} else {
warn!(parent: self.span(), "Failed to disconnect, was not connected to {addr}");
Expand Down Expand Up @@ -386,7 +379,7 @@ impl Tcp {
tokio::spawn(async move {
if let Err(e) = tcp.adapt_stream(stream, addr, ConnectionSide::Responder).await {
tcp.connecting.lock().remove(&addr);
tcp.known_peers().register_failure(addr);
tcp.known_peers().register_failure(addr.ip());
error!(parent: tcp.span(), "Failed to connect with {addr}: {e}");
}
});
Expand Down Expand Up @@ -426,7 +419,7 @@ impl Tcp {

/// Prepares the freshly acquired connection to handle the protocols the Tcp implements.
async fn adapt_stream(&self, stream: TcpStream, peer_addr: SocketAddr, own_side: ConnectionSide) -> io::Result<()> {
self.known_peers.add(peer_addr);
self.known_peers.add(peer_addr.ip());

// Register the port seen by the peer.
if own_side == ConnectionSide::Initiator {
Expand Down