Skip to content

Commit

Permalink
Add index sync progress bar
Browse files Browse the repository at this point in the history
Automatically switch to blockfile source if there are many blocks left
to sync
  • Loading branch information
Duddino committed Jan 24, 2025
1 parent d206663 commit 4de44e4
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 3 deletions.
14 changes: 14 additions & 0 deletions src-tauri/src/address_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ pub mod types;
use block_source::{BlockSource, BlockSourceType};
use database::Database;
use futures::StreamExt;
use std::sync::Arc;
use tokio::sync::RwLock;
use types::{Block, Vin};

#[derive(Clone)]
pub struct AddressIndex<D: Database> {
database: D,
block_source: BlockSourceType,
pub indexed_blocks: Arc<RwLock<u64>>,
}

impl<D> AddressIndex<D>
Expand All @@ -27,6 +30,7 @@ where
BlockSourceType::Regular(block_source) => {
let mut stream = block_source.get_blocks()?.chunks(500_000);
while let Some(blocks) = stream.next().await {
*self.indexed_blocks.write().await += blocks.len() as u64;
Self::store_blocks(&mut self.database, blocks.into_iter()).await?;
}
}
Expand All @@ -35,6 +39,7 @@ where
let mut stream = block_source.get_blocks_indexed(start)?.chunks(10);
while let Some(blocks) = stream.next().await {
let block_count = blocks.last().map(|(_, i)| *i);
*self.indexed_blocks.write().await += blocks.len() as u64;
Self::store_blocks(
&mut self.database,
blocks.into_iter().map(|(block, _)| block),
Expand Down Expand Up @@ -66,6 +71,7 @@ where
Self {
database,
block_source: block_source.instantiate(),
indexed_blocks: Arc::new(RwLock::new(0)),
}
}
pub async fn get_address_txids(&self, address: &str) -> crate::error::Result<Vec<String>> {
Expand All @@ -82,6 +88,14 @@ where
{
self.block_source = block_source.instantiate();
}

pub async fn get_last_indexed_block(&self) -> crate::error::Result<u64> {
self.database.get_last_indexed_block().await
}

pub async fn update_block_count(&mut self, block_count: u64) -> crate::error::Result<()> {
self.database.update_block_count(block_count).await
}
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src-tauri/src/address_index/pivx_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl BlockStream {
let hash: String = client
.request::<_, (), _>("getblockhash", rpc_params![current_block])
.await
.unwrap();
.ok()?;
let block: Result<Block, _> = client
.request::<_, (), _>("getblock", rpc_params![hash, 2])
.await;
Expand Down
43 changes: 42 additions & 1 deletion src-tauri/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ where
{
address_index: Arc<RwLock<AddressIndex<D>>>,
pivx_rpc: PIVXRpc,
indexed_blocks: Arc<RwLock<u64>>,
done_indexing: Arc<RwLock<bool>>,
}

#[derive(Deserialize)]
Expand All @@ -37,15 +39,20 @@ where
D: Database + Send + Clone,
{
fn new(address_index: AddressIndex<D>, rpc: PIVXRpc) -> Self {
let indexed_blocks = address_index.indexed_blocks.clone();
Self {
address_index: Arc::new(RwLock::new(address_index)),
pivx_rpc: rpc,
indexed_blocks,
done_indexing: Arc::new(RwLock::new(false)),
}
}
}

static EXPLORER: OnceCell<DefaultExplorer> = OnceCell::const_new();
static PIVX_RPC: OnceCell<PIVXRpc> = OnceCell::const_new();
// If more than `LAST_BLOCK_GAP` are left to sync, prefer BlockFileSource
const LAST_BLOCK_GAP: u64 = 10_000;

async fn get_pivx_rpc() -> &'static PIVXRpc {
PIVX_RPC
Expand Down Expand Up @@ -78,6 +85,11 @@ async fn get_explorer() -> &'static DefaultExplorer {
// Cloning is very cheap, it's just a Pathbuf and some Arcs
let explorer_clone = explorer.clone();
tokio::spawn(async move {
while match explorer_clone.is_initial_sync().await {
Ok(is_initial_sync) => is_initial_sync,
Err(_) => true,
} {}

if let Err(err) = explorer_clone.sync().await {
eprintln!("Warning: Syncing failed with error {}", err);
}
Expand Down Expand Up @@ -188,7 +200,28 @@ where
}

pub async fn sync(&self) -> crate::error::Result<()> {
self.address_index.write().await.sync().await
let current_block = self.get_block_count().await?;
let last_indexed_block = self
.address_index
.read()
.await
.get_last_indexed_block()
.await?;

if current_block - last_indexed_block >= LAST_BLOCK_GAP {
self.switch_to_blockfile_source().await?;
}

self.address_index.write().await.sync().await?;
self.address_index
.write()
.await
// Leave 100 blocks as buffer
.update_block_count(self.get_block_count().await? - 100)
.await?;
self.switch_to_rpc_source().await?;
*self.done_indexing.write().await = true;
Ok(())
}

pub async fn switch_to_rpc_source(&self) -> crate::error::Result<()> {
Expand Down Expand Up @@ -225,4 +258,12 @@ where
.await?;
Ok(chain_info.verificationprogress)
}

pub async fn get_index_progress(&self) -> crate::error::Result<f64> {
Ok((*self.indexed_blocks.read().await as f64) / (self.get_block_count().await? as f64))
}

pub async fn index_is_done(&self) -> crate::error::Result<bool> {
Ok(*self.done_indexing.read().await)
}
}
2 changes: 2 additions & 0 deletions src-tauri/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn main() {
explorer_switch_to_blockfile_source,
explorer_is_initial_sync,
explorer_get_sync_progress,
explorer_get_index_progress,
explorer_index_is_done,
])
.run(tauri::generate_context!())
.expect("error while running tauri application");
Expand Down

0 comments on commit 4de44e4

Please sign in to comment.