Skip to content

Commit

Permalink
Merge pull request #193 from xch-dev/manual-nft-id-entry
Browse files Browse the repository at this point in the history
Initial sync tweaks
  • Loading branch information
Rigidity authored Dec 25, 2024
2 parents 1e27571 + aac9a58 commit 74c9bfb
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 86 deletions.
6 changes: 1 addition & 5 deletions crates/sage-cli/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,7 @@ async fn start_rpc(path: PathBuf) -> Result<()> {
let mut app = Sage::new(&path);
let mut receiver = app.initialize().await?;

tokio::spawn(async move {
while let Some(message) = receiver.recv().await {
println!("{message:?}");
}
});
tokio::spawn(async move { while let Some(_message) = receiver.recv().await {} });

let addr: SocketAddr = ([127, 0, 0, 1], app.config.rpc.server_port).into();
info!("RPC server is listening at {addr}");
Expand Down
74 changes: 37 additions & 37 deletions crates/sage-wallet/src/queues/puzzle_queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use chia::protocol::{Bytes32, CoinState};
use chia::protocol::{Bytes32, Coin};
use futures_util::{stream::FuturesUnordered, StreamExt};
use sage_database::{CoinKind, Database};
use tokio::{
Expand Down Expand Up @@ -76,21 +76,47 @@ impl PuzzleQueue {
let db = self.db.clone();
let genesis_challenge = self.genesis_challenge;
let addr = peer.socket_addr();
let coin_id = coin_state.coin.coin_id();
let peer = peer.clone();

if db.is_p2_puzzle_hash(coin_state.coin.puzzle_hash).await? {
db.sync_coin(coin_state.coin.coin_id(), None, CoinKind::Xch)
.await?;
warn!(
"Could {} should already be synced, but isn't",
coin_state.coin.coin_id()
);
continue;
}

futures.push(async move {
let result = fetch_puzzle(&peer, &db, genesis_challenge, coin_state).await;
(addr, coin_id, result)
let result = fetch_puzzle(&peer, genesis_challenge, coin_state.coin).await;
(addr, coin_state, result)
});
}
}

let mut subscriptions = Vec::new();

while let Some((addr, coin_id, result)) = futures.next().await {
while let Some((addr, coin_state, result)) = futures.next().await {
let coin_id = coin_state.coin.coin_id();

match result {
Ok(subscribe) => {
Ok((info, minter_did)) => {
let subscribe = info.subscribe();

let remove = match info.p2_puzzle_hash() {
Some(p2_puzzle_hash) => !self.db.is_p2_puzzle_hash(p2_puzzle_hash).await?,
None => true,
};

if remove {
self.db.delete_coin_state(coin_state.coin.coin_id()).await?;
} else {
let mut tx = self.db.tx().await?;
insert_puzzle(&mut tx, coin_state, info, minter_did).await?;
tx.commit().await?;
}

if subscribe {
subscriptions.push(coin_id);
}
Expand Down Expand Up @@ -136,23 +162,12 @@ impl PuzzleQueue {
/// Fetches info for a coin's puzzle and inserts it into the database.
async fn fetch_puzzle(
peer: &WalletPeer,
db: &Database,
genesis_challenge: Bytes32,
coin_state: CoinState,
) -> Result<bool, WalletError> {
if db.is_p2_puzzle_hash(coin_state.coin.puzzle_hash).await? {
db.sync_coin(coin_state.coin.coin_id(), None, CoinKind::Xch)
.await?;
warn!(
"Could {} should already be synced, but isn't",
coin_state.coin.coin_id()
);
return Ok(false);
}

coin: Coin,
) -> Result<(ChildKind, Option<Bytes32>), WalletError> {
let parent_spend = timeout(
Duration::from_secs(15),
peer.fetch_coin_spend(coin_state.coin.parent_coin_info, genesis_challenge),
peer.fetch_coin_spend(coin.parent_coin_info, genesis_challenge),
)
.await??;

Expand All @@ -161,7 +176,7 @@ async fn fetch_puzzle(
parent_spend.coin,
&parent_spend.puzzle_reveal,
&parent_spend.solution,
coin_state.coin,
coin,
)
})
.await??;
Expand All @@ -172,20 +187,5 @@ async fn fetch_puzzle(
None
};

let subscribe = info.subscribe();

let remove = match info.p2_puzzle_hash() {
Some(p2_puzzle_hash) => !db.is_p2_puzzle_hash(p2_puzzle_hash).await?,
None => true,
};

if remove {
db.delete_coin_state(coin_state.coin.coin_id()).await?;
} else {
let mut tx = db.tx().await?;
insert_puzzle(&mut tx, coin_state, info, minter_did).await?;
tx.commit().await?;
}

Ok(subscribe)
Ok((info, minter_did))
}
1 change: 1 addition & 0 deletions crates/sage-wallet/src/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ impl SyncManager {
Duration::from_secs(3),
info.peer.subscribe_coins(
mem::take(&mut self.pending_coin_subscriptions),
None,
self.network.genesis_challenge,
),
)
Expand Down
74 changes: 32 additions & 42 deletions crates/sage-wallet/src/sync_manager/wallet_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ pub async fn sync_wallet(
) -> Result<(), WalletError> {
info!("Starting sync against peer {}", peer.socket_addr());

let p2_puzzle_hashes = wallet.db.p2_puzzle_hashes().await?;

let (start_height, start_header_hash) = wallet.db.latest_peak().await?.map_or_else(
|| (None, wallet.genesis_challenge),
|(peak, header_hash)| (Some(peak), header_hash),
);

let mut coin_ids = Vec::new();
coin_ids.extend(wallet.db.unspent_nft_coin_ids().await?);
coin_ids.extend(wallet.db.unspent_did_coin_ids().await?);
Expand All @@ -36,19 +43,13 @@ pub async fn sync_wallet(
sync_coin_ids(
&wallet,
&peer,
wallet.genesis_challenge,
start_height,
start_header_hash,
coin_ids,
sync_sender.clone(),
)
.await?;

let p2_puzzle_hashes = wallet.db.p2_puzzle_hashes().await?;

let (start_height, start_header_hash) = wallet.db.latest_peak().await?.map_or_else(
|| (None, wallet.genesis_challenge),
|(peak, header_hash)| (Some(peak), header_hash),
);

let mut derive_more = p2_puzzle_hashes.is_empty();

for batch in p2_puzzle_hashes.chunks(500) {
Expand Down Expand Up @@ -134,7 +135,8 @@ pub async fn sync_wallet(
async fn sync_coin_ids(
wallet: &Wallet,
peer: &WalletPeer,
genesis_challenge: Bytes32,
start_height: Option<u32>,
start_header_hash: Bytes32,
coin_ids: Vec<Bytes32>,
sync_sender: mpsc::Sender<SyncEvent>,
) -> Result<(), WalletError> {
Expand All @@ -151,7 +153,7 @@ async fn sync_coin_ids(

let coin_states = timeout(
Duration::from_secs(10),
peer.subscribe_coins(coin_ids.to_vec(), genesis_challenge),
peer.subscribe_coins(coin_ids.to_vec(), start_height, start_header_hash),
)
.await??;

Expand Down Expand Up @@ -220,48 +222,31 @@ pub async fn incremental_sync(
derive_automatically: bool,
sync_sender: &mpsc::Sender<SyncEvent>,
) -> Result<(), WalletError> {
let mut batch_index = 0;

for batch in coin_states.chunks(10000) {
batch_index += 1;

let mut tx = wallet.db.tx().await?;
let mut tx = wallet.db.tx().await?;

let start = Instant::now();
let start = Instant::now();

let mut counters = UpsertCounters::default();
let mut counters = UpsertCounters::default();

for &coin_state in batch {
upsert_coin(&mut tx, coin_state, None, &mut counters).await?;
for &coin_state in &coin_states {
upsert_coin(&mut tx, coin_state, None, &mut counters).await?;

if coin_state.spent_height.is_some() {
let start = Instant::now();
delete_puzzle(&mut tx, coin_state.coin.coin_id()).await?;
counters.delete_puzzle += start.elapsed();
}
if coin_state.spent_height.is_some() {
let start = Instant::now();
delete_puzzle(&mut tx, coin_state.coin.coin_id()).await?;
counters.delete_puzzle += start.elapsed();
}

tx.commit().await?;

debug!(
"Upserted {} coins in batch #{batch_index} in {:?}, with counters {:?}",
batch.len(),
start.elapsed(),
counters
);

sleep(Duration::from_secs(5)).await;
}

sync_sender
.send(SyncEvent::CoinsUpdated { coin_states })
.await
.ok();
debug!(
"Upserted {} coins in {:?}, with counters {:?}",
coin_states.len(),
start.elapsed(),
counters
);

let mut derived = false;

let mut tx = wallet.db.tx().await?;

let mut next_index = tx.derivation_index(false).await?;

if derive_automatically {
Expand All @@ -282,6 +267,11 @@ pub async fn incremental_sync(

tx.commit().await?;

sync_sender
.send(SyncEvent::CoinsUpdated { coin_states })
.await
.ok();

if derived {
sync_sender
.send(SyncEvent::DerivationIndex { next_index })
Expand Down
5 changes: 3 additions & 2 deletions crates/sage-wallet/src/wallet_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ impl WalletPeer {
pub async fn subscribe_coins(
&self,
coin_ids: Vec<Bytes32>,
genesis_challenge: Bytes32,
previous_height: Option<u32>,
header_hash: Bytes32,
) -> Result<Vec<CoinState>, WalletError> {
let response = self
.peer
.request_coin_state(coin_ids, None, genesis_challenge, true)
.request_coin_state(coin_ids, previous_height, header_hash, true)
.await?
.map_err(|error| match error.reason {
RejectStateReason::ExceededSubscriptionLimit => {
Expand Down

0 comments on commit 74c9bfb

Please sign in to comment.