refactor(consensus)!: split block header and body
sdbondi committed Nov 5, 2024
1 parent 4df7ec7 commit b1dd6a7
Showing 64 changed files with 1,364 additions and 735 deletions.
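Before the per-file hunks, a rough picture of what the breaking change amounts to: the monolithic `Block` is split into a header (consensus metadata) and a body (payload), and JSON consumers move from `block.epoch` to `block.header.epoch` (see the explorer hunk at the end). The sketch below is a minimal illustration with placeholder types; every name in it is an assumption inferred from the diff, not the commit's exact definitions.

```rust
// Minimal sketch of a header/body split, consistent with the call sites in
// this diff. All names and placeholder types here are assumptions.
type Epoch = u64;
type NodeHeight = u64;
type BlockId = [u8; 32];
type PublicKey = [u8; 32];
type Command = String; // stands in for the real consensus command type

pub struct BlockHeader {
    pub epoch: Epoch,
    pub height: NodeHeight,
    pub parent: BlockId,
    pub proposed_by: PublicKey,
    pub total_leader_fee: u64,
}

pub struct Block {
    pub header: BlockHeader,
    pub commands: Vec<Command>, // the "body"
}

impl Block {
    // Accessor delegation keeps most Rust call sites (e.g. `block.epoch()`,
    // `block.height()` in event_scanner.rs) compiling unchanged.
    pub fn epoch(&self) -> Epoch {
        self.header.epoch
    }

    pub fn height(&self) -> NodeHeight {
        self.header.height
    }
}
```

Under this shape, only consumers of the serialized block (such as the explorer UI below) see the change directly, which is why the TypeScript edits are mechanical `block.x` → `block.header.x` rewrites.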
56 changes: 17 additions & 39 deletions applications/tari_indexer/src/event_scanner.rs
@@ -26,18 +26,16 @@ use anyhow::anyhow;
use futures::StreamExt;
use log::*;
use tari_bor::decode;
use tari_common::configuration::Network;
use tari_consensus::consensus_constants::ConsensusConstants;
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::message_format::MessageFormat};
use tari_dan_common_types::{committee::Committee, Epoch, NumPreshards, PeerAddress, ShardGroup};
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{committee::Committee, Epoch, PeerAddress, ShardGroup};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockError, BlockId, Decision, TransactionRecord};
use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord};
use tari_engine_types::{
commit_result::{ExecuteResult, TransactionResult},
events::Event,
substate::{Substate, SubstateId, SubstateValue},
};
use tari_epoch_manager::EpochManagerReader;
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_template_lib::models::{EntityId, TemplateAddress};
use tari_transaction::{Transaction, TransactionId};
use tari_validator_node_rpc::client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory};
@@ -96,33 +94,24 @@ struct TransactionMetadata {
}

pub struct EventScanner {
network: Network,
sidechain_id: Option<RistrettoPublicKey>,
epoch_manager: Box<dyn EpochManagerReader<Addr = PeerAddress>>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
substate_store: SqliteSubstateStore,
event_filters: Vec<EventFilter>,
consensus_constants: ConsensusConstants,
}

impl EventScanner {
pub fn new(
network: Network,
sidechain_id: Option<RistrettoPublicKey>,
epoch_manager: Box<dyn EpochManagerReader<Addr = PeerAddress>>,
epoch_manager: EpochManagerHandle<PeerAddress>,
client_factory: TariValidatorNodeRpcClientFactory,
substate_store: SqliteSubstateStore,
event_filters: Vec<EventFilter>,
consensus_constants: ConsensusConstants,
) -> Self {
Self {
network,
sidechain_id,
epoch_manager,
client_factory,
substate_store,
event_filters,
consensus_constants,
}
}

@@ -151,7 +140,7 @@ impl EventScanner {
},
None => {
// by default we start scanning since the current epoch
// TODO: it would be nice a new parameter in the indexer to spcify a custom starting epoch
// TODO: it would be nice a new parameter in the indexer to specify a custom starting epoch
event_count += self.scan_events_of_epoch(newest_epoch).await?;
},
}
@@ -454,12 +443,6 @@ impl EventScanner {
.collect()
}

fn build_genesis_block_id(&self, num_preshards: NumPreshards) -> Result<BlockId, BlockError> {
// TODO: this should return the actual genesis for the shard group and epoch
let start_block = Block::zero_block(self.network, num_preshards, self.sidechain_id.clone())?;
Ok(*start_block.id())
}

async fn get_oldest_scanned_epoch(&self) -> Result<Option<Epoch>, anyhow::Error> {
self.substate_store
.with_read_tx(|tx| tx.get_oldest_scanned_epoch())
@@ -473,23 +456,18 @@
committee: &mut Committee<PeerAddress>,
epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
// We start scanning from the last scanned block for this commitee
// We start scanning from the last scanned block for this committee
let start_block_id = self
.substate_store
.with_read_tx(|tx| tx.get_last_scanned_block_id(epoch, shard_group))?;

let start_block_id = match start_block_id {
Some(block_id) => block_id,
None => self.build_genesis_block_id(self.consensus_constants.num_preshards)?,
};

committee.shuffle();
let mut last_block_id = start_block_id;

info!(
target: LOG_TARGET,
"Scanning new blocks since {} from (epoch={}, shard={})",
last_block_id,
"Scanning new blocks from (start_id={}, epoch={}, shard={})",
last_block_id.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string()),
epoch,
shard_group
);
@@ -502,7 +480,7 @@
epoch,
shard_group
);
let resp = self.get_blocks_from_vn(member, start_block_id, Some(epoch)).await;
let resp = self.get_blocks_from_vn(member, last_block_id, epoch).await;

match resp {
Ok(blocks) => {
@@ -520,9 +498,9 @@
let last_block = blocks.iter().max_by_key(|b| (b.epoch(), b.height()));

if let Some(block) = last_block {
last_block_id = *block.id();
last_block_id = Some(*block.id());
// Store the latest scanned block id in the database for future scans
self.save_scanned_block_id(epoch, shard_group, last_block_id)?;
self.save_scanned_block_id(epoch, shard_group, *block.id())?;
}
return Ok(blocks);
},
@@ -578,8 +556,8 @@ impl EventScanner {
async fn get_blocks_from_vn(
&self,
vn_addr: &PeerAddress,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
start_block_id: Option<BlockId>,
up_to_epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
let mut blocks = vec![];

@@ -588,8 +566,8 @@

let mut stream = client
.sync_blocks(SyncBlocksRequest {
start_block_id: start_block_id.as_bytes().to_vec(),
up_to_epoch: up_to_epoch.map(|epoch| epoch.into()),
start_block_id: start_block_id.map(|id| id.as_bytes().to_vec()).unwrap_or_default(),
epoch: Some(up_to_epoch.into()),
})
.await?;
while let Some(resp) = stream.next().await {
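Two things change in this scanner/RPC hunk: `start_block_id` becomes an `Option<BlockId>` (there is no longer a synthetic genesis id to fall back on), and `None` travels over the wire as an empty byte vector, which the validator node interprets as "start from the first block of the requested epoch" (see the service_impl.rs hunk below). A self-contained sketch of that empty-bytes convention, using a placeholder `BlockId`:

```rust
// Placeholder for the real 32-byte consensus BlockId.
#[derive(Clone, Copy, Debug, PartialEq)]
struct BlockId([u8; 32]);

// Client side: mirrors
// `start_block_id.map(|id| id.as_bytes().to_vec()).unwrap_or_default()`.
fn encode_start_block_id(id: Option<BlockId>) -> Vec<u8> {
    id.map(|id| id.0.to_vec()).unwrap_or_default()
}

// Server side: mirrors
// `Some(bytes).filter(|i| !i.is_empty()).map(BlockId::try_from).transpose()`.
fn decode_start_block_id(bytes: &[u8]) -> Result<Option<BlockId>, String> {
    if bytes.is_empty() {
        return Ok(None);
    }
    let arr: [u8; 32] = bytes
        .try_into()
        .map_err(|_| format!("Invalid encoded block id: expected 32 bytes, got {}", bytes.len()))?;
    Ok(Some(BlockId(arr)))
}

fn main() {
    let id = BlockId([7u8; 32]);
    let encoded = encode_start_block_id(Some(id));
    assert_eq!(decode_start_block_id(&encoded).unwrap(), Some(id));
    // None round-trips through the empty vector.
    assert_eq!(encode_start_block_id(None), Vec::<u8>::new());
    assert_eq!(decode_start_block_id(&[]).unwrap(), None);
}
```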
10 changes: 3 additions & 7 deletions applications/tari_indexer/src/lib.rs
@@ -172,16 +172,12 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow
.map(TryInto::try_into)
.collect::<Result<_, _>>()
.map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Invalid event filters: {}", e)))?;
let consensus_constants = ConsensusConstants::from(config.network);
let event_scanner = Arc::new(EventScanner::new(
config.network,
config.indexer.sidechain_id,
Box::new(services.epoch_manager.clone()),
let event_scanner = EventScanner::new(
services.epoch_manager.clone(),
services.validator_node_client_factory.clone(),
services.substate_store.clone(),
event_filters,
consensus_constants,
));
);

// Run the GraphQL API
let graphql_address = config.indexer.graphql_address;
@@ -847,7 +847,7 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
use crate::substate_storage_sqlite::schema::scanned_block_ids;

diesel::delete(scanned_block_ids::table)
.filter(scanned_block_ids::epoch.lt(epoch.0 as i64))
.filter(scanned_block_ids::epoch.lt(epoch.as_u64() as i64))
.execute(&mut *self.connection())
.map_err(|e| StorageError::QueryError {
reason: format!("delete_scanned_epochs_older_than: {}", e),
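A small cleanup in the substate store: the epoch filter now calls `Epoch::as_u64()` rather than reaching into the newtype's tuple field (`epoch.0`). A toy sketch of the accessor pattern, with a placeholder definition rather than the real tari_dan_common_types one:

```rust
// Placeholder; the real `Epoch` newtype lives in tari_dan_common_types.
#[derive(Clone, Copy, Debug)]
pub struct Epoch(pub u64);

impl Epoch {
    pub fn as_u64(self) -> u64 {
        self.0
    }
}

fn main() {
    let epoch = Epoch(42);
    // The Diesel query compares against an i64 column, hence the cast.
    let db_value = epoch.as_u64() as i64;
    assert_eq!(db_value, 42);
}
```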
4 changes: 3 additions & 1 deletion applications/tari_validator_node/src/bootstrap.rs
@@ -34,6 +34,7 @@ use tari_common::{
configuration::Network,
exit_codes::{ExitCode, ExitError},
};
use tari_common_types::types::FixedHash;
use tari_consensus::consensus_constants::ConsensusConstants;
#[cfg(not(feature = "metrics"))]
use tari_consensus::traits::hooks::NoopHooks;
@@ -603,8 +604,9 @@
network,
Epoch(0),
ShardGroup::all_shards(num_preshards),
FixedHash::default(),
sidechain_id.clone(),
)?;
);
let substate_id = substate_id.into();
let id = VersionedSubstateId::new(substate_id, 0);
SubstateRecord {
@@ -29,29 +29,29 @@ type BlockBuffer = Vec<BlockData>;

pub struct BlockSyncTask<TStateStore: StateStore> {
store: TStateStore,
start_block: Block,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
sender: mpsc::Sender<Result<SyncBlocksResponse, RpcStatus>>,
}

impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
pub fn new(
store: TStateStore,
start_block: Block,
start_block_id: BlockId,
up_to_epoch: Option<Epoch>,
sender: mpsc::Sender<Result<SyncBlocksResponse, RpcStatus>>,
) -> Self {
Self {
store,
start_block,
start_block_id,
up_to_epoch,
sender,
}
}

pub async fn run(mut self) -> Result<(), ()> {
let mut buffer = Vec::with_capacity(BLOCK_BUFFER_SIZE);
let mut current_block_id = *self.start_block.id();
let mut current_block_id = self.start_block_id;
let mut counter = 0;
loop {
match self.fetch_next_batch(&mut buffer, &current_block_id) {
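The sync task now carries just the starting `BlockId` instead of a pre-loaded `Block`, so the RPC handler can validate or resolve the id and hand it straight over without reading the block up front. A stripped-down sketch of that spawn-a-task-over-a-channel shape, with placeholder types (the real task fetches batches of blocks from the state store):

```rust
use tokio::sync::mpsc;

// Placeholder id type; the real task works against the state store.
type BlockId = u64;

struct BlockSyncTask {
    start_block_id: BlockId,
    sender: mpsc::Sender<BlockId>,
}

impl BlockSyncTask {
    fn new(start_block_id: BlockId, sender: mpsc::Sender<BlockId>) -> Self {
        Self { start_block_id, sender }
    }

    async fn run(self) -> Result<(), ()> {
        // Walk forward from the starting id; the real task streams batches
        // of blocks to the RPC client until it catches up.
        let mut current = self.start_block_id;
        for _ in 0..3 {
            self.sender.send(current).await.map_err(|_| ())?;
            current += 1; // placeholder for "fetch next batch"
        }
        Ok(())
    }
}

#[tokio::main] // assumes the tokio runtime
async fn main() {
    let (tx, mut rx) = mpsc::channel(10);
    tokio::spawn(BlockSyncTask::new(0, tx).run());
    while let Some(id) = rx.recv().await {
        println!("synced block {id}");
    }
}
```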
89 changes: 46 additions & 43 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
@@ -24,7 +24,7 @@ use std::convert::{TryFrom, TryInto};

use log::*;
use tari_bor::{decode_exact, encode};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, PeerAddress, SubstateAddress};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_p2p::{
proto,
proto::rpc::{
@@ -45,16 +45,7 @@
},
};
use tari_dan_storage::{
consensus_models::{
Block,
BlockId,
EpochCheckpoint,
HighQc,
LockedBlock,
StateTransitionId,
SubstateRecord,
TransactionRecord,
},
consensus_models::{Block, BlockId, EpochCheckpoint, HighQc, StateTransitionId, SubstateRecord, TransactionRecord},
StateStore,
};
use tari_engine_types::virtual_substate::VirtualSubstateId;
@@ -279,41 +270,53 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let (sender, receiver) = mpsc::channel(10);

let start_block_id = BlockId::try_from(req.start_block_id)
let start_block_id = Some(req.start_block_id)
.filter(|i| !i.is_empty())
.map(BlockId::try_from)
.transpose()
.map_err(|e| RpcStatus::bad_request(format!("Invalid encoded block id: {}", e)))?;
// Check if we have the blocks
let start_block = store
.with_read_tx(|tx| Block::get(tx, &start_block_id).optional())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found(format!("start_block_id {start_block_id} not found")))?;

// Check that the start block is not after the locked block
let locked_block = store
.with_read_tx(|tx| LockedBlock::get(tx, current_epoch).optional())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?
.ok_or_else(|| RpcStatus::not_found("No locked block"))?;
let epoch_is_after = start_block.epoch() > locked_block.epoch();
let height_is_after =
(start_block.epoch() == locked_block.epoch) && (start_block.height() > locked_block.height());

if epoch_is_after || height_is_after {
return Err(RpcStatus::not_found(format!(
"start_block_id {} is after locked block {}",
start_block_id, locked_block
)));
}
let start_block_id = {
let tx = store
.create_read_tx()
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

task::spawn(
BlockSyncTask::new(
self.shard_state_store.clone(),
start_block,
req.up_to_epoch.map(|epoch| epoch.into()),
sender,
)
.run(),
);
match start_block_id {
Some(id) => {
if !Block::record_exists(&tx, &id).map_err(RpcStatus::log_internal_error(LOG_TARGET))? {
return Err(RpcStatus::not_found(format!("start_block_id {id} not found",)));
}
id
},
None => {
let epoch = req
.epoch
.map(Epoch::from)
.map(|end| end.min(current_epoch))
.unwrap_or(current_epoch);

let mut block_ids = Block::get_ids_by_epoch_and_height(&tx, epoch, NodeHeight::zero())
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;

let Some(block_id) = block_ids.pop() else {
return Err(RpcStatus::not_found(format!(
"Block not found with epoch={epoch}, height=0"
)));
};
if !block_ids.is_empty() {
return Err(RpcStatus::conflict(format!(
"Multiple applicable blocks for epoch={} and height=0",
epoch
)));
}

block_id
},
}
};

let (sender, receiver) = mpsc::channel(10);
task::spawn(BlockSyncTask::new(self.shard_state_store.clone(), start_block_id, None, sender).run());

Ok(Streaming::new(receiver))
}
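The interesting part of the rewritten handler is the `None` branch: with no explicit start block, it resolves the height-0 block of the requested epoch (clamped to the current epoch), failing with not-found when there is no such block and with a conflict when there is more than one. A condensed, self-contained sketch of that resolution, with an in-memory map standing in for the store queries (`Block::record_exists`, `Block::get_ids_by_epoch_and_height`):

```rust
use std::collections::HashMap;

// Placeholder types; the real ones live in tari_dan_common_types and
// tari_dan_storage.
type Epoch = u64;
type NodeHeight = u64;
type BlockId = u64;

#[derive(Debug)]
enum Status {
    NotFound(String),
    Conflict(String),
}

struct BlockIndex {
    by_epoch_height: HashMap<(Epoch, NodeHeight), Vec<BlockId>>,
}

impl BlockIndex {
    fn resolve_start_block(
        &self,
        requested_id: Option<BlockId>,
        requested_epoch: Option<Epoch>,
        current_epoch: Epoch,
    ) -> Result<BlockId, Status> {
        // Explicit start block: only check that it exists.
        if let Some(id) = requested_id {
            let exists = self.by_epoch_height.values().any(|ids| ids.contains(&id));
            if !exists {
                return Err(Status::NotFound(format!("start_block_id {id} not found")));
            }
            return Ok(id);
        }
        // No start block: fall back to the first block (height 0) of the
        // requested epoch, clamped to the current epoch.
        let epoch = requested_epoch.map(|e| e.min(current_epoch)).unwrap_or(current_epoch);
        let mut ids = self
            .by_epoch_height
            .get(&(epoch, 0))
            .cloned()
            .unwrap_or_default();
        let Some(id) = ids.pop() else {
            return Err(Status::NotFound(format!("no block at epoch={epoch}, height=0")));
        };
        if !ids.is_empty() {
            return Err(Status::Conflict(format!("multiple blocks at epoch={epoch}, height=0")));
        }
        Ok(id)
    }
}

fn main() {
    let mut by_epoch_height = HashMap::new();
    by_epoch_height.insert((3, 0), vec![30]);
    let index = BlockIndex { by_epoch_height };

    // An explicit id wins when present.
    assert_eq!(index.resolve_start_block(Some(30), None, 5).unwrap(), 30);
    // Otherwise the epoch's height-0 block is used.
    assert_eq!(index.resolve_start_block(None, Some(3), 5).unwrap(), 30);
    // Missing epochs surface as not-found.
    assert!(index.resolve_start_block(None, Some(4), 5).is_err());
}
```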
@@ -21,7 +21,7 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import { useState, useEffect } from "react";
import { useParams } from "react-router-dom";
import { Link, useParams } from "react-router-dom";
import { Accordion, AccordionDetails, AccordionSummary } from "../../Components/Accordion";
import { Grid, Table, TableContainer, TableBody, TableRow, TableCell, Button, Fade, Alert } from "@mui/material";
import Typography from "@mui/material/Typography";
@@ -185,23 +185,23 @@ export default function BlockDetails() {
</TableRow>
<TableRow>
<TableCell>Epoch</TableCell>
<DataTableCell>{block!.epoch}</DataTableCell>
<DataTableCell>{block!.header.epoch}</DataTableCell>
</TableRow>
<TableRow>
<TableCell>Height</TableCell>
<DataTableCell>{block!.height}</DataTableCell>
<DataTableCell>{block!.header.height}</DataTableCell>
</TableRow>
<TableRow>
<TableCell>Parent block</TableCell>
<DataTableCell>
<a href={`/blocks/${block!.parent}`}>{block!.parent}</a>
<Link to={`/blocks/${block!.header.parent}`}>{block!.header.parent}</Link>
</DataTableCell>
</TableRow>
<TableRow>
<TableCell>Total Fees</TableCell>
<DataTableCell>
<div className={block!.proposed_by === identity!.public_key ? "my_money" : ""}>
{block!.total_leader_fee}
<div className={block!.header.proposed_by === identity!.public_key ? "my_money" : ""}>
{block!.header.total_leader_fee}
</div>
</DataTableCell>
</TableRow>
@@ -213,7 +213,7 @@
</TableRow>
<TableRow>
<TableCell>Proposed by</TableCell>
<DataTableCell>{block!.proposed_by}</DataTableCell>
<DataTableCell>{block!.header.proposed_by}</DataTableCell>
</TableRow>
<TableRow>
<TableCell>Block time</TableCell>