From 0cbf25476ab318647eb93ecca234c09bb7fd9692 Mon Sep 17 00:00:00 2001 From: Matthew Date: Mon, 29 Sep 2025 16:00:47 -0500 Subject: [PATCH 1/2] refactor(esplora): clear remaining panic paths --- crates/esplora/src/async_ext.rs | 28 +++++++----- crates/esplora/src/blocking_ext.rs | 70 +++++++++++++++++++++++------- 2 files changed, 72 insertions(+), 26 deletions(-) diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index c9cb17c1e..a26c81b09 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -264,7 +264,7 @@ async fn chain_update( tip = tip .extend(conflicts.into_iter().rev().map(|b| (b.height, b.hash))) - .expect("evicted are in order"); + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; for (anchor, _txid) in anchors { let height = anchor.block_id.height; @@ -314,8 +314,9 @@ where type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); - let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut consecutive_unused = 0usize; + let gap_limit = stop_gap.max(parallel_requests.max(1)); loop { let handles = keychain_spks @@ -352,8 +353,10 @@ where } for (index, txs, evicted) in handles.try_collect::>().await? { - last_index = Some(index); - if !txs.is_empty() { + if txs.is_empty() { + consecutive_unused = consecutive_unused.saturating_add(1); + } else { + consecutive_unused = 0; last_active_index = Some(index); } for tx in txs { @@ -368,13 +371,7 @@ where .extend(evicted.into_iter().map(|txid| (txid, start_time))); } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { + if consecutive_unused >= gap_limit { break; } } @@ -571,6 +568,15 @@ mod test { }}; } + #[test] + fn ensure_last_index_none_returns_error() { + let last_index: Option = None; + let err = last_index + .ok_or_else(|| Box::new(esplora_client::Error::InvalidResponse)) + .unwrap_err(); + assert!(matches!(*err, esplora_client::Error::InvalidResponse)); + } + // Test that `chain_update` fails due to wrong network. #[tokio::test] async fn test_chain_update_wrong_network_error() -> anyhow::Result<()> { diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 5f8ab531c..2d65b28fd 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -249,7 +249,7 @@ fn chain_update( tip = tip .extend(conflicts.into_iter().rev().map(|b| (b.height, b.hash))) - .expect("evicted are in order"); + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; for (anchor, _) in anchors { let height = anchor.block_id.height; @@ -282,8 +282,10 @@ fn fetch_txs_with_keychain_spks type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); - let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut consecutive_unused = 0usize; + let mut processed_any = false; + let gap_limit = stop_gap.max(1); loop { let handles = keychain_spks @@ -316,13 +318,22 @@ fn fetch_txs_with_keychain_spks .collect::>>>(); if handles.is_empty() { + if !processed_any { + return Err(Box::new(esplora_client::Error::InvalidResponse)); + } break; } for handle in handles { - let (index, txs, evicted) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { + let handle_result = handle + .join() + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + let (index, txs, evicted) = handle_result?; + processed_any = true; + if txs.is_empty() { + consecutive_unused = consecutive_unused.saturating_add(1); + } else { + consecutive_unused = 0; last_active_index = Some(index); } for tx in txs { @@ -337,13 +348,7 @@ fn fetch_txs_with_keychain_spks .extend(evicted.into_iter().map(|txid| (txid, start_time))); } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { + if consecutive_unused >= gap_limit { break; } } @@ -417,7 +422,10 @@ fn fetch_txs_with_txids>( } for handle in handles { - let (txid, tx_info) = handle.join().expect("thread must not panic")?; + let handle_result = handle + .join() + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + let (txid, tx_info) = handle_result?; if let Some(tx_info) = tx_info { if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); @@ -478,7 +486,10 @@ fn fetch_txs_with_outpoints>( } for handle in handles { - if let Some(op_status) = handle.join().expect("thread must not panic")? { + let handle_result = handle + .join() + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + if let Some(op_status) = handle_result? { let spend_txid = match op_status.txid { Some(txid) => txid, None => continue, @@ -511,7 +522,7 @@ fn fetch_txs_with_outpoints>( #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod test { - use crate::blocking_ext::{chain_update, fetch_latest_blocks}; + use crate::blocking_ext::{chain_update, fetch_latest_blocks, Error}; use bdk_chain::bitcoin; use bdk_chain::bitcoin::hashes::Hash; use bdk_chain::bitcoin::Txid; @@ -529,6 +540,35 @@ mod test { }}; } + #[test] + fn thread_join_panic_maps_to_error() { + let handle = std::thread::spawn(|| -> Result<(), Error> { + panic!("expected panic for test coverage"); + }); + + let res = (|| -> Result<(), Error> { + let handle_result = handle + .join() + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + handle_result?; + Ok(()) + })(); + + assert!(matches!( + *res.unwrap_err(), + esplora_client::Error::InvalidResponse + )); + } + + #[test] + fn ensure_last_index_none_returns_error() { + let last_index: Option = None; + let err = last_index + .ok_or_else(|| Box::new(esplora_client::Error::InvalidResponse)) + .unwrap_err(); + assert!(matches!(*err, esplora_client::Error::InvalidResponse)); + } + macro_rules! local_chain { [ $(($height:expr, $block_hash:expr)), * ] => {{ #[allow(unused_mut)] From 9ea0dd064c30f4ca79dac7a0e5e736a0c1d7f2c2 Mon Sep 17 00:00:00 2001 From: Matthew Date: Mon, 13 Oct 2025 14:22:51 -0500 Subject: [PATCH 2/2] refactor(esplora): define blocking Error enum --- crates/esplora/src/blocking_ext.rs | 77 ++++++++++++++++++++++-------- 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 2d65b28fd..f864c7198 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -7,12 +7,54 @@ use bdk_core::{ BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; +use std::any::Any; +use std::fmt; use std::thread::JoinHandle; use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts}; -/// [`esplora_client::Error`] -pub type Error = Box; +#[derive(Debug)] +pub enum Error { + Client(esplora_client::Error), + ThreadPanic(Option), +} + +impl Error { + fn from_thread_panic(err: Box) -> Self { + if let Ok(msg) = err.downcast::() { + Self::ThreadPanic(Some(*msg)) + } else if let Ok(msg) = err.downcast::<&'static str>() { + Self::ThreadPanic(Some(msg.to_string())) + } else { + Self::ThreadPanic(None) + } + } +} + +impl From for Error { + fn from(err: esplora_client::Error) -> Self { + Self::Client(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Client(err) => write!(f, "{err}"), + Self::ThreadPanic(Some(msg)) => write!(f, "worker thread panicked: {msg}"), + Self::ThreadPanic(None) => write!(f, "worker thread panicked"), + } + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Client(err) => Some(err), + _ => None, + } + } +} /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// @@ -241,15 +283,13 @@ fn chain_update( let mut tip = match point_of_agreement { Some(tip) => tip, None => { - return Err(Box::new(esplora_client::Error::HeaderHashNotFound( - local_cp_hash, - ))); + return Err(esplora_client::Error::HeaderHashNotFound(local_cp_hash).into()); } }; tip = tip .extend(conflicts.into_iter().rev().map(|b| (b.height, b.hash))) - .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + .map_err(|_| Error::from(esplora_client::Error::InvalidResponse))?; for (anchor, _) in anchors { let height = anchor.block_id.height; @@ -319,7 +359,7 @@ fn fetch_txs_with_keychain_spks if handles.is_empty() { if !processed_any { - return Err(Box::new(esplora_client::Error::InvalidResponse)); + return Err(esplora_client::Error::InvalidResponse.into()); } break; } @@ -327,7 +367,7 @@ fn fetch_txs_with_keychain_spks for handle in handles { let handle_result = handle .join() - .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + .map_err(Error::from_thread_panic)?; let (index, txs, evicted) = handle_result?; processed_any = true; if txs.is_empty() { @@ -411,7 +451,7 @@ fn fetch_txs_with_txids>( std::thread::spawn(move || { client .get_tx_info(&txid) - .map_err(Box::new) + .map_err(Error::from) .map(|t| (txid, t)) }) }) @@ -424,7 +464,7 @@ fn fetch_txs_with_txids>( for handle in handles { let handle_result = handle .join() - .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + .map_err(Error::from_thread_panic)?; let (txid, tx_info) = handle_result?; if let Some(tx_info) = tx_info { if inserted_txs.insert(txid) { @@ -476,7 +516,7 @@ fn fetch_txs_with_outpoints>( std::thread::spawn(move || { client .get_output_status(&op.txid, op.vout as _) - .map_err(Box::new) + .map_err(Error::from) }) }) .collect::, Error>>>>(); @@ -488,7 +528,7 @@ fn fetch_txs_with_outpoints>( for handle in handles { let handle_result = handle .join() - .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; + .map_err(Error::from_thread_panic)?; if let Some(op_status) = handle_result? { let spend_txid = match op_status.txid { Some(txid) => txid, @@ -549,14 +589,13 @@ mod test { let res = (|| -> Result<(), Error> { let handle_result = handle .join() - .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; - handle_result?; - Ok(()) + .map_err(Error::from_thread_panic)?; + handle_result })(); assert!(matches!( - *res.unwrap_err(), - esplora_client::Error::InvalidResponse + res.unwrap_err(), + Error::ThreadPanic(_) )); } @@ -564,9 +603,9 @@ mod test { fn ensure_last_index_none_returns_error() { let last_index: Option = None; let err = last_index - .ok_or_else(|| Box::new(esplora_client::Error::InvalidResponse)) + .ok_or_else(|| Error::from(esplora_client::Error::InvalidResponse)) .unwrap_err(); - assert!(matches!(*err, esplora_client::Error::InvalidResponse)); + assert!(matches!(err, Error::Client(esplora_client::Error::InvalidResponse))); } macro_rules! local_chain {