Skip to content

Commit

Permalink
Merge pull request #31 from Duddino/index_sync
Browse files Browse the repository at this point in the history
Add index sync progress bar
  • Loading branch information
Duddino authored Jan 24, 2025
2 parents d206663 + 4de44e4 commit 6d441df
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src-tauri/src/address_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ pub mod types;
use block_source::{BlockSource, BlockSourceType};
use database::Database;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::RwLock;
use types::{Block, Vin};

#[derive(Clone)]
pub struct AddressIndex<D: Database> {
database: D,
block_source: BlockSourceType,
pub indexed_blocks: Arc<RwLock<u64>>,
}

impl<D> AddressIndex<D>
Expand All @@ -27,6 +30,7 @@ where
BlockSourceType::Regular(block_source) => {
let mut stream = block_source.get_blocks()?.chunks(500_000);
while let Some(blocks) = stream.next().await {
*self.indexed_blocks.write().await += blocks.len() as u64;
Self::store_blocks(&mut self.database, blocks.into_iter()).await?;
}
}
Expand All @@ -35,6 +39,7 @@ where
let mut stream = block_source.get_blocks_indexed(start)?.chunks(10);
while let Some(blocks) = stream.next().await {
let block_count = blocks.last().map(|(_, i)| *i);
*self.indexed_blocks.write().await += blocks.len() as u64;
Self::store_blocks(
&mut self.database,
blocks.into_iter().map(|(block, _)| block),
Expand Down Expand Up @@ -66,6 +71,7 @@ where
Self {
database,
block_source: block_source.instantiate(),
indexed_blocks: Arc::new(RwLock::new(0)),
}
}
pub async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>> {
Expand All @@ -82,6 +88,14 @@ where
{
self.block_source = block_source.instantiate();
}

pub async fn get_last_indexed_block(&self) -> crate::error::Result<u64> {
self.database.get_last_indexed_block().await
}

pub async fn update_block_count(&mut self, block_count: u64) -> crate::error::Result<()> {
self.database.update_block_count(block_count).await
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src-tauri/src/address_index/pivx_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl BlockStream {
let hash: String = client
.request::<_, (), _>("getblockhash", rpc_params![current_block])
.await
.unwrap();
.ok()?;
let block: Result<Block, _> = client
.request::<_, (), _>("getblock", rpc_params![hash, 2])
.await;
Expand Down
43 changes: 42 additions & 1 deletion src-tauri/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ where
{
address_index: Arc<RwLock<AddressIndex<D>>>,
pivx_rpc: PIVXRpc,
indexed_blocks: Arc<RwLock<u64>>,
done_indexing: Arc<RwLock<bool>>,
}

#[derive(Deserialize)]
Expand All @@ -37,15 +39,20 @@ where
D: Database + Send + Clone,
{
fn new(address_index: AddressIndex<D>, rpc: PIVXRpc) -> Self {
let indexed_blocks = address_index.indexed_blocks.clone();
Self {
address_index: Arc::new(RwLock::new(address_index)),
pivx_rpc: rpc,
indexed_blocks,
done_indexing: Arc::new(RwLock::new(false)),
}
}
}

static EXPLORER: OnceCell<DefaultExplorer> = OnceCell::const_new();
static PIVX_RPC: OnceCell<PIVXRpc> = OnceCell::const_new();
// If more than `LAST_BLOCK_GAP` are left to sync, prefer BlockFileSource
const LAST_BLOCK_GAP: u64 = 10_000;

async fn get_pivx_rpc() -> &'static PIVXRpc {
PIVX_RPC
Expand Down Expand Up @@ -78,6 +85,11 @@ async fn get_explorer() -> &'static DefaultExplorer {
// Cloning is very cheap, it's just a Pathbuf and some Arcs
let explorer_clone = explorer.clone();
tokio::spawn(async move {
while match explorer_clone.is_initial_sync().await {
Ok(is_initial_sync) => is_initial_sync,
Err(_) => true,
} {}

if let Err(err) = explorer_clone.sync().await {
eprintln!("Warning: Syncing failed with error {}", err);
}
Expand Down Expand Up @@ -188,7 +200,28 @@ where
}

pub async fn sync(&self) -> crate::error::Result<()> {
self.address_index.write().await.sync().await
let current_block = self.get_block_count().await?;
let last_indexed_block = self
.address_index
.read()
.await
.get_last_indexed_block()
.await?;

if current_block - last_indexed_block >= LAST_BLOCK_GAP {
self.switch_to_blockfile_source().await?;
}

self.address_index.write().await.sync().await?;
self.address_index
.write()
.await
// Leave 100 blocks as buffer
.update_block_count(self.get_block_count().await? - 100)
.await?;
self.switch_to_rpc_source().await?;
*self.done_indexing.write().await = true;
Ok(())
}

pub async fn switch_to_rpc_source(&self) -> crate::error::Result<()> {
Expand Down Expand Up @@ -225,4 +258,12 @@ where
.await?;
Ok(chain_info.verificationprogress)
}

pub async fn get_index_progress(&self) -> crate::error::Result<f64> {
Ok((*self.indexed_blocks.read().await as f64) / (self.get_block_count().await? as f64))
}

pub async fn index_is_done(&self) -> crate::error::Result<bool> {
Ok(*self.done_indexing.read().await)
}
}
2 changes: 2 additions & 0 deletions src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn main() {
explorer_switch_to_blockfile_source,
explorer_is_initial_sync,
explorer_get_sync_progress,
explorer_get_index_progress,
explorer_index_is_done,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");
Expand Down

0 comments on commit 6d441df

Please sign in to comment.