Skip to content

Commit

Permalink
add time out and exit
Browse files Browse the repository at this point in the history
  • Loading branch information
jackzhhuang committed Dec 4, 2024
1 parent baf1e9e commit 0d3077a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 11 deletions.
44 changes: 33 additions & 11 deletions sync/src/parallel/executor.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use starcoin_chain::{verifier::DagVerifierWithGhostData, BlockChain, ChainReader};
use starcoin_chain_api::ExecutedBlock;
use starcoin_config::TimeService;
use starcoin_crypto::HashValue;
use starcoin_dag::blockdag::BlockDAG;
use starcoin_executor::VMMetrics;
use starcoin_logger::prelude::{error, info};
use starcoin_logger::prelude::{error, info, warn};
use starcoin_storage::Store;
use starcoin_types::block::{Block, BlockHeader};
use tokio::{
Expand Down Expand Up @@ -78,27 +78,27 @@ impl DagBlockExecutor {
chain: &BlockDAG,
storage: Arc<dyn Store>,
parents_hash: Vec<HashValue>,
) -> anyhow::Result<bool> {
) -> anyhow::Result<(bool, Option<HashValue>)> {
for parent_id in parents_hash {
let header = match storage.get_block_header_by_hash(parent_id)? {
Some(header) => header,
None => return Ok(false),
None => return Ok((false, Some(parent_id))),
};

if storage.get_block_info(header.id())?.is_none() {
return Ok(false);
return Ok((false, Some(parent_id)));
}

if !chain.has_dag_block(parent_id)? {
return Ok(false);
return Ok((false, Some(parent_id)));
}
}
Ok(true)
Ok((true, None))
}

pub fn start_to_execute(mut self) -> anyhow::Result<JoinHandle<()>> {
let handle = tokio::spawn(async move {
let _ = ExecutorDeconstructor::new(self.worker_scheduler.clone());
let _worker_guard = ExecutorDeconstructor::new(self.worker_scheduler.clone());
let mut chain = None;
loop {
if self.worker_scheduler.check_if_stop().await {
Expand All @@ -123,6 +123,11 @@ impl DagBlockExecutor {
block.header().id()
);

const MAX_ATTEMPTS: u32 = 150;
const INITIAL_DELAY: Duration = Duration::from_millis(100);
let mut delay = INITIAL_DELAY;
let mut attempts: u32 = 0;

loop {
if self.worker_scheduler.check_if_stop().await {
info!("sync worker scheduler stopped");
Expand All @@ -133,11 +138,28 @@ impl DagBlockExecutor {
self.storage.clone(),
block.header().parents_hash(),
) {
Ok(true) => break,
Ok(false) => {
Ok((true, None)) => break,
Ok((false, Some(absent_id))) => {
attempts = attempts.saturating_add(1);
if attempts >= MAX_ATTEMPTS {
warn!("Timeout waiting for workers to exit, waiting for parents for block: {:?}, delay: {:?}", absent_id, delay);
return;
}
info!(
"waiting for parents for block: {:?}, waiting for: {:?}, delay: {:?}",
header.id(), absent_id, delay
);
tokio::task::yield_now().await;
tokio::time::sleep(tokio::time::Duration::from_millis(100))
.await
.await;
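// Double the delay after every failed attempt, capped at 2 hours.
// NOTE(review): the sleep above still uses a fixed 100 ms; in the visible code `delay` is only logged — confirm whether it should be passed to `sleep`.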
delay = std::cmp::min(
delay.saturating_mul(2),
Duration::from_secs(60 * 60 * 2),
);
}
Ok(_) => {
panic!("impossible flow, check the code in waiting_for_parents")
}
Err(e) => {
error!(
Expand Down
1 change: 1 addition & 0 deletions sync/src/parallel/worker_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl WorkerScheduler {
tokio::task::yield_now().await;
debug!("waiting for worker to exit, attempt {}", attempts);
tokio::time::sleep(delay).await;
// Exponential backoff: double the delay after each attempt, capped at 2 hours.
delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2));
}

Expand Down

0 comments on commit 0d3077a

Please sign in to comment.