From 0dd22467631bac4393c7f1cdcc4563c56db99487 Mon Sep 17 00:00:00 2001 From: qima Date: Wed, 4 Sep 2024 20:54:41 +0800 Subject: [PATCH 1/3] fix(auditor): not to re-attempt fetched spend --- sn_auditor/src/dag_db.rs | 14 +++++++++++--- sn_client/src/audit/dag_crawling.rs | 7 ++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index a1bb786010..d570799c5d 100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -303,6 +303,7 @@ impl SpendDagDb { }; let mut addrs_to_get = BTreeMap::new(); + let mut addrs_fetched = BTreeSet::new(); loop { // get expired utxos for re-attempt fetch @@ -350,16 +351,23 @@ impl SpendDagDb { ) })); } else if let Some(sender) = spend_processing.clone() { - let (reattempt_addrs, fetched_addrs) = client + let (reattempt_addrs, fetched_addrs, addrs_for_further_track) = client .crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS) .await; + let mut utxo_addresses = self.utxo_addresses.write().await; - for addr in fetched_addrs.iter() { - let _ = utxo_addresses.remove(addr); + for addr in fetched_addrs { + let _ = utxo_addresses.remove(&addr); + let _ = addrs_fetched.insert(addr); } for (addr, tuple) in reattempt_addrs { let _ = utxo_addresses.insert(addr, tuple); } + for (addr, amount) in addrs_for_further_track { + if !addrs_fetched.contains(&addr) { + let _ = addrs_to_get.entry(addr).or_insert((0, amount)); + } + } } else { panic!("There is no point in running the auditor if we are not collecting the DAG or collecting data through crawling. Please enable the `dag-collection` feature or provide beta program related arguments."); }; diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index e29760858e..7816eb2806 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -155,6 +155,7 @@ impl Client { ) -> ( BTreeMap, Vec, + BTreeSet<(SpendAddress, NanoTokens)>, ) { let mut failed_utxos = BTreeMap::new(); let mut tasks = JoinSet::new(); @@ -245,11 +246,7 @@ impl Client { } } - for (addr, amount) in addrs_for_further_track { - let _ = addrs_to_get.entry(addr).or_insert((0, amount)); - } - - (failed_utxos, fetched_addrs) + (failed_utxos, fetched_addrs, addrs_for_further_track) } /// Crawls the Spend Dag from a given SpendAddress recursively From 42404fec200f0b78badccddd8735b4d28e2df2c8 Mon Sep 17 00:00:00 2001 From: qima Date: Fri, 6 Sep 2024 21:41:33 +0800 Subject: [PATCH 2/3] fix(auditor): use DashMap and stream for better threading --- Cargo.lock | 15 +++ sn_auditor/src/dag_db.rs | 7 +- sn_client/Cargo.toml | 1 + sn_client/src/audit/dag_crawling.rs | 195 +++++++++++++++------------- 4 files changed, 130 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index aa9771fd8b..5064c6de45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,6 +1624,20 @@ dependencies = [ "syn 2.0.79", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -7096,6 +7110,7 @@ dependencies = [ "console_error_panic_hook", "crdts", "custom_debug", + "dashmap", "dirs-next", "eyre", "futures", diff --git a/sn_auditor/src/dag_db.rs b/sn_auditor/src/dag_db.rs index d570799c5d..a21f64c94b 
100644 --- a/sn_auditor/src/dag_db.rs +++ b/sn_auditor/src/dag_db.rs @@ -352,9 +352,14 @@ impl SpendDagDb { })); } else if let Some(sender) = spend_processing.clone() { let (reattempt_addrs, fetched_addrs, addrs_for_further_track) = client - .crawl_to_next_utxos(&mut addrs_to_get, sender.clone(), *UTXO_REATTEMPT_SECONDS) + .crawl_to_next_utxos( + addrs_to_get.clone(), + sender.clone(), + *UTXO_REATTEMPT_SECONDS, + ) .await; + addrs_to_get.clear(); let mut utxo_addresses = self.utxo_addresses.write().await; for addr in fetched_addrs { let _ = utxo_addresses.remove(&addr); diff --git a/sn_client/Cargo.toml b/sn_client/Cargo.toml index 92c944bb87..3c68c6e66c 100644 --- a/sn_client/Cargo.toml +++ b/sn_client/Cargo.toml @@ -38,6 +38,7 @@ bls = { package = "blsttc", version = "8.0.1" } bytes = { version = "1.0.1", features = ["serde"] } crdts = "7.3.2" custom_debug = "~0.6.1" +dashmap = "~6.1.0" futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.12.1" diff --git a/sn_client/src/audit/dag_crawling.rs b/sn_client/src/audit/dag_crawling.rs index 7816eb2806..fa00a5078f 100644 --- a/sn_client/src/audit/dag_crawling.rs +++ b/sn_client/src/audit/dag_crawling.rs @@ -8,7 +8,11 @@ use crate::{Client, Error, SpendDag}; -use futures::{future::join_all, StreamExt}; +use dashmap::DashMap; +use futures::{ + future::join_all, + stream::{self, StreamExt}, +}; use sn_networking::{GetRecordError, NetworkError}; use sn_transfers::{ NanoTokens, SignedSpend, SpendAddress, SpendReason, UniquePubkey, WalletError, WalletResult, @@ -16,9 +20,10 @@ use sn_transfers::{ }; use std::{ collections::{BTreeMap, BTreeSet}, + sync::Arc, time::{Duration, Instant}, }; -use tokio::{sync::mpsc::Sender, task::JoinSet}; +use tokio::sync::mpsc::Sender; const SPENDS_PROCESSING_BUFFER_SIZE: usize = 4096; @@ -145,11 +150,12 @@ impl Client { /// Get spends from a set of given SpendAddresses /// Drain the addresses at the same layer first, then: - /// 1, return with UTXOs for re-attempt (with insertion time stamp) - /// 2, addrs_to_get to hold the addresses for further track + /// 1, return failed_utxos for re-attempt (with insertion time stamp) + /// 2, return fetched_address to avoid un-necessary re-attempts + /// 3, return addrs_for_further_track for further track pub async fn crawl_to_next_utxos( &self, - addrs_to_get: &mut BTreeMap, + addrs_to_get: BTreeMap, sender: Sender<(SignedSpend, u64, bool)>, reattempt_seconds: u64, ) -> ( @@ -157,96 +163,111 @@ impl Client { Vec, BTreeSet<(SpendAddress, NanoTokens)>, ) { - let mut failed_utxos = BTreeMap::new(); - let mut tasks = JoinSet::new(); - let mut addrs_for_further_track = BTreeSet::new(); - let mut fetched_addrs = Vec::new(); - - while !addrs_to_get.is_empty() || !tasks.is_empty() { - while tasks.len() < 32 && !addrs_to_get.is_empty() { - if let Some((addr, (failed_times, amount))) = addrs_to_get.pop_first() { - let client_clone = self.clone(); - let _ = tasks.spawn(async move { - ( - client_clone.crawl_spend(addr).await, - failed_times, - addr, - amount, - ) - }); - } - } - - if let Some(Ok((result, failed_times, address, amount))) = tasks.join_next().await { - match result { - InternalGetNetworkSpend::Spend(spend) => { - let for_further_track = beta_track_analyze_spend(&spend); - let _ = sender - .send((*spend, for_further_track.len() as u64, false)) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string())); - addrs_for_further_track.extend(for_further_track); - fetched_addrs.push(address); - } - InternalGetNetworkSpend::DoubleSpend(spends) => { - warn!( - "Detected burnt 
spend regarding {address:?} - {:?}", - spends.len() - ); - for (i, spend) in spends.iter().enumerate() { - let reason = spend.reason(); - let amount = spend.spend.amount(); - let ancestors_len = spend.spend.ancestors.len(); - let descendants_len = spend.spend.descendants.len(); - let roy_len = spend.spend.network_royalties().len(); + // max concurrency for the tasks of fetching records from network. + const MAX_CONCURRENT: usize = 64; + + let failed_utxos_arc: Arc> = Arc::new(DashMap::new()); + let addrs_for_further_track_arc: Arc> = Arc::new(DashMap::new()); + let fetched_addrs_arc: Arc> = Arc::new(DashMap::new()); + + stream::iter(addrs_to_get.into_iter()) + .map(|(addr, (failed_times, amount))| { + let client_clone = self.clone(); + let sender_clone = sender.clone(); + + let failed_utxos = Arc::clone(&failed_utxos_arc); + let addrs_for_further_track = Arc::clone(&addrs_for_further_track_arc); + let fetched_addrs = Arc::clone(&fetched_addrs_arc); + async move { + let result = client_clone.crawl_spend(addr).await; + + match result { + InternalGetNetworkSpend::Spend(spend) => { + let for_further_track = beta_track_analyze_spend(&spend); + let _ = sender_clone + .send((*spend, for_further_track.len() as u64, false)) + .await; + for entry in for_further_track { + let _ = addrs_for_further_track.insert(entry, ()); + } + fetched_addrs.insert(addr, ()); + } + InternalGetNetworkSpend::DoubleSpend(spends) => { warn!( - "burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", - spend.spend.ancestors, spend.spend.descendants + "Detected burnt spend regarding {addr:?} - {:?}", + spends.len() ); - let for_further_track = beta_track_analyze_spend(spend); - addrs_for_further_track.extend(for_further_track); - - let _ = sender - .send((spend.clone(), 0, true)) - .await - .map_err(|e| WalletError::SpendProcessing(e.to_string())); + for (i, spend) in spends.into_iter().enumerate() { + let reason = spend.reason(); + let amount = spend.spend.amount(); + let ancestors_len = spend.spend.ancestors.len(); + let descendants_len = spend.spend.descendants.len(); + let roy_len = spend.spend.network_royalties().len(); + warn!("burnt spend entry {i} reason {reason:?}, amount {amount}, ancestors: {ancestors_len}, descendants: {descendants_len}, royalties: {roy_len}, {:?} - {:?}", + spend.spend.ancestors, spend.spend.descendants); + } + fetched_addrs.insert(addr, ()); + } + InternalGetNetworkSpend::NotFound => { + let reattempt_interval = if amount.as_nano() > 100000 { + info!("Not find spend of big-UTXO {addr:?} with {amount}"); + reattempt_seconds + } else { + reattempt_seconds * (failed_times * 8 + 1) + }; + failed_utxos.insert( + addr, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_interval), + amount, + ), + ); + } + InternalGetNetworkSpend::Error(e) => { + warn!("Fetching spend {addr:?} with {amount:?} result in error {e:?}"); + // Error of `NotEnoughCopies` could be re-attempted and succeed eventually. 
+ failed_utxos.insert( + addr, + ( + failed_times + 1, + Instant::now() + Duration::from_secs(reattempt_seconds), + amount, + ), + ); } - fetched_addrs.push(address); - } - InternalGetNetworkSpend::NotFound => { - let reattempt_interval = if amount.as_nano() > 100000 { - info!("Not find spend of big-UTXO {address:?} with {amount}"); - reattempt_seconds - } else { - reattempt_seconds * (failed_times * 8 + 1) - }; - let _ = failed_utxos.insert( - address, - ( - failed_times + 1, - Instant::now() + Duration::from_secs(reattempt_interval), - amount, - ), - ); - } - InternalGetNetworkSpend::Error(e) => { - warn!("Fetching spend {address:?} with {amount:?} result in error {e:?}"); - // Error of `NotEnoughCopies` could be re-attempted and succeed eventually. - let _ = failed_utxos.insert( - address, - ( - failed_times + 1, - Instant::now() + Duration::from_secs(reattempt_seconds), - amount, - ), - ); } + + (addr, amount) } - } + }) + .buffer_unordered(MAX_CONCURRENT) + .for_each(|(address, amount)| async move { + info!("Completed fetching attempt of {address:?} with amount {amount:?}"); + }) + .await; + + let mut failed_utxos_result = BTreeMap::new(); + for entry in failed_utxos_arc.iter() { + let key = entry.key(); + let val = entry.value(); + let _ = failed_utxos_result.insert(*key, *val); + } + + let mut fetched_addrs = Vec::new(); + for entry in fetched_addrs_arc.iter() { + let key = entry.key(); + fetched_addrs.push(*key); + } + + let mut addrs_for_further_track = BTreeSet::new(); + for entry in addrs_for_further_track_arc.iter() { + let key = entry.key(); + let _ = addrs_for_further_track.insert(*key); } - (failed_utxos, fetched_addrs, addrs_for_further_track) + (failed_utxos_result, fetched_addrs, addrs_for_further_track) } /// Crawls the Spend Dag from a given SpendAddress recursively From 89760a2b420b8fadd451ed3a2a170312eef91742 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 8 Oct 2024 23:22:47 +0800 Subject: [PATCH 3/3] chore(release): stable release 2024.10.2.3 --- CHANGELOG.md | 9 +++++++++ Cargo.lock | 4 ++-- autonomi/Cargo.toml | 2 +- release-cycle-info | 2 +- sn_auditor/Cargo.toml | 4 ++-- sn_cli/Cargo.toml | 4 ++-- sn_client/Cargo.toml | 2 +- sn_faucet/Cargo.toml | 2 +- sn_node/Cargo.toml | 2 +- sn_node_rpc_client/Cargo.toml | 2 +- 10 files changed, 21 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9da0b70553..810a8dc264 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 *When editing this file, please respect a line length of 100.* +## 2024-10-08 + +### Network + +#### Changed + +- Optimize auditor tracking by not to re-attempt fetched spend. +- Optimize auditor tracking function by using DashMap and stream. 
+ ## 2024-10-07 ### Network diff --git a/Cargo.lock b/Cargo.lock index 5064c6de45..f35188f13d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7016,7 +7016,7 @@ dependencies = [ [[package]] name = "sn_auditor" -version = "0.3.4" +version = "0.3.5" dependencies = [ "blsttc", "clap", @@ -7099,7 +7099,7 @@ dependencies = [ [[package]] name = "sn_client" -version = "0.110.3" +version = "0.110.4" dependencies = [ "assert_matches", "async-trait", diff --git a/autonomi/Cargo.toml b/autonomi/Cargo.toml index 75b11ac4e9..3c3bef4c6d 100644 --- a/autonomi/Cargo.toml +++ b/autonomi/Cargo.toml @@ -28,7 +28,7 @@ rand = "0.8.5" rmp-serde = "1.1.1" self_encryption = "~0.30.0" serde = { version = "1.0.133", features = ["derive", "rc"] } -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_protocol = { version = "0.17.11", path = "../sn_protocol" } sn_registers = { path = "../sn_registers", version = "0.3.21" } sn_transfers = { path = "../sn_transfers", version = "0.19.3" } diff --git a/release-cycle-info b/release-cycle-info index 2d6f60ca88..2b83422132 100644 --- a/release-cycle-info +++ b/release-cycle-info @@ -15,4 +15,4 @@ release-year: 2024 release-month: 10 release-cycle: 2 -release-cycle-counter: 2 +release-cycle-counter: 3 diff --git a/sn_auditor/Cargo.toml b/sn_auditor/Cargo.toml index 6251404483..675ab2fcd7 100644 --- a/sn_auditor/Cargo.toml +++ b/sn_auditor/Cargo.toml @@ -2,7 +2,7 @@ authors = ["MaidSafe Developers "] description = "Safe Network Auditor" name = "sn_auditor" -version = "0.3.4" +version = "0.3.5" edition = "2021" homepage = "https://maidsafe.net" repository = "https://github.com/maidsafe/safe_network" @@ -31,7 +31,7 @@ graphviz-rust = { version = "0.9.0", optional = true } lazy_static = "1.4.0" serde = { version = "1.0.133", features = ["derive", "rc"] } serde_json = "1.0.108" -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_logging = { path = "../sn_logging", version = "0.2.36" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.3" } tiny_http = { version = "0.12", features = ["ssl-rustls"] } diff --git a/sn_cli/Cargo.toml b/sn_cli/Cargo.toml index abc247336b..59686dd5dc 100644 --- a/sn_cli/Cargo.toml +++ b/sn_cli/Cargo.toml @@ -55,7 +55,7 @@ rmp-serde = "1.1.1" rpassword = "7.3.1" serde = { version = "1.0.133", features = ["derive"] } sn_build_info = { path = "../sn_build_info", version = "0.1.15" } -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_logging = { path = "../sn_logging", version = "0.2.36" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.3" } sn_protocol = { path = "../sn_protocol", version = "0.17.11" } @@ -80,7 +80,7 @@ eyre = "0.6.8" criterion = "0.5.1" tempfile = "3.6.0" rand = { version = "~0.8.5", features = ["small_rng"] } -sn_client = { path = "../sn_client", version = "0.110.3", features = [ +sn_client = { path = "../sn_client", version = "0.110.4", features = [ "test-utils", ] } diff --git a/sn_client/Cargo.toml b/sn_client/Cargo.toml index 3c68c6e66c..a1d49b0508 100644 --- a/sn_client/Cargo.toml +++ b/sn_client/Cargo.toml @@ -8,7 +8,7 @@ license = "GPL-3.0" name = "sn_client" readme = "README.md" repository = "https://github.com/maidsafe/safe_network" -version = "0.110.3" +version = "0.110.4" [features] default = [] diff --git a/sn_faucet/Cargo.toml b/sn_faucet/Cargo.toml index 
fecabf236e..9a2d3e7d03 100644 --- a/sn_faucet/Cargo.toml +++ b/sn_faucet/Cargo.toml @@ -39,7 +39,7 @@ serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" sn_build_info = { path = "../sn_build_info", version = "0.1.15" } sn_cli = { path = "../sn_cli", version = "0.95.3" } -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_logging = { path = "../sn_logging", version = "0.2.36" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.3" } sn_protocol = { path = "../sn_protocol", version = "0.17.11" } diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 1781e451ee..2cd71e7a4b 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -84,7 +84,7 @@ reqwest = { version = "0.12.2", default-features = false, features = [ "rustls-tls-manual-roots", ] } serde_json = "1.0" -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_protocol = { path = "../sn_protocol", version = "0.17.11", features = [ "rpc", ] } diff --git a/sn_node_rpc_client/Cargo.toml b/sn_node_rpc_client/Cargo.toml index ce6ba4dc71..f1c6aaa814 100644 --- a/sn_node_rpc_client/Cargo.toml +++ b/sn_node_rpc_client/Cargo.toml @@ -23,7 +23,7 @@ color-eyre = "0.6.2" hex = "~0.4.3" libp2p = { version="0.53", features = ["kad"]} libp2p-identity = { version="0.2.7", features = ["rand"] } -sn_client = { path = "../sn_client", version = "0.110.3" } +sn_client = { path = "../sn_client", version = "0.110.4" } sn_logging = { path = "../sn_logging", version = "0.2.36" } sn_node = { path = "../sn_node", version = "0.111.4" } sn_peers_acquisition = { path = "../sn_peers_acquisition", version = "0.5.3" }
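
The first patch (PATCH 1/3) prevents already-fetched spends from being queued for another fetch and, on a NotFound result, schedules a re-attempt after an interval that grows with the failure count. Below is a minimal, self-contained sketch of that scheduling logic; `Addr`, `schedule`, and `reattempt_after` are hypothetical stand-ins for the real `SpendAddress`-based code, and only the 100000-nano threshold and the backoff formula are taken from the patch itself.

// Sketch of the re-attempt scheduling from PATCH 1/3 and the backoff used in
// crawl_to_next_utxos. Types are simplified stand-ins, not the sn_client API.
use std::collections::{BTreeMap, BTreeSet};
use std::time::{Duration, Instant};

type Addr = u64; // stand-in for SpendAddress

fn schedule(
    addrs_to_get: &mut BTreeMap<Addr, (u64, u64)>, // addr -> (failed_times, amount in nanos)
    addrs_fetched: &BTreeSet<Addr>,
    new_addrs: impl IntoIterator<Item = (Addr, u64)>,
) {
    for (addr, amount) in new_addrs {
        // PATCH 1/3: addresses whose spend was already fetched are never re-queued.
        if !addrs_fetched.contains(&addr) {
            addrs_to_get.entry(addr).or_insert((0, amount));
        }
    }
}

fn reattempt_after(failed_times: u64, amount_nanos: u64, reattempt_seconds: u64) -> Instant {
    // Big UTXOs are retried on the base interval; small ones back off linearly
    // with the number of failures: reattempt_seconds * (failed_times * 8 + 1).
    let secs = if amount_nanos > 100_000 {
        reattempt_seconds
    } else {
        reattempt_seconds * (failed_times * 8 + 1)
    };
    Instant::now() + Duration::from_secs(secs)
}

fn main() {
    let mut to_get = BTreeMap::new();
    let fetched: BTreeSet<Addr> = [1, 2].into_iter().collect();
    schedule(&mut to_get, &fetched, [(1, 50), (3, 200_000)]);
    assert!(!to_get.contains_key(&1)); // already fetched, not re-queued
    let _next = reattempt_after(2, 50, 120); // small UTXO: 120 * 17 = 2040 s
    println!("queued {} addresses", to_get.len());
}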
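
The second patch (PATCH 2/3) replaces the manual `JoinSet` polling loop in `crawl_to_next_utxos` with a bounded-concurrency stream and `DashMap`-backed result collection. The sketch below shows the same pattern in isolation, assuming `futures = "0.3"`, `dashmap = "6"`, and tokio with the macros and runtime features; `fetch_spend` is a hypothetical stand-in for `Client::crawl_spend`, not the real call.

// Minimal sketch of the bounded-concurrency crawl pattern from PATCH 2/3.
use std::sync::Arc;

use dashmap::DashMap;
use futures::stream::{self, StreamExt};

type Addr = u64; // stand-in for SpendAddress

// Stand-in for Client::crawl_spend: pretend even addresses are found.
async fn fetch_spend(addr: Addr) -> Result<String, String> {
    if addr % 2 == 0 {
        Ok(format!("spend@{addr}"))
    } else {
        Err("not found".to_string())
    }
}

#[tokio::main]
async fn main() {
    const MAX_CONCURRENT: usize = 64;

    let addrs: Vec<Addr> = (0..100).collect();
    let fetched: Arc<DashMap<Addr, String>> = Arc::new(DashMap::new());
    let failed: Arc<DashMap<Addr, String>> = Arc::new(DashMap::new());

    stream::iter(addrs)
        .map(|addr| {
            let fetched = Arc::clone(&fetched);
            let failed = Arc::clone(&failed);
            async move {
                // Each completed fetch records its outcome in a shared map,
                // without holding a lock across the await point.
                match fetch_spend(addr).await {
                    Ok(spend) => {
                        fetched.insert(addr, spend);
                    }
                    Err(e) => {
                        failed.insert(addr, e);
                    }
                }
                addr
            }
        })
        // At most MAX_CONCURRENT fetches are in flight at any time, which
        // replaces the hand-written JoinSet bookkeeping of the old loop.
        .buffer_unordered(MAX_CONCURRENT)
        .for_each(|addr| async move {
            println!("completed fetch attempt for {addr}");
        })
        .await;

    println!("fetched: {}, failed: {}", fetched.len(), failed.len());
}

Compared with the `JoinSet` version, `buffer_unordered(MAX_CONCURRENT)` provides the concurrency cap and backpressure declaratively, and the `Arc<DashMap>` results let the caller drain fetched, failed, and further-track sets after the stream completes, which is what the patch does before converting them back to `BTreeMap`, `Vec`, and `BTreeSet` return values.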