Skip to content

Commit

Permalink
feat: add step size check for bsc stage sync (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
pythonberg1997 authored Oct 11, 2024
1 parent 6abcab0 commit 01ee568
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 11 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,6 @@ members = [
"crates/trie/parallel/",
"crates/trie/prefetch/",
"crates/trie/trie",
"crates/bsc/node/",
"crates/bsc/engine/",
"examples/beacon-api-sidecar-fetcher/",
"examples/beacon-api-sse/",
"examples/bsc-p2p",
Expand Down Expand Up @@ -301,6 +299,7 @@ reth-bench = { path = "bin/reth-bench" }
reth-bsc-chainspec = { path = "crates/bsc/chainspec" }
reth-bsc-cli = { path = "crates/bsc/cli" }
reth-bsc-consensus = { path = "crates/bsc/consensus" }
reth-bsc-engine = { path = "crates/bsc/engine" }
reth-blockchain-tree = { path = "crates/blockchain-tree" }
reth-blockchain-tree-api = { path = "crates/blockchain-tree-api" }
reth-chain-state = { path = "crates/chain-state" }
Expand Down Expand Up @@ -407,7 +406,6 @@ reth-static-file-types = { path = "crates/static-file/types" }
reth-storage-api = { path = "crates/storage/storage-api" }
reth-storage-errors = { path = "crates/storage/errors" }
reth-tasks = { path = "crates/tasks" }
reth-bsc-engine = { path = "crates/bsc/engine" }
reth-testing-utils = { path = "testing/testing-utils" }
reth-tokio-util = { path = "crates/tokio-util" }
reth-tracing = { path = "crates/tracing" }
Expand Down
6 changes: 6 additions & 0 deletions crates/bsc/engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct ParliaEngineBuilder<Client, Provider, Engine: EngineTypes, P> {
provider: Provider,
parlia: Parlia,
snapshot_reader: SnapshotReader<P>,
merkle_clean_threshold: u64,
}

// === impl ParliaEngineBuilder ===
Expand All @@ -60,6 +61,7 @@ where
P: ParliaProvider + 'static,
{
/// Creates a new builder instance to configure all parts.
#[allow(clippy::too_many_arguments)]
pub fn new(
chain_spec: Arc<ChainSpec>,
cfg: ParliaConfig,
Expand All @@ -68,6 +70,7 @@ where
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
network_block_event_rx: Arc<Mutex<UnboundedReceiver<EngineMessage>>>,
fetch_client: Client,
merkle_clean_threshold: u64,
) -> Self {
let latest_header = provider
.latest_header()
Expand Down Expand Up @@ -97,6 +100,7 @@ where
to_engine,
network_block_event_rx,
fetch_client,
merkle_clean_threshold,
}
}

Expand All @@ -113,6 +117,7 @@ where
provider,
parlia,
snapshot_reader,
merkle_clean_threshold,
} = self;
let parlia_client = ParliaClient::new(storage.clone(), fetch_client);
if start_engine_task {
Expand All @@ -126,6 +131,7 @@ where
storage,
parlia_client.clone(),
cfg.period,
merkle_clean_threshold,
);
}
parlia_client
Expand Down
53 changes: 51 additions & 2 deletions crates/bsc/engine/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ use tokio::{
};
use tracing::{debug, error, info, trace};

// Minimum number of blocks for rebuilding the merkle tree
// When the number of blocks between the trusted header and the new header is less than this value,
// executing stage sync in batch can save time by avoiding merkle tree rebuilding.
const MIN_BLOCKS_FOR_MERKLE_REBUILD: u64 = 100_000;

/// All message variants that can be sent to beacon engine.
#[derive(Debug)]
enum ForkChoiceMessage {
Expand Down Expand Up @@ -84,6 +89,9 @@ pub(crate) struct ParliaEngineTask<
chain_tracker_tx: UnboundedSender<ForkChoiceMessage>,
/// The channel to receive chain tracker messages
chain_tracker_rx: Arc<Mutex<UnboundedReceiver<ForkChoiceMessage>>>,
/// The threshold (in number of blocks) for switching from incremental trie building of changes
/// to whole rebuild.
merkle_clean_threshold: u64,
}

// === impl ParliaEngineTask ===
Expand All @@ -106,6 +114,7 @@ impl<
storage: Storage,
block_fetcher: ParliaClient<Client>,
block_interval: u64,
merkle_clean_threshold: u64,
) {
let (fork_choice_tx, fork_choice_rx) = mpsc::unbounded_channel();
let (chain_tracker_tx, chain_tracker_rx) = mpsc::unbounded_channel();
Expand All @@ -123,6 +132,7 @@ impl<
fork_choice_rx: Arc::new(Mutex::new(fork_choice_rx)),
chain_tracker_tx,
chain_tracker_rx: Arc::new(Mutex::new(chain_tracker_rx)),
merkle_clean_threshold,
};

this.start_block_event_listening();
Expand All @@ -143,6 +153,7 @@ impl<
let fork_choice_tx = self.fork_choice_tx.clone();
let chain_tracker_tx = self.chain_tracker_tx.clone();
let fetch_header_timeout_duration = Duration::from_secs(block_interval);
let merkle_clean_threshold = self.merkle_clean_threshold;

tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -249,7 +260,9 @@ impl<
let mut trusted_header = latest_unsafe_header.clone();
// if parent hash is not equal to latest unsafe hash
// may be a fork chain detected, we need to trust the finalized header
if latest_header.parent_hash != latest_unsafe_header.hash() {
if latest_header.number - 1 == latest_unsafe_header.number &&
latest_header.parent_hash != latest_unsafe_header.hash()
{
trusted_header = finalized_header.clone();
}

Expand All @@ -260,7 +273,7 @@ impl<
// than the predicted timestamp and less than the current timestamp.
let predicted_timestamp = trusted_header.timestamp +
block_interval * (latest_header.number - 1 - trusted_header.number);
let sealed_header = latest_header.clone().seal_slow();
let mut sealed_header = latest_header.clone().seal_slow();
let is_valid_header = match consensus
.validate_header_with_predicted_timestamp(&sealed_header, predicted_timestamp)
{
Expand Down Expand Up @@ -343,6 +356,42 @@ impl<
}
};

// if the target header is not far enough from the trusted header, make sure not to
// rebuild the merkle tree
if pipeline_sync &&
(sealed_header.number - trusted_header.number > merkle_clean_threshold &&
sealed_header.number - trusted_header.number <
MIN_BLOCKS_FOR_MERKLE_REBUILD)
{
let fetch_headers_result = match timeout(
fetch_header_timeout_duration,
block_fetcher.get_headers(HeadersRequest {
start: (trusted_header.number + merkle_clean_threshold - 1).into(),
limit: 1,
direction: HeadersDirection::Falling,
}),
)
.await
{
Ok(result) => result,
Err(_) => {
trace!(target: "consensus::parlia", "Fetch header timeout");
continue
}
};
if fetch_headers_result.is_err() {
trace!(target: "consensus::parlia", "Failed to fetch header");
continue
}

let headers = fetch_headers_result.unwrap().into_data();
if headers.is_empty() {
continue
}

sealed_header = headers[0].clone().seal_slow();
};

disconnected_headers.insert(0, sealed_header.clone());
disconnected_headers.reverse();
// cache header and block
Expand Down
5 changes: 2 additions & 3 deletions crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ use reth_node_core::{
version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
#[cfg(feature = "bsc")]
use reth_primitives::parlia::ParliaConfig;
use reth_provider::providers::BlockchainProvider2;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::{engine::ClientVersionV1, WithOtherFields};
Expand Down Expand Up @@ -251,12 +249,13 @@ where
let engine_rx = ctx.node_adapter().components.network().get_to_engine_rx();
let client = ParliaEngineBuilder::new(
ctx.chain_spec(),
ParliaConfig::default(),
ctx.toml_config().parlia.clone(),
ctx.blockchain_db().clone(),
ctx.blockchain_db().clone(),
consensus_engine_tx.clone(),
engine_rx,
network_client.clone(),
ctx.toml_config().stages.merkle.clean_threshold,
)
.build(ctx.node_config().debug.tip.is_none());
let eth_service = EngineService::new(
Expand Down
5 changes: 2 additions & 3 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ use reth_node_core::{
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_primitives::format_ether;
#[cfg(feature = "bsc")]
use reth_primitives::parlia::ParliaConfig;
use reth_provider::providers::BlockchainProvider;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::{engine::ClientVersionV1, WithOtherFields};
Expand Down Expand Up @@ -294,12 +292,13 @@ where
let engine_rx = ctx.node_adapter().components.network().get_to_engine_rx();
let client = ParliaEngineBuilder::new(
ctx.chain_spec(),
ParliaConfig::default(),
ctx.toml_config().parlia.clone(),
ctx.blockchain_db().clone(),
ctx.blockchain_db().clone(),
consensus_engine_tx.clone(),
engine_rx,
network_client.clone(),
ctx.toml_config().stages.merkle.clean_threshold,
)
.build(ctx.node_config().debug.tip.is_none());
(pipeline, Either::Right(client))
Expand Down

0 comments on commit 01ee568

Please sign in to comment.