Skip to content

Commit

Permalink
1, use selec header to update startinfo
Browse files Browse the repository at this point in the history
2, add related testing case
  • Loading branch information
jxcom committed Oct 8, 2023
1 parent 2436a7a commit f5d4c68
Show file tree
Hide file tree
Showing 14 changed files with 617 additions and 75 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion block-relayer/src/block_relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl BlockRelayer {
ctx: &mut ServiceContext<BlockRelayer>,
) -> Result<()> {
let network = ctx.get_shared::<NetworkServiceRef>()?;
let block_connector_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let block_connector_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let txpool = self.txpool.clone();
let metrics = self.metrics.clone();
let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion config/src/available_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn get_ephemeral_port() -> ::std::io::Result<u16> {
use std::net::{TcpListener, TcpStream};

// Request a random available port from the OS
let listener = TcpListener::bind(("localhost", 0))?;
let listener = TcpListener::bind(("127.0.0.1", 0))?;
let addr = listener.local_addr()?;

// Create and accept a connection (which we'll promptly drop) in order to force the port
Expand Down
14 changes: 10 additions & 4 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use starcoin_sync::block_connector::{BlockConnectorService, ExecuteRequest, Rese
use starcoin_sync::sync::SyncService;
use starcoin_sync::txn_sync::TxnSyncService;
use starcoin_sync::verified_rpc_client::VerifiedRpcClient;
use starcoin_txpool::TxPoolActorService;
use starcoin_txpool::{TxPoolActorService, TxPoolService};
use starcoin_types::system_events::{SystemShutdown, SystemStarted};
use starcoin_vm_runtime::metrics::VMMetrics;
use std::sync::Arc;
Expand Down Expand Up @@ -133,7 +133,9 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
.start_service_sync(GenerateBlockEventPacemaker::service_name()),
),
NodeRequest::ResetNode(block_hash) => {
let connect_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let connect_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let fut = async move {
info!("Prepare to reset node startup info to {}", block_hash);
connect_service.send(ResetRequest { block_hash }).await?
Expand All @@ -147,7 +149,9 @@ impl ServiceHandler<Self, NodeRequest> for NodeService {
.get_shared_sync::<Arc<Storage>>()
.expect("Storage must exist.");

let connect_service = ctx.service_ref::<BlockConnectorService>()?.clone();
let connect_service = ctx
.service_ref::<BlockConnectorService<TxPoolService>>()?
.clone();
let network = ctx.get_shared::<NetworkServiceRef>()?;
let fut = async move {
info!("Prepare to re execute block {}", block_hash);
Expand Down Expand Up @@ -347,7 +351,9 @@ impl NodeService {

registry.register::<ChainNotifyHandlerService>().await?;

registry.register::<BlockConnectorService>().await?;
registry
.register::<BlockConnectorService<TxPoolService>>()
.await?;
registry.register::<SyncService>().await?;

let block_relayer = registry.register::<BlockRelayer>().await?;
Expand Down
1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ stest = { workspace = true }
stream-task = { workspace = true }
sysinfo = { workspace = true }
thiserror = { workspace = true }
timeout-join-handler = { workspace = true }

[dev-dependencies]
hex = { workspace = true }
Expand Down
164 changes: 141 additions & 23 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent};
use anyhow::{format_err, Result};
use network_api::PeerProvider;
use starcoin_chain_api::{ConnectBlockError, WriteableChainService};
use starcoin_chain_api::{ConnectBlockError, WriteableChainService, ChainReader};
use starcoin_config::{NodeConfig, G_CRATE_VERSION};
use starcoin_crypto::HashValue;
use starcoin_executor::VMMetrics;
use starcoin_logger::prelude::*;
use starcoin_network::NetworkServiceRef;
Expand All @@ -17,24 +18,35 @@ use starcoin_service_registry::{
use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::PeerNewBlock;
use starcoin_txpool::TxPoolService;
use starcoin_txpool_api::TxPoolSyncService;
#[cfg(test)]
use starcoin_txpool_mock_service::MockTxPoolService;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
use std::sync::Arc;
use sysinfo::{DiskExt, System, SystemExt};
#[cfg(test)]
use super::CheckBlockConnectorHashValue;

const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5;

pub struct BlockConnectorService {
chain_service: WriteBlockChainService<TxPoolService>,
pub struct BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
chain_service: WriteBlockChainService<TransactionPoolServiceT>,
sync_status: Option<SyncStatus>,
config: Arc<NodeConfig>,
}

impl BlockConnectorService {
impl<TransactionPoolServiceT> BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
pub fn new(
chain_service: WriteBlockChainService<TxPoolService>,
chain_service: WriteBlockChainService<TransactionPoolServiceT>,
config: Arc<NodeConfig>,
) -> Self {
Self {
Expand All @@ -51,6 +63,10 @@ impl BlockConnectorService {
}
}

pub fn chain_head_id(&self) -> HashValue {
self.chain_service.get_main().status().head.id()
}

pub fn check_disk_space(&mut self) -> Option<Result<u64>> {
if System::IS_SUPPORTED {
let mut sys = System::new_all();
Expand Down Expand Up @@ -97,11 +113,17 @@ impl BlockConnectorService {
}
}

impl ServiceFactory<Self> for BlockConnectorService {
fn create(ctx: &mut ServiceContext<BlockConnectorService>) -> Result<BlockConnectorService> {
impl<TransactionPoolServiceT> ServiceFactory<Self>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn create(
ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<BlockConnectorService<TransactionPoolServiceT>> {
let config = ctx.get_shared::<Arc<NodeConfig>>()?;
let bus = ctx.bus_ref().clone();
let txpool = ctx.get_shared::<TxPoolService>()?;
let txpool = ctx.get_shared::<TransactionPoolServiceT>()?;
let storage = ctx.get_shared::<Arc<Storage>>()?;
let startup_info = storage
.get_startup_info()?
Expand All @@ -120,7 +142,10 @@ impl ServiceFactory<Self> for BlockConnectorService {
}
}

impl ActorService for BlockConnectorService {
impl<TransactionPoolServiceT> ActorService for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn started(&mut self, ctx: &mut ServiceContext<Self>) -> Result<()> {
//TODO figure out a more suitable value.
ctx.set_mailbox_capacity(1024);
Expand All @@ -141,11 +166,15 @@ impl ActorService for BlockConnectorService {
}
}

impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, BlockDiskCheckEvent>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(
&mut self,
_: BlockDiskCheckEvent,
ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) {
if let Some(res) = self.check_disk_space() {
match res {
Expand All @@ -161,23 +190,74 @@ impl EventHandler<Self, BlockDiskCheckEvent> for BlockConnectorService {
}
}

impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
impl EventHandler<Self, BlockConnectedEvent>
for BlockConnectorService<TxPoolService>
{
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
_ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService<TxPoolService>>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected block error: {:?}", e);
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.try_connect(block) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

impl EventHandler<Self, MinedBlock> for BlockConnectorService {
#[cfg(test)]
impl EventHandler<Self, BlockConnectedEvent>
for BlockConnectorService<MockTxPoolService>
{
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
ctx: &mut ServiceContext<BlockConnectorService<MockTxPoolService>>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.apply_failed(block) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

impl<TransactionPoolServiceT> EventHandler<Self, MinedBlock>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: MinedBlock, _ctx: &mut ServiceContext<Self>) {
let MinedBlock(new_block) = msg;
let id = new_block.header().id();
Expand All @@ -192,13 +272,21 @@ impl EventHandler<Self, MinedBlock> for BlockConnectorService {
}
}

impl EventHandler<Self, SyncStatusChangeEvent> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, SyncStatusChangeEvent>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: SyncStatusChangeEvent, _ctx: &mut ServiceContext<Self>) {
self.sync_status = Some(msg.0);
}
}

impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
impl<TransactionPoolServiceT> EventHandler<Self, PeerNewBlock>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle_event(&mut self, msg: PeerNewBlock, ctx: &mut ServiceContext<Self>) {
if !self.is_synced() {
debug!("[connector] Ignore PeerNewBlock event because the node has not been synchronized yet.");
Expand Down Expand Up @@ -257,22 +345,52 @@ impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
}
}

impl ServiceHandler<Self, ResetRequest> for BlockConnectorService {
impl<TransactionPoolServiceT> ServiceHandler<Self, ResetRequest>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: ResetRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<()> {
self.chain_service.reset(msg.block_hash)
}
}

impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
impl<TransactionPoolServiceT> ServiceHandler<Self, ExecuteRequest>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: ExecuteRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<ExecutedBlock> {
self.chain_service.execute(msg.block)
}
}

#[cfg(test)]
impl<TransactionPoolServiceT> ServiceHandler<Self, CheckBlockConnectorHashValue>
for BlockConnectorService<TransactionPoolServiceT>
where
TransactionPoolServiceT: TxPoolSyncService + 'static,
{
fn handle(
&mut self,
msg: CheckBlockConnectorHashValue,
_ctx: &mut ServiceContext<BlockConnectorService<TransactionPoolServiceT>>,
) -> Result<()> {
if self.chain_service.get_main().status().head().id() == msg.head_hash {
info!("the branch in chain service is the same as target's branch");
return Ok(());
}
info!("mock branch in chain service is not the same as target's branch");
bail!("blockchain in chain service is not the same as target!");
}
}


12 changes: 12 additions & 0 deletions sync/src/block_connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,15 @@ pub struct ExecuteRequest {
impl ServiceRequest for ExecuteRequest {
type Response = anyhow::Result<ExecutedBlock>;
}

#[cfg(test)]
#[derive(Debug, Clone)]
pub struct CheckBlockConnectorHashValue {
pub head_hash: HashValue,
}

#[cfg(test)]
impl ServiceRequest for CheckBlockConnectorHashValue {
type Response = anyhow::Result<()>;
}

Loading

0 comments on commit f5d4c68

Please sign in to comment.