Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 67 additions & 20 deletions crates/floresta-electrum/src/electrum_protocol.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use bitcoin::consensus::deserialize;
use bitcoin::consensus::encode::serialize_hex;
Expand Down Expand Up @@ -32,6 +33,7 @@ use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio_rustls::TlsAcceptor;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
Expand All @@ -40,6 +42,11 @@ use crate::get_arg;
use crate::json_rpc_res;
use crate::request::Request;

/// How often do we re-broadcast our transactions, until it gets confirmed
///
/// One day, in seconds
const REBROADCAST_INTERVAL: u64 = 24 * 3600;

/// Type alias for u32 representing a ClientId
type ClientId = u32;

Expand Down Expand Up @@ -161,55 +168,61 @@ pub enum Message {
pub struct ElectrumServer<Blockchain: BlockchainInterface> {
/// The blockchain backend we are using. This will be used to query
/// blockchain information and broadcast transactions.
pub chain: Arc<Blockchain>,
chain: Arc<Blockchain>,

/// The address cache is used to store addresses and transactions, like a
/// watch-only wallet, but it is adapted to the electrum protocol.
pub address_cache: Arc<AddressCache<KvDatabase>>,
address_cache: Arc<AddressCache<KvDatabase>>,

/// The clients are the clients connected to our server, we keep track of them
/// using a unique id.
pub clients: HashMap<ClientId, Arc<Client>>,
clients: HashMap<ClientId, Arc<Client>>,

/// The message_receiver receive messages and handles them.
pub message_receiver: UnboundedReceiver<Message>,
message_receiver: UnboundedReceiver<Message>,

/// The message_transmitter is used to send requests from clients or notifications
/// like new or dropped clients
pub message_transmitter: UnboundedSender<Message>,
message_transmitter: UnboundedSender<Message>,

/// The client_addresses is used to keep track of the addresses of each client.
/// We keep the script_hash and which client has it, so we can notify the
/// clients when a new transaction is received.
pub client_addresses: HashMap<sha256::Hash, Arc<Client>>,
client_addresses: HashMap<sha256::Hash, Arc<Client>>,

/// A Arc-ed copy of the block filters backend that we can use to check if a
/// block contains a transaction that we are interested in.
pub block_filters: Option<Arc<NetworkFilters<FlatFiltersStore>>>,
block_filters: Option<Arc<NetworkFilters<FlatFiltersStore>>>,

/// An interface to a running node, used to broadcast transactions and request
/// blocks.
pub node_interface: NodeInterface,
node_interface: NodeInterface,

/// A list of addresses that we've just learned about and need to rescan for
/// transactions.
///
/// We accumulate those addresses here and then periodically
/// scan, since a wallet will often send multiple addresses, but
/// in different requests.
pub addresses_to_scan: Vec<ScriptBuf>,
addresses_to_scan: Vec<ScriptBuf>,

/// Last time we've re-broadcasted our transactions. We want to do this every hour, to make
/// sure our transactions don't get stuck in the mempool if they are not getting confirmed for
/// some reason. We keep track of this time to know when to re-broadcast them.
last_rebroadcast: Option<Instant>,
}

impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
pub async fn new(
pub fn new(
address_cache: Arc<AddressCache<KvDatabase>>,
chain: Arc<Blockchain>,
block_filters: Option<Arc<NetworkFilters<FlatFiltersStore>>>,
node_interface: NodeInterface,
) -> Result<ElectrumServer<Blockchain>, Box<dyn std::error::Error>> {
let (tx, rx) = unbounded_channel();
let unconfirmed = address_cache.find_unconfirmed().unwrap();
for tx in unconfirmed {
let txid = tx.compute_txid();
if let Err(e) = node_interface.broadcast_transaction(tx).await? {
error!("Could not re-broadcast tx: {txid} due to {e}");
}
}

Ok(ElectrumServer {
last_rebroadcast: None,
chain,
address_cache,
block_filters,
Expand All @@ -222,6 +235,11 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
})
}

/// Notifier to send messages to the main loop
pub fn get_notifier(&self) -> UnboundedSender<Message> {
self.message_transmitter.clone()
}

/// Handle a request from a client. All methods are defined in the electrum
/// protocol.
async fn handle_client_request(
Expand Down Expand Up @@ -517,6 +535,18 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
}

pub async fn rebroadcast_mempool_transactions(&self) {
let unconfirmed = self.address_cache.find_unconfirmed().unwrap();
for tx in unconfirmed {
let txid = tx.compute_txid();
if let Ok(Err(e)) = self.node_interface.broadcast_transaction(tx.clone()).await {
error!("Could not rebroadcast transaction {txid} due to {e}");
} else {
debug!("Rebroadcasted transaction {txid}");
}
}
}

pub async fn main_loop(mut self) -> Result<(), crate::error::Error> {
let blocks = Channel::new();
let blocks = Arc::new(blocks);
Expand All @@ -527,6 +557,7 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
for (block, height) in blocks.recv() {
self.handle_block(block, height);
}

// handles client requests
while let Ok(request) = tokio::time::timeout(
std::time::Duration::from_secs(1),
Expand All @@ -539,6 +570,24 @@ impl<Blockchain: BlockchainInterface> ElectrumServer<Blockchain> {
}
}

// Handle transaction rebroadcast
let should_rebroadcast = self
.last_rebroadcast
.map(|last| last.elapsed() > std::time::Duration::from_secs(REBROADCAST_INTERVAL))
.unwrap_or(true);

if should_rebroadcast {
// Don't rebroadcast if we're in IBD, since our transactions might already be in
// the blocks we're downloading, so rebroadcasting them would be redundant and
// could even cause issues with some nodes.
if self.chain.is_in_ibd() {
continue;
}
Comment on lines +574 to +585
Copy link
Member

@luisschwab luisschwab Feb 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Combine the should_rebroadcast and self.chain.is_in_ibd checks to the same expression


self.rebroadcast_mempool_transactions().await;
self.last_rebroadcast = Some(Instant::now());
}

// rescan for new addresses, if any
if !self.addresses_to_scan.is_empty() {
if self.chain.is_in_ibd() {
Expand Down Expand Up @@ -1067,9 +1116,7 @@ mod test {
let tls_acceptor = tls_config.map(TlsAcceptor::from);

let electrum_server: ElectrumServer<ChainState<FlatChainStore>> =
ElectrumServer::new(wallet, chain, None, node_interface)
.await
.unwrap();
ElectrumServer::new(wallet, chain, None, node_interface).unwrap();
let non_tls_listener = Arc::new(TcpListener::bind(e_addr).await.unwrap());
let assigned_port = non_tls_listener.local_addr().unwrap().port();

Expand Down
5 changes: 2 additions & 3 deletions crates/floresta-node/src/florestad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,6 @@ impl Florestad {
cfilters,
chain_provider.get_handle(),
)
.await
.map_err(FlorestadError::CouldNotCreateElectrumServer)?;

// Default Electrum Server port.
Expand All @@ -526,7 +525,7 @@ impl Florestad {

task::spawn(client_accept_loop(
non_tls_listener,
electrum_server.message_transmitter.clone(),
electrum_server.get_notifier(),
None,
));
info!("Electrum Server is running at {electrum_addr}");
Expand Down Expand Up @@ -591,7 +590,7 @@ impl Florestad {
let tls_acceptor: TlsAcceptor = TlsAcceptor::from(tls_config);
task::spawn(client_accept_loop(
tls_listener,
electrum_server.message_transmitter.clone(),
electrum_server.get_notifier(),
Some(tls_acceptor),
));
info!("Electrum TLS Server is running at {electrum_addr_tls}");
Expand Down
Loading