Skip to content

Commit

Permalink
fix: avoid holding locks across blocking send()
Browse files Browse the repository at this point in the history
addresses #457

peer.send() and mpsc::Sender::send() are both async methods that can
block indefinitely.  As such, we should never hold a lock across
these, as it will prevent other code from executing for the
duration.

This commit is the result of reviewing/auditing all send() calls
in neptune-core, both for peers and channels.

Instance of both types were found and fixed.  Now the lock is
released before the send().

Otherwise, all logic remains the same.  (afaik)
  • Loading branch information
dan-da committed Feb 20, 2025
1 parent 59304ed commit 4b9a218
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/bin/dashboard_src/dashboard_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use clap::Parser;
use crossterm::event;
use crossterm::event::DisableMouseCapture;
use crossterm::event::EnableMouseCapture;
Expand Down Expand Up @@ -58,7 +59,6 @@ use super::peers_screen::SortOrder;
use super::receive_screen::ReceiveScreen;
use super::screen::Screen;
use super::send_screen::SendScreen;
use clap::Parser;

#[derive(Debug, Parser, Clone)]
#[clap(name = "neptune-dashboard", about = "Terminal user interface")]
Expand Down
2 changes: 2 additions & 0 deletions src/connect_to_peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,8 @@ pub(crate) async fn close_peer_connected_callback(
.net
.write_peer_standing_on_decrease(peer_address.ip(), new_standing)
.await;
drop(global_state_mut); // avoid holding across mpsc::Sender::send()

debug!(
"Stored peer info standing {} for peer {}",
new_standing, peer_address
Expand Down
8 changes: 8 additions & 0 deletions src/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,15 @@ impl MainToMinerChannel {
pub struct MainLoopHandler {
incoming_peer_listener: TcpListener,
global_state_lock: GlobalStateLock,

// note: broadcast::Sender::send() does not block
main_to_peer_broadcast_tx: broadcast::Sender<MainToPeerTask>,

// note: mpsc::Sender::send() blocks if channel full.
// locks should not be held across it.
peer_task_to_main_tx: mpsc::Sender<PeerTaskToMain>,

// note: MainToMinerChannel::send() does not block. might log error.
main_to_miner_tx: MainToMinerChannel,

#[cfg(test)]
Expand Down Expand Up @@ -498,6 +505,7 @@ impl MainLoopHandler {
let mut global_state_mut = self.global_state_lock.lock_guard_mut().await;

if !global_state_mut.incoming_block_is_more_canonical(&new_block) {
drop(global_state_mut); // don't hold across send()
warn!("Got new block from miner task that was not child of tip. Discarding.");
self.main_to_miner_tx.send(MainToMiner::Continue);
return Ok(None);
Expand Down
1 change: 1 addition & 0 deletions src/main_loop/proof_upgrader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ impl UpgradeJob {
.wallet_state
.add_expected_utxos(expected_utxos)
.await;
drop(global_state); // sooner is better.

// Inform all peers about our hard work
main_to_peer_channel
Expand Down
55 changes: 31 additions & 24 deletions src/peer_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,10 +1090,13 @@ impl PeerLoopHandler {
}

let response = Self::batch_response(&state, returned_blocks, &anchor).await;

// issue 457. do not hold lock across a peer.send(). nor self.punish()
drop(state);

let response = match response {
Some(response) => response,
None => {
drop(state);
warn!("Unable to satisfy batch-block request");
self.punish(NegativePeerSanction::BatchBlocksUnknownRequest)
.await?;
Expand Down Expand Up @@ -1401,30 +1404,34 @@ impl PeerLoopHandler {
PeerMessage::TransactionNotification(tx_notification) => {
log_slow_scope!(fn_name!() + "::PeerMessage::TransactionNotification");

// 1. Ignore if we already know this transaction, and
// the proof quality is not higher than what we already know.
let state = self.global_state_lock.lock_guard().await;
let transaction_of_same_or_higher_proof_quality_is_known =
state.mempool.contains_with_higher_proof_quality(
tx_notification.txid,
tx_notification.proof_quality,
);
if transaction_of_same_or_higher_proof_quality_is_known {
debug!("transaction with same or higher proof quality was already known");
return Ok(KEEP_CONNECTION_ALIVE);
}

// Only accept transactions that do not require executing
// `update`.
if state
.chain
.light_state()
.mutator_set_accumulator_after()
.hash()
!= tx_notification.mutator_set_hash
// addresses #457
// new scope for state read-lock to avoid holding across peer.send()
{
debug!("transaction refers to non-canonical mutator set state");
return Ok(KEEP_CONNECTION_ALIVE);
// 1. Ignore if we already know this transaction, and
// the proof quality is not higher than what we already know.
let state = self.global_state_lock.lock_guard().await;
let transaction_of_same_or_higher_proof_quality_is_known =
state.mempool.contains_with_higher_proof_quality(
tx_notification.txid,
tx_notification.proof_quality,
);
if transaction_of_same_or_higher_proof_quality_is_known {
debug!("transaction with same or higher proof quality was already known");
return Ok(KEEP_CONNECTION_ALIVE);
}

// Only accept transactions that do not require executing
// `update`.
if state
.chain
.light_state()
.mutator_set_accumulator_after()
.hash()
!= tx_notification.mutator_set_hash
{
debug!("transaction refers to non-canonical mutator set state");
return Ok(KEEP_CONNECTION_ALIVE);
}
}

// 2. Request the actual `Transaction` from peer
Expand Down

0 comments on commit 4b9a218

Please sign in to comment.