diff --git a/Cargo.lock b/Cargo.lock index aa9771fd8b..5064c6de45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,6 +1624,20 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -7096,6 +7110,7 @@ dependencies = [ "console_error_panic_hook", "crdts", "custom_debug", + "dashmap", "dirs-next", "eyre", "futures", diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index d570799c5d..a21f64c94b 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -352,9 +352,14 @@ impl SpendDagDb { })); } else if let Some(sender) = spend_processing.clone() { let (reattempt_addrs, fetched_addrs, addrs_for_further_track) = client - .crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS) + .crawl_to_next_utxos( + addrs_to_get.clone(), + sender.clone(), + *UTXO_REATTEMPT_SECONDS, + ) .await; + addrs_to_get.clear(); let mut utxo_addresses = self.utxo_addresses.write().await; for addr in fetched_addrs { let _ = utxo_addresses.remove(&addr); diff --git a/sn_client/Cargo.toml b/sn_client/Cargo.toml index 92c944bb87..3c68c6e66c 100644 --- a/sn_client/Cargo.toml +++ b/sn_client/Cargo.toml @@ -38,6 +38,7 @@ bls = { package = "blsttc", version = "8.0.1" } bytes = { version = "1.0.1", features = ["serde"] } crdts = "7.3.2" custom_debug = "~0.6.1" +dashmap = "~6.1.0" futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.12.1" diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 7816eb2806..fa00a5078f 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -8,7 +8,11 @@ use crate::{Client, Error, SpendDag}; -use futures::{future::join_all, StreamExt}; +use dashmap::DashMap; +use futures::{ + future::join_all, + stream::{self, StreamExt}, +}; use sn_networking::{GetRecordError, NetworkError}; use sn_transfers::{ NanoTokens, SignedSpend, SpendAddress, SpendReason, UniquePubkey, WalletError, WalletResult, @@ -16,9 +20,10 @@ use sn_transfers::{ }; use std::{ collections::{BTreeMap, BTreeSet}, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::mpsc::Sender, task::JoinSet}; +use tokio::sync::mpsc::Sender; const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; @@ -145,11 +150,12 @@ impl Client { /// Get spends from a set of given SpendAddresses /// Drain the addresses at the same layer first, then: - /// 1, return with UTXOs for re-attempt (with insertion time stamp) - /// 2, addrs_to_get to hold the addresses for further track + /// 1, return failed_utxos for re-attempt (with insertion time stamp) + /// 2, return fetched_address to avoid un-necessary re-attempts + /// 3, return addrs_for_further_track for further track pub async fn crawl_to_next_utxos( &self, - addrs_to_get: &mut BTreeMap, + addrs_to_get: BTreeMap, sender: Sender<(SignedSpend, u64, bool)>, reattempt_seconds: u64, ) -> ( @@ -157,96 +163,111 @@ impl Client { Vec, BTreeSet<(SpendAddress, NanoTokens)>, ) { - let mut failed_utxos = BTreeMap::new(); - let mut tasks = JoinSet::new(); - let mut addrs_for_further_track = BTreeSet::new(); - let mut fetched_addrs = Vec::new(); - - while !addrs_to_get.is_empty() || !tasks.is_empty() { - while tasks.len() < 32 && !addrs_to_get.is_empty() { - if let Some((addr, (failed_times, amount))) = addrs_to_get.pop_first() { - let client_clone = self.clone(); - let _ = tasks.spawn(async move { - ( - client_clone.crawl_spend(addr).await, - failed_times, - addr, - amount, - ) - }); - } - } - - if let Some(Ok((result, failed_times, address, amount))) = tasks.join_next().await { - match result { - InternalGetNetworkSpend::Spend(spend) => { - let for_further_track = beta_track_analyze_spend(&spend); - let _ = sender - .send((*spend, for_further_track.len() as u64, false)) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string())); - addrs_for_further_track.extend(for_further_track); - fetched_addrs.push(address); - } - InternalGetNetworkSpend::DoubleSpend(spends) => { - warn!( - "Detected burnt spend regarding {address:?} - {:?}", - spends.len() - ); - for (i, spend) in spends.iter().enumerate() { - let reason = spend.reason(); - let amount = spend.spend.amount(); - let ancestors_len = spend.spend.ancestors.len(); - let descendants_len = spend.spend.descendants.len(); - let roy_len = spend.spend.network_royalties().len(); + // max concurrency for the tasks of fetching records from network. + const MAX_CONCURRENT: usize = 64; + + let failed_utxos_arc: Arc> = Arc::new(DashMap::new()); + let addrs_for_further_track_arc: Arc> = Arc::new(DashMap::new()); + let fetched_addrs_arc: Arc> = Arc::new(DashMap::new()); + + stream::iter(addrs_to_get.into_iter()) + .map(|(addr, (failed_times, amount))| { + let client_clone = self.clone(); + let sender_clone = sender.clone(); + + let failed_utxos = Arc::clone(&failed_utxos_arc); + let addrs_for_further_track = Arc::clone(&addrs_for_further_track_arc); + let fetched_addrs = Arc::clone(&fetched_addrs_arc); + async move { + let result = client_clone.crawl_spend(addr).await; + + match result { + InternalGetNetworkSpend::Spend(spend) => { + let for_further_track = beta_track_analyze_spend(&spend); + let _ = sender_clone + .send((*spend, for_further_track.len() as u64, false)) + .await; + for entry in for_further_track { + let _ = addrs_for_further_track.insert(entry, ()); + } + fetched_addrs.insert(addr, ()); + } + InternalGetNetworkSpend::DoubleSpend(spends) => { warn!( - "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", - spend.spend.ancestors, spend.spend.descendants + "Detected burnt spend regarding {addr:?} - {:?}", + spends.len() ); - let for_further_track = beta_track_analyze_spend(spend); - addrs_for_further_track.extend(for_further_track); - - let _ = sender - .send((spend.clone(), 0, true)) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string())); + for (i, spend) in spends.into_iter().enumerate() { + let reason = spend.reason(); + let amount = spend.spend.amount(); + let ancestors_len = spend.spend.ancestors.len(); + let descendants_len = spend.spend.descendants.len(); + let roy_len = spend.spend.network_royalties().len(); + warn!("burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", + spend.spend.ancestors, spend.spend.descendants); + } + fetched_addrs.insert(addr, ()); + } + InternalGetNetworkSpend::NotFound => { + let reattempt_interval = if amount.as_nano() > 100000 { + info!("Not find spend of big-UTXO {addr:?} with {amount}"); + reattempt_seconds + } else { + reattempt_seconds * (failed_times * 8 + 1) + }; + failed_utxos.insert( + addr, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_interval), + amount, + ), + ); + } + InternalGetNetworkSpend::Error(e) => { + warn!("Fetching spend {addr:?} with {amount:?} result in error {e:?}"); + // Error of `NotEnoughCopies` could be re-attempted and succeed eventually. + failed_utxos.insert( + addr, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_seconds), + amount, + ), + ); } - fetched_addrs.push(address); - } - InternalGetNetworkSpend::NotFound => { - let reattempt_interval = if amount.as_nano() > 100000 { - info!("Not find spend of big-UTXO {address:?} with {amount}"); - reattempt_seconds - } else { - reattempt_seconds * (failed_times * 8 + 1) - }; - let _ = failed_utxos.insert( - address, - ( - failed_times + 1, - Instant::now() + Duration::from_secs(reattempt_interval), - amount, - ), - ); - } - InternalGetNetworkSpend::Error(e) => { - warn!("Fetching spend {address:?} with {amount:?} result in error {e:?}"); - // Error of `NotEnoughCopies` could be re-attempted and succeed eventually. - let _ = failed_utxos.insert( - address, - ( - failed_times + 1, - Instant::now() + Duration::from_secs(reattempt_seconds), - amount, - ), - ); } + + (addr, amount) } - } + }) + .buffer_unordered(MAX_CONCURRENT) + .for_each(|(address, amount)| async move { + info!("Completed fetching attempt of {address:?} with amount {amount:?}"); + }) + .await; + + let mut failed_utxos_result = BTreeMap::new(); + for entry in failed_utxos_arc.iter() { + let key = entry.key(); + let val = entry.value(); + let _ = failed_utxos_result.insert(*key, *val); + } + + let mut fetched_addrs = Vec::new(); + for entry in fetched_addrs_arc.iter() { + let key = entry.key(); + fetched_addrs.push(*key); + } + + let mut addrs_for_further_track = BTreeSet::new(); + for entry in addrs_for_further_track_arc.iter() { + let key = entry.key(); + let _ = addrs_for_further_track.insert(*key); } - (failed_utxos, fetched_addrs, addrs_for_further_track) + (failed_utxos_result, fetched_addrs, addrs_for_further_track) } /// Crawls the Spend Dag from a given SpendAddress recursively