Skip to content

Commit 1790269

Browse files
committed
feat(node): ice outbound peer that sent invalid block batch
1 parent 0bad7a2 commit 1790269

File tree

6 files changed

+46
-30
lines changed

6 files changed

+46
-30
lines changed

node/src/actors/chain_manager/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ impl ChainManager {
235235
let genesis_block = ig.build_genesis_block(consensus_constants.bootstrap_hash);
236236
ctx.notify(AddBlocks {
237237
blocks: vec![genesis_block],
238+
sender: None,
238239
});
239240
}
240241
}

node/src/actors/chain_manager/handlers.rs

Lines changed: 7 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ impl Handler<AddBlocks> for ChainManager {
294294
);
295295

296296
let consensus_constants = self.consensus_constants();
297+
let sender = msg.sender;
297298

298299
match self.sm_state {
299300
StateMachine::WaitingConsensus | StateMachine::AlmostSynced => {
@@ -403,10 +404,7 @@ impl Handler<AddBlocks> for ChainManager {
403404
let (batch_succeeded, num_processed_blocks) =
404405
self.process_first_batch(ctx, &sync_target, &blocks);
405406
if !batch_succeeded {
406-
// We receive an invalid batch of blocks. So we need to throw away our
407-
// outbound peers in order to find new ones that can give us the blocks
408-
// consolidated by the network
409-
self.drop_all_outbounds();
407+
self.drop_all_outbounds_and_ice_sender(sender);
410408

411409
return;
412410
}
@@ -432,10 +430,7 @@ impl Handler<AddBlocks> for ChainManager {
432430
let (batch_succeeded, num_processed_blocks) =
433431
self.process_first_batch(ctx, &sync_target, &consolidate_blocks);
434432
if !batch_succeeded {
435-
// We receive an invalid batch of blocks. So we need to throw away our
436-
// outbound peers in order to find new ones that can give us the blocks
437-
// consolidated by the network
438-
self.drop_all_outbounds();
433+
self.drop_all_outbounds_and_ice_sender(sender);
439434

440435
return;
441436
}
@@ -468,10 +463,7 @@ impl Handler<AddBlocks> for ChainManager {
468463
// Process remaining blocks
469464
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &remainig_blocks);
470465
if !batch_succeeded {
471-
// We receive an invalid batch of blocks. So we need to throw away our
472-
// outbound peers in order to find new ones that can give us the blocks
473-
// consolidated by the network
474-
act.drop_all_outbounds();
466+
act.drop_all_outbounds_and_ice_sender(sender);
475467

476468
return actix::fut::err(());
477469
}
@@ -494,10 +486,7 @@ impl Handler<AddBlocks> for ChainManager {
494486
let (batch_succeeded, num_processed_blocks) =
495487
self.process_first_batch(ctx, &sync_target, &consolidate_blocks);
496488
if !batch_succeeded {
497-
// We receive an invalid batch of blocks. So we need to throw away our
498-
// outbound peers in order to find new ones that can give us the blocks
499-
// consolidated by the network
500-
self.drop_all_outbounds();
489+
self.drop_all_outbounds_and_ice_sender(sender);
501490

502491
return;
503492
}
@@ -530,10 +519,7 @@ impl Handler<AddBlocks> for ChainManager {
530519
// Process remaining blocks
531520
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &candidate_blocks);
532521
if !batch_succeeded {
533-
// We receive an invalid batch of blocks. So we need to throw away our
534-
// outbound peers in order to find new ones that can give us the blocks
535-
// consolidated by the network
536-
act.drop_all_outbounds();
522+
act.drop_all_outbounds_and_ice_sender(sender);
537523

538524
act.update_state_machine(StateMachine::WaitingConsensus);
539525

@@ -579,10 +565,7 @@ impl Handler<AddBlocks> for ChainManager {
579565
// Process remaining blocks
580566
let (batch_succeeded, num_processed_blocks) = act.process_blocks_batch(ctx, &sync_target, &remaining_blocks);
581567
if !batch_succeeded {
582-
// We receive an invalid batch of blocks. So we need to throw away our
583-
// outbound peers in order to find new ones that can give us the blocks
584-
// consolidated by the network
585-
act.drop_all_outbounds();
568+
act.drop_all_outbounds_and_ice_sender(sender);
586569

587570
log::error!("Received invalid blocks batch...");
588571
act.update_state_machine(StateMachine::WaitingConsensus);

node/src/actors/chain_manager/mod.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,11 @@ use crate::{
7575
json_rpc::JsonRpcServer,
7676
messages::{
7777
AddItem, AddItems, AddTransaction, Anycast, BlockNotify, Broadcast, DropOutboundPeers,
78-
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, SendInventoryItem,
79-
SendInventoryRequest, SendLastBeacon, SendSuperBlockVote, StoreInventoryItem,
80-
SuperBlockNotify,
78+
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried,
79+
SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote,
80+
StoreInventoryItem, SuperBlockNotify,
8181
},
82+
peers_manager::PeersManager,
8283
sessions_manager::SessionsManager,
8384
storage_keys,
8485
},
@@ -1842,7 +1843,7 @@ impl ChainManager {
18421843
Box::new(fut)
18431844
}
18441845

1845-
/// This function send a message to SessionsManager to drop all outbounds peers
1846+
/// Send a message to `SessionsManager` to drop all outbound peers.
18461847
pub fn drop_all_outbounds(&self) {
18471848
let peers_to_unregister = self
18481849
.last_received_beacons
@@ -1854,6 +1855,28 @@ impl ChainManager {
18541855
peers_to_drop: peers_to_unregister,
18551856
});
18561857
}
1858+
1859+
/// Send a message to `PeersManager` to ice a specific peer.
1860+
pub fn ice_peer(&self, addr: Option<SocketAddr>) {
1861+
if let Some(addr) = addr {
1862+
let peers_manager_addr = PeersManager::from_registry();
1863+
peers_manager_addr.do_send(RemoveAddressesFromTried {
1864+
addresses: vec![addr],
1865+
ice: true,
1866+
});
1867+
}
1868+
}
1869+
1870+
/// Execute `drop_all_outbounds` and `ice_peer` at once.
1871+
///
1872+
/// This is called when we receive an invalid batch of blocks. It will throw away our outbound
1873+
/// peers in order to find new ones that can give us the blocks consolidated by the network,
1874+
/// and ice the node that sent the invalid batch.
1875+
pub fn drop_all_outbounds_and_ice_sender(&self, sender: Option<SocketAddr>) {
1876+
self.drop_all_outbounds();
1877+
// Ice the invalid blocks' batch sender
1878+
self.ice_peer(sender);
1879+
}
18571880
}
18581881

18591882
/// Helper struct used to persist an old copy of the `ChainState` to the storage

node/src/actors/messages.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ impl Message for GetSuperBlockVotes {
7070
pub struct AddBlocks {
7171
/// Blocks
7272
pub blocks: Vec<Block>,
73+
/// Sender peer
74+
pub sender: Option<SocketAddr>,
7375
}
7476

7577
impl Message for AddBlocks {

node/src/actors/session/actor.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ impl Actor for Session {
114114
// Get ChainManager address
115115
let chain_manager_addr = ChainManager::from_registry();
116116

117-
chain_manager_addr.do_send(AddBlocks { blocks: vec![] });
117+
chain_manager_addr.do_send(AddBlocks {
118+
blocks: vec![],
119+
sender: None,
120+
});
118121
log::warn!("Session disconnected during block exchange");
119122
}
120123

node/src/actors/session/handlers.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ impl Handler<EpochNotification<EveryEpochPayload>> for Session {
109109
// Get ChainManager address
110110
let chain_manager_addr = ChainManager::from_registry();
111111

112-
chain_manager_addr.do_send(AddBlocks { blocks: vec![] });
112+
chain_manager_addr.do_send(AddBlocks {
113+
blocks: vec![],
114+
sender: None,
115+
});
113116
log::warn!("Timeout for waiting blocks achieved");
114117
ctx.stop();
115118
}
@@ -667,6 +670,7 @@ fn inventory_process_block(session: &mut Session, _ctx: &mut Context<Session>, b
667670
// Send a message to the ChainManager to try to add a new block
668671
chain_manager_addr.do_send(AddBlocks {
669672
blocks: blocks_vector,
673+
sender: session.public_addr,
670674
});
671675

672676
// Clear requested block structures

0 commit comments

Comments
 (0)