From 0d3077ab07b9c500a149ed4c6d7fb55641be4e6a Mon Sep 17 00:00:00 2001 From: jackzhhuang Date: Wed, 4 Dec 2024 11:40:30 +0800 Subject: [PATCH] add time out and exit --- sync/src/parallel/executor.rs | 44 ++++++++++++++++++++------- sync/src/parallel/worker_scheduler.rs | 1 + 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/sync/src/parallel/executor.rs b/sync/src/parallel/executor.rs index ca98549248..c945c10e26 100644 --- a/sync/src/parallel/executor.rs +++ b/sync/src/parallel/executor.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use starcoin_chain::{verifier::DagVerifierWithGhostData, BlockChain, ChainReader}; use starcoin_chain_api::ExecutedBlock; @@ -6,7 +6,7 @@ use starcoin_config::TimeService; use starcoin_crypto::HashValue; use starcoin_dag::blockdag::BlockDAG; use starcoin_executor::VMMetrics; -use starcoin_logger::prelude::{error, info}; +use starcoin_logger::prelude::{error, info, warn}; use starcoin_storage::Store; use starcoin_types::block::{Block, BlockHeader}; use tokio::{ @@ -78,27 +78,27 @@ impl DagBlockExecutor { chain: &BlockDAG, storage: Arc, parents_hash: Vec, - ) -> anyhow::Result { + ) -> anyhow::Result<(bool, Option)> { for parent_id in parents_hash { let header = match storage.get_block_header_by_hash(parent_id)? { Some(header) => header, - None => return Ok(false), + None => return Ok((false, Some(parent_id))), }; if storage.get_block_info(header.id())?.is_none() { - return Ok(false); + return Ok((false, Some(parent_id))); } if !chain.has_dag_block(parent_id)? { - return Ok(false); + return Ok((false, Some(parent_id))); } } - Ok(true) + Ok((true, None)) } pub fn start_to_execute(mut self) -> anyhow::Result> { let handle = tokio::spawn(async move { - let _ = ExecutorDeconstructor::new(self.worker_scheduler.clone()); + let _worker_guard = ExecutorDeconstructor::new(self.worker_scheduler.clone()); let mut chain = None; loop { if self.worker_scheduler.check_if_stop().await { @@ -123,6 +123,11 @@ impl DagBlockExecutor { block.header().id() ); + const MAX_ATTEMPTS: u32 = 150; + const INITIAL_DELAY: Duration = Duration::from_millis(100); + let mut delay = INITIAL_DELAY; + let mut attempts: u32 = 0; + loop { if self.worker_scheduler.check_if_stop().await { info!("sync worker scheduler stopped"); @@ -133,11 +138,28 @@ impl DagBlockExecutor { self.storage.clone(), block.header().parents_hash(), ) { - Ok(true) => break, - Ok(false) => { + Ok((true, None)) => break, + Ok((false, Some(absent_id))) => { + attempts = attempts.saturating_add(1); + if attempts >= MAX_ATTEMPTS { + warn!("Timeout waiting for workers to exit, waiting for parents for block: {:?}, delay: {:?}", absent_id, delay); + return; + } + info!( + "waiting for parents for block: {:?}, waiting for: {:?}, delay: {:?}", + header.id(), absent_id, delay + ); tokio::task::yield_now().await; tokio::time::sleep(tokio::time::Duration::from_millis(100)) - .await + .await; + // the delay is no more than 2 hours + delay = std::cmp::min( + delay.saturating_mul(2), + Duration::from_secs(60 * 60 * 2), + ); + } + Ok(_) => { + panic!("impossible flow, check the code in waiting_for_parents") } Err(e) => { error!( diff --git a/sync/src/parallel/worker_scheduler.rs b/sync/src/parallel/worker_scheduler.rs index 92db50b1a3..bd212834d0 100644 --- a/sync/src/parallel/worker_scheduler.rs +++ b/sync/src/parallel/worker_scheduler.rs @@ -78,6 +78,7 @@ impl WorkerScheduler { tokio::task::yield_now().await; debug!("waiting for worker to exit, attempt {}", attempts); tokio::time::sleep(delay).await; + // the delay is no more than 2 hours delay = std::cmp::min(delay.saturating_mul(2), Duration::from_secs(60 * 60 * 2)); }