diff --git a/crates/sage-cli/src/router.rs b/crates/sage-cli/src/router.rs index 9d80497..52a91c0 100644 --- a/crates/sage-cli/src/router.rs +++ b/crates/sage-cli/src/router.rs @@ -132,11 +132,7 @@ async fn start_rpc(path: PathBuf) -> Result<()> { let mut app = Sage::new(&path); let mut receiver = app.initialize().await?; - tokio::spawn(async move { - while let Some(message) = receiver.recv().await { - println!("{message:?}"); - } - }); + tokio::spawn(async move { while let Some(_message) = receiver.recv().await {} }); let addr: SocketAddr = ([127, 0, 0, 1], app.config.rpc.server_port).into(); info!("RPC server is listening at {addr}"); diff --git a/crates/sage-wallet/src/queues/puzzle_queue.rs b/crates/sage-wallet/src/queues/puzzle_queue.rs index c33cdd2..63901e1 100644 --- a/crates/sage-wallet/src/queues/puzzle_queue.rs +++ b/crates/sage-wallet/src/queues/puzzle_queue.rs @@ -1,6 +1,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use chia::protocol::{Bytes32, CoinState}; +use chia::protocol::{Bytes32, Coin}; use futures_util::{stream::FuturesUnordered, StreamExt}; use sage_database::{CoinKind, Database}; use tokio::{ @@ -76,21 +76,47 @@ impl PuzzleQueue { let db = self.db.clone(); let genesis_challenge = self.genesis_challenge; let addr = peer.socket_addr(); - let coin_id = coin_state.coin.coin_id(); let peer = peer.clone(); + if db.is_p2_puzzle_hash(coin_state.coin.puzzle_hash).await? { + db.sync_coin(coin_state.coin.coin_id(), None, CoinKind::Xch) + .await?; + warn!( + "Could {} should already be synced, but isn't", + coin_state.coin.coin_id() + ); + continue; + } + futures.push(async move { - let result = fetch_puzzle(&peer, &db, genesis_challenge, coin_state).await; - (addr, coin_id, result) + let result = fetch_puzzle(&peer, genesis_challenge, coin_state.coin).await; + (addr, coin_state, result) }); } } let mut subscriptions = Vec::new(); - while let Some((addr, coin_id, result)) = futures.next().await { + while let Some((addr, coin_state, result)) = futures.next().await { + let coin_id = coin_state.coin.coin_id(); + match result { - Ok(subscribe) => { + Ok((info, minter_did)) => { + let subscribe = info.subscribe(); + + let remove = match info.p2_puzzle_hash() { + Some(p2_puzzle_hash) => !self.db.is_p2_puzzle_hash(p2_puzzle_hash).await?, + None => true, + }; + + if remove { + self.db.delete_coin_state(coin_state.coin.coin_id()).await?; + } else { + let mut tx = self.db.tx().await?; + insert_puzzle(&mut tx, coin_state, info, minter_did).await?; + tx.commit().await?; + } + if subscribe { subscriptions.push(coin_id); } @@ -136,23 +162,12 @@ impl PuzzleQueue { /// Fetches info for a coin's puzzle and inserts it into the database. async fn fetch_puzzle( peer: &WalletPeer, - db: &Database, genesis_challenge: Bytes32, - coin_state: CoinState, -) -> Result { - if db.is_p2_puzzle_hash(coin_state.coin.puzzle_hash).await? { - db.sync_coin(coin_state.coin.coin_id(), None, CoinKind::Xch) - .await?; - warn!( - "Could {} should already be synced, but isn't", - coin_state.coin.coin_id() - ); - return Ok(false); - } - + coin: Coin, +) -> Result<(ChildKind, Option), WalletError> { let parent_spend = timeout( Duration::from_secs(15), - peer.fetch_coin_spend(coin_state.coin.parent_coin_info, genesis_challenge), + peer.fetch_coin_spend(coin.parent_coin_info, genesis_challenge), ) .await??; @@ -161,7 +176,7 @@ async fn fetch_puzzle( parent_spend.coin, &parent_spend.puzzle_reveal, &parent_spend.solution, - coin_state.coin, + coin, ) }) .await??; @@ -172,20 +187,5 @@ async fn fetch_puzzle( None }; - let subscribe = info.subscribe(); - - let remove = match info.p2_puzzle_hash() { - Some(p2_puzzle_hash) => !db.is_p2_puzzle_hash(p2_puzzle_hash).await?, - None => true, - }; - - if remove { - db.delete_coin_state(coin_state.coin.coin_id()).await?; - } else { - let mut tx = db.tx().await?; - insert_puzzle(&mut tx, coin_state, info, minter_did).await?; - tx.commit().await?; - } - - Ok(subscribe) + Ok((info, minter_did)) } diff --git a/crates/sage-wallet/src/sync_manager.rs b/crates/sage-wallet/src/sync_manager.rs index 182a37c..56cde54 100644 --- a/crates/sage-wallet/src/sync_manager.rs +++ b/crates/sage-wallet/src/sync_manager.rs @@ -205,6 +205,7 @@ impl SyncManager { Duration::from_secs(3), info.peer.subscribe_coins( mem::take(&mut self.pending_coin_subscriptions), + None, self.network.genesis_challenge, ), ) diff --git a/crates/sage-wallet/src/sync_manager/wallet_sync.rs b/crates/sage-wallet/src/sync_manager/wallet_sync.rs index 6d241bb..f11aecd 100644 --- a/crates/sage-wallet/src/sync_manager/wallet_sync.rs +++ b/crates/sage-wallet/src/sync_manager/wallet_sync.rs @@ -28,6 +28,13 @@ pub async fn sync_wallet( ) -> Result<(), WalletError> { info!("Starting sync against peer {}", peer.socket_addr()); + let p2_puzzle_hashes = wallet.db.p2_puzzle_hashes().await?; + + let (start_height, start_header_hash) = wallet.db.latest_peak().await?.map_or_else( + || (None, wallet.genesis_challenge), + |(peak, header_hash)| (Some(peak), header_hash), + ); + let mut coin_ids = Vec::new(); coin_ids.extend(wallet.db.unspent_nft_coin_ids().await?); coin_ids.extend(wallet.db.unspent_did_coin_ids().await?); @@ -36,19 +43,13 @@ pub async fn sync_wallet( sync_coin_ids( &wallet, &peer, - wallet.genesis_challenge, + start_height, + start_header_hash, coin_ids, sync_sender.clone(), ) .await?; - let p2_puzzle_hashes = wallet.db.p2_puzzle_hashes().await?; - - let (start_height, start_header_hash) = wallet.db.latest_peak().await?.map_or_else( - || (None, wallet.genesis_challenge), - |(peak, header_hash)| (Some(peak), header_hash), - ); - let mut derive_more = p2_puzzle_hashes.is_empty(); for batch in p2_puzzle_hashes.chunks(500) { @@ -134,7 +135,8 @@ pub async fn sync_wallet( async fn sync_coin_ids( wallet: &Wallet, peer: &WalletPeer, - genesis_challenge: Bytes32, + start_height: Option, + start_header_hash: Bytes32, coin_ids: Vec, sync_sender: mpsc::Sender, ) -> Result<(), WalletError> { @@ -151,7 +153,7 @@ async fn sync_coin_ids( let coin_states = timeout( Duration::from_secs(10), - peer.subscribe_coins(coin_ids.to_vec(), genesis_challenge), + peer.subscribe_coins(coin_ids.to_vec(), start_height, start_header_hash), ) .await??; @@ -220,48 +222,31 @@ pub async fn incremental_sync( derive_automatically: bool, sync_sender: &mpsc::Sender, ) -> Result<(), WalletError> { - let mut batch_index = 0; - - for batch in coin_states.chunks(10000) { - batch_index += 1; - - let mut tx = wallet.db.tx().await?; + let mut tx = wallet.db.tx().await?; - let start = Instant::now(); + let start = Instant::now(); - let mut counters = UpsertCounters::default(); + let mut counters = UpsertCounters::default(); - for &coin_state in batch { - upsert_coin(&mut tx, coin_state, None, &mut counters).await?; + for &coin_state in &coin_states { + upsert_coin(&mut tx, coin_state, None, &mut counters).await?; - if coin_state.spent_height.is_some() { - let start = Instant::now(); - delete_puzzle(&mut tx, coin_state.coin.coin_id()).await?; - counters.delete_puzzle += start.elapsed(); - } + if coin_state.spent_height.is_some() { + let start = Instant::now(); + delete_puzzle(&mut tx, coin_state.coin.coin_id()).await?; + counters.delete_puzzle += start.elapsed(); } - - tx.commit().await?; - - debug!( - "Upserted {} coins in batch #{batch_index} in {:?}, with counters {:?}", - batch.len(), - start.elapsed(), - counters - ); - - sleep(Duration::from_secs(5)).await; } - sync_sender - .send(SyncEvent::CoinsUpdated { coin_states }) - .await - .ok(); + debug!( + "Upserted {} coins in {:?}, with counters {:?}", + coin_states.len(), + start.elapsed(), + counters + ); let mut derived = false; - let mut tx = wallet.db.tx().await?; - let mut next_index = tx.derivation_index(false).await?; if derive_automatically { @@ -282,6 +267,11 @@ pub async fn incremental_sync( tx.commit().await?; + sync_sender + .send(SyncEvent::CoinsUpdated { coin_states }) + .await + .ok(); + if derived { sync_sender .send(SyncEvent::DerivationIndex { next_index }) diff --git a/crates/sage-wallet/src/wallet_peer.rs b/crates/sage-wallet/src/wallet_peer.rs index cd024e1..20f097b 100644 --- a/crates/sage-wallet/src/wallet_peer.rs +++ b/crates/sage-wallet/src/wallet_peer.rs @@ -153,11 +153,12 @@ impl WalletPeer { pub async fn subscribe_coins( &self, coin_ids: Vec, - genesis_challenge: Bytes32, + previous_height: Option, + header_hash: Bytes32, ) -> Result, WalletError> { let response = self .peer - .request_coin_state(coin_ids, None, genesis_challenge, true) + .request_coin_state(coin_ids, previous_height, header_hash, true) .await? .map_err(|error| match error.reason { RejectStateReason::ExceededSubscriptionLimit => {