diff --git a/crates/floresta-electrum/src/electrum_protocol.rs b/crates/floresta-electrum/src/electrum_protocol.rs index 486c3b774..277930f36 100644 --- a/crates/floresta-electrum/src/electrum_protocol.rs +++ b/crates/floresta-electrum/src/electrum_protocol.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use std::time::Instant; use bitcoin::consensus::deserialize; use bitcoin::consensus::encode::serialize_hex; @@ -32,6 +33,7 @@ use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; use tokio_rustls::TlsAcceptor; +use tracing::debug; use tracing::error; use tracing::info; use tracing::trace; @@ -40,6 +42,11 @@ use crate::get_arg; use crate::json_rpc_res; use crate::request::Request; +/// How often do we re-broadcast our transactions, until it gets confirmed +/// +/// One day, in seconds +const REBROADCAST_INTERVAL: u64 = 24 * 3600; + /// Type alias for u32 representing a ClientId type ClientId = u32; @@ -161,55 +168,61 @@ pub enum Message { pub struct ElectrumServer { /// The blockchain backend we are using. This will be used to query /// blockchain information and broadcast transactions. - pub chain: Arc, + chain: Arc, + /// The address cache is used to store addresses and transactions, like a /// watch-only wallet, but it is adapted to the electrum protocol. - pub address_cache: Arc>, + address_cache: Arc>, /// The clients are the clients connected to our server, we keep track of them /// using a unique id. - pub clients: HashMap>, + clients: HashMap>, + /// The message_receiver receive messages and handles them. - pub message_receiver: UnboundedReceiver, + message_receiver: UnboundedReceiver, + /// The message_transmitter is used to send requests from clients or notifications /// like new or dropped clients - pub message_transmitter: UnboundedSender, + message_transmitter: UnboundedSender, + /// The client_addresses is used to keep track of the addresses of each client. /// We keep the script_hash and which client has it, so we can notify the /// clients when a new transaction is received. - pub client_addresses: HashMap>, + client_addresses: HashMap>, + /// A Arc-ed copy of the block filters backend that we can use to check if a /// block contains a transaction that we are interested in. - pub block_filters: Option>>, + block_filters: Option>>, + /// An interface to a running node, used to broadcast transactions and request /// blocks. - pub node_interface: NodeInterface, + node_interface: NodeInterface, + /// A list of addresses that we've just learned about and need to rescan for /// transactions. /// /// We accumulate those addresses here and then periodically /// scan, since a wallet will often send multiple addresses, but /// in different requests. - pub addresses_to_scan: Vec, + addresses_to_scan: Vec, + + /// Last time we've re-broadcasted our transactions. We want to do this every hour, to make + /// sure our transactions don't get stuck in the mempool if they are not getting confirmed for + /// some reason. We keep track of this time to know when to re-broadcast them. + last_rebroadcast: Option, } impl ElectrumServer { - pub async fn new( + pub fn new( address_cache: Arc>, chain: Arc, block_filters: Option>>, node_interface: NodeInterface, ) -> Result, Box> { let (tx, rx) = unbounded_channel(); - let unconfirmed = address_cache.find_unconfirmed().unwrap(); - for tx in unconfirmed { - let txid = tx.compute_txid(); - if let Err(e) = node_interface.broadcast_transaction(tx).await? { - error!("Could not re-broadcast tx: {txid} due to {e}"); - } - } Ok(ElectrumServer { + last_rebroadcast: None, chain, address_cache, block_filters, @@ -222,6 +235,11 @@ impl ElectrumServer { }) } + /// Notifier to send messages to the main loop + pub fn get_notifier(&self) -> UnboundedSender { + self.message_transmitter.clone() + } + /// Handle a request from a client. All methods are defined in the electrum /// protocol. async fn handle_client_request( @@ -517,6 +535,18 @@ impl ElectrumServer { } } + pub async fn rebroadcast_mempool_transactions(&self) { + let unconfirmed = self.address_cache.find_unconfirmed().unwrap(); + for tx in unconfirmed { + let txid = tx.compute_txid(); + if let Ok(Err(e)) = self.node_interface.broadcast_transaction(tx.clone()).await { + error!("Could not rebroadcast transaction {txid} due to {e}"); + } else { + debug!("Rebroadcasted transaction {txid}"); + } + } + } + pub async fn main_loop(mut self) -> Result<(), crate::error::Error> { let blocks = Channel::new(); let blocks = Arc::new(blocks); @@ -527,6 +557,7 @@ impl ElectrumServer { for (block, height) in blocks.recv() { self.handle_block(block, height); } + // handles client requests while let Ok(request) = tokio::time::timeout( std::time::Duration::from_secs(1), @@ -539,6 +570,24 @@ impl ElectrumServer { } } + // Handle transaction rebroadcast + let should_rebroadcast = self + .last_rebroadcast + .map(|last| last.elapsed() > std::time::Duration::from_secs(REBROADCAST_INTERVAL)) + .unwrap_or(true); + + if should_rebroadcast { + // Don't rebroadcast if we're in IBD, since our transactions might already be in + // the blocks we're downloading, so rebroadcasting them would be redundant and + // could even cause issues with some nodes. + if self.chain.is_in_ibd() { + continue; + } + + self.rebroadcast_mempool_transactions().await; + self.last_rebroadcast = Some(Instant::now()); + } + // rescan for new addresses, if any if !self.addresses_to_scan.is_empty() { if self.chain.is_in_ibd() { @@ -1067,9 +1116,7 @@ mod test { let tls_acceptor = tls_config.map(TlsAcceptor::from); let electrum_server: ElectrumServer> = - ElectrumServer::new(wallet, chain, None, node_interface) - .await - .unwrap(); + ElectrumServer::new(wallet, chain, None, node_interface).unwrap(); let non_tls_listener = Arc::new(TcpListener::bind(e_addr).await.unwrap()); let assigned_port = non_tls_listener.local_addr().unwrap().port(); diff --git a/crates/floresta-node/src/florestad.rs b/crates/floresta-node/src/florestad.rs index 351674019..c0836a7fe 100644 --- a/crates/floresta-node/src/florestad.rs +++ b/crates/floresta-node/src/florestad.rs @@ -499,7 +499,6 @@ impl Florestad { cfilters, chain_provider.get_handle(), ) - .await .map_err(FlorestadError::CouldNotCreateElectrumServer)?; // Default Electrum Server port. @@ -526,7 +525,7 @@ impl Florestad { task::spawn(client_accept_loop( non_tls_listener, - electrum_server.message_transmitter.clone(), + electrum_server.get_notifier(), None, )); info!("Electrum Server is running at {electrum_addr}"); @@ -591,7 +590,7 @@ impl Florestad { let tls_acceptor: TlsAcceptor = TlsAcceptor::from(tls_config); task::spawn(client_accept_loop( tls_listener, - electrum_server.message_transmitter.clone(), + electrum_server.get_notifier(), Some(tls_acceptor), )); info!("Electrum TLS Server is running at {electrum_addr_tls}");