From 9f3bc76a33c3617ed60cc5f13cbbb8d57aff8d6a Mon Sep 17 00:00:00 2001 From: Davidson Souza Date: Fri, 6 Feb 2026 14:18:05 -0300 Subject: [PATCH] fix(electrum): fix a bug with broadcast In #698 a small bug was introduced where our node would stop responding during start-up under very specific conditions. If we broadcast a transaction and shut down floresta before it gets confirmed, then try to start floresta again, we would get stuck. This happens because electrum would try to use the node handle to broadcast a transaction, but since `FlorestaNode` wouldn't be running, the electrum server would never get an answer, blocking forever. This commit fixes this by moving the re-broadcast to the main loop. Now the re-broadcast code will try to broadcast again every 24 hours, until the tx gets confirmed. This makes sure our transaction won't fall off the mempool. It also only broadcasts after the node has left IBD, to avoid having a transaction that gets stuck inside our mempool due to it already being confirmed. I've also made all of ElectrumServer's fields private, there's no reason for them to be public. 
--- .../src/electrum_protocol.rs | 87 ++++++++++++++----- crates/floresta-node/src/florestad.rs | 5 +- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index 486c3b774..277930f36 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use std::time::Instant; use bitcoin::consensus::deserialize; use bitcoin::consensus::encode::serialize_hex; @@ -32,6 +33,7 @@ use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; use tokio_rustls::TlsAcceptor; +use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; @@ -40,6 +42,11 @@ use crate::get_arg; use crate::json_rpc_res; use crate::request::Request; +/// How often do we re-broadcast our transactions, until it gets confirmed +/// +/// One day, in seconds +const REBROADCAST_INTERVAL: u64 = 24 * 3600; + /// Type alias for u32 representing a ClientId type ClientId = u32; @@ -161,55 +168,61 @@ pub enum Message { pub struct ElectrumServer { /// The blockchain backend we are using. This will be used to query /// blockchain information and broadcast transactions. - pub chain: Arc, + chain: Arc, + /// The address cache is used to store addresses and transactions, like a /// watch-only wallet, but it is adapted to the electrum protocol. - pub address_cache: Arc>, + address_cache: Arc>, /// The clients are the clients connected to our server, we keep track of them /// using a unique id. - pub clients: HashMap>, + clients: HashMap>, + /// The message_receiver receive messages and handles them. 
- pub message_receiver: UnboundedReceiver, + message_receiver: UnboundedReceiver, + /// The message_transmitter is used to send requests from clients or notifications /// like new or dropped clients - pub message_transmitter: UnboundedSender, + message_transmitter: UnboundedSender, + /// The client_addresses is used to keep track of the addresses of each client. /// We keep the script_hash and which client has it, so we can notify the /// clients when a new transaction is received. - pub client_addresses: HashMap>, + client_addresses: HashMap>, + /// A Arc-ed copy of the block filters backend that we can use to check if a /// block contains a transaction that we are interested in. - pub block_filters: Option>>, + block_filters: Option>>, + /// An interface to a running node, used to broadcast transactions and request /// blocks. - pub node_interface: NodeInterface, + node_interface: NodeInterface, + /// A list of addresses that we've just learned about and need to rescan for /// transactions. /// /// We accumulate those addresses here and then periodically /// scan, since a wallet will often send multiple addresses, but /// in different requests. - pub addresses_to_scan: Vec, + addresses_to_scan: Vec, + + /// Last time we've re-broadcasted our transactions. We want to do this every 24 hours, to make + /// sure our transactions don't get stuck in the mempool if they are not getting confirmed for + /// some reason. We keep track of this time to know when to re-broadcast them. + last_rebroadcast: Option, } impl ElectrumServer { - pub async fn new( + pub fn new( address_cache: Arc>, chain: Arc, block_filters: Option>>, node_interface: NodeInterface, ) -> Result, Box> { let (tx, rx) = unbounded_channel(); - let unconfirmed = address_cache.find_unconfirmed().unwrap(); - for tx in unconfirmed { - let txid = tx.compute_txid(); - if let Err(e) = node_interface.broadcast_transaction(tx).await? 
{ - error!("Could not re-broadcast tx: {txid} due to {e}"); - } - } Ok(ElectrumServer { + last_rebroadcast: None, chain, address_cache, block_filters, @@ -222,6 +235,11 @@ impl ElectrumServer { }) } + /// Notifier to send messages to the main loop + pub fn get_notifier(&self) -> UnboundedSender { + self.message_transmitter.clone() + } + /// Handle a request from a client. All methods are defined in the electrum /// protocol. async fn handle_client_request( @@ -517,6 +535,18 @@ impl ElectrumServer { } } + pub async fn rebroadcast_mempool_transactions(&self) { + let unconfirmed = self.address_cache.find_unconfirmed().unwrap(); + for tx in unconfirmed { + let txid = tx.compute_txid(); + if let Ok(Err(e)) = self.node_interface.broadcast_transaction(tx.clone()).await { + error!("Could not rebroadcast transaction {txid} due to {e}"); + } else { + debug!("Rebroadcasted transaction {txid}"); + } + } + } + pub async fn main_loop(mut self) -> Result<(), crate::error::Error> { let blocks = Channel::new(); let blocks = Arc::new(blocks); @@ -527,6 +557,7 @@ impl ElectrumServer { for (block, height) in blocks.recv() { self.handle_block(block, height); } + // handles client requests while let Ok(request) = tokio::time::timeout( std::time::Duration::from_secs(1), @@ -539,6 +570,24 @@ impl ElectrumServer { } } + // Handle transaction rebroadcast + let should_rebroadcast = self + .last_rebroadcast + .map(|last| last.elapsed() > std::time::Duration::from_secs(REBROADCAST_INTERVAL)) + .unwrap_or(true); + + if should_rebroadcast { + // Don't rebroadcast if we're in IBD, since our transactions might already be in + // the blocks we're downloading, so rebroadcasting them would be redundant and + // could even cause issues with some nodes. 
+ if self.chain.is_in_ibd() { + continue; + } + + self.rebroadcast_mempool_transactions().await; + self.last_rebroadcast = Some(Instant::now()); + } + // rescan for new addresses, if any if !self.addresses_to_scan.is_empty() { if self.chain.is_in_ibd() { @@ -1067,9 +1116,7 @@ mod test { let tls_acceptor = tls_config.map(TlsAcceptor::from); let electrum_server: ElectrumServer> = - ElectrumServer::new(wallet, chain, None, node_interface) - .await - .unwrap(); + ElectrumServer::new(wallet, chain, None, node_interface).unwrap(); let non_tls_listener = Arc::new(TcpListener::bind(e_addr).await.unwrap()); let assigned_port = non_tls_listener.local_addr().unwrap().port(); diff --git a/crates/floresta-node/src/florestad.rs b/crates/floresta-node/src/florestad.rs index 351674019..c0836a7fe 100644 --- a/crates/floresta-node/src/florestad.rs +++ b/crates/floresta-node/src/florestad.rs @@ -499,7 +499,6 @@ impl Florestad { cfilters, chain_provider.get_handle(), ) - .await .map_err(FlorestadError::CouldNotCreateElectrumServer)?; // Default Electrum Server port. @@ -526,7 +525,7 @@ impl Florestad { task::spawn(client_accept_loop( non_tls_listener, - electrum_server.message_transmitter.clone(), + electrum_server.get_notifier(), None, )); info!("Electrum Server is running at {electrum_addr}"); @@ -591,7 +590,7 @@ impl Florestad { let tls_acceptor: TlsAcceptor = TlsAcceptor::from(tls_config); task::spawn(client_accept_loop( tls_listener, - electrum_server.message_transmitter.clone(), + electrum_server.get_notifier(), Some(tls_acceptor), )); info!("Electrum TLS Server is running at {electrum_addr_tls}");