Skip to content

Commit

Permalink
Merge branch 'development' of https://github.com/tari-project/tari-dan
Browse files Browse the repository at this point in the history
…into gossip
  • Loading branch information
mrnaveira committed Sep 24, 2024
2 parents 5400231 + 49c82f7 commit 622718a
Show file tree
Hide file tree
Showing 65 changed files with 1,173 additions and 692 deletions.
2 changes: 1 addition & 1 deletion applications/tari_indexer/src/dry_run/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ where TSubstateCache: SubstateCache + 'static
for (epoch, public_key) in claim_instructions {
let vn = self
.epoch_manager
.get_validator_node_by_public_key(epoch, &public_key)
.get_validator_node_by_public_key(epoch, public_key.clone())
.await?;
let address = VirtualSubstateId::UnclaimedValidatorFee {
epoch: epoch.as_u64(),
Expand Down
42 changes: 42 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/rpc/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,27 @@ use serde::{Deserialize, Serialize};

use crate::{config::InstanceType, process_manager::InstanceId, webserver::context::HandlerContext};

#[derive(Debug, Clone, Deserialize)]
pub struct StartAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StartAllResponse {
pub num_instances: u32,
}

pub async fn start_all(context: &HandlerContext, req: StartAllRequest) -> Result<StartAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().start_instance(instance.id).await?;
}

Ok(StartAllResponse { num_instances })
}

pub type StartInstanceRequest = String;

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -65,6 +86,27 @@ pub async fn stop(context: &HandlerContext, req: StopInstanceRequest) -> Result<
Ok(StopInstanceResponse { success: true })
}

#[derive(Debug, Clone, Deserialize)]
pub struct StopAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StopAllResponse {
pub num_instances: u32,
}

pub async fn stop_all(context: &HandlerContext, req: StopAllRequest) -> Result<StopAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().stop_instance(instance.id).await?;
}

Ok(StopAllResponse { num_instances })
}

#[derive(Debug, Clone, Deserialize)]
pub struct ListInstancesRequest {
pub by_type: Option<InstanceType>,
Expand Down
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ async fn json_rpc_handler(Extension(context): Extension<Arc<HandlerContext>>, va
"add_indexer" => call_handler(context, value, rpc::indexers::create).await,
"add_validator_node" => call_handler(context, value, rpc::validator_nodes::create).await,
"start" => call_handler(context, value, rpc::instances::start).await,
"start_all" => call_handler(context, value, rpc::instances::start_all).await,
"stop" => call_handler(context, value, rpc::instances::stop).await,
"stop_all" => call_handler(context, value, rpc::instances::stop_all).await,
"list_instances" => call_handler(context, value, rpc::instances::list).await,
"delete_data" => call_handler(context, value, rpc::instances::delete_data).await,
"burn_funds" => call_handler(context, value, rpc::minotari_wallets::burn_funds).await,
Expand Down
13 changes: 12 additions & 1 deletion applications/tari_swarm_daemon/webui/src/routes/Main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }:
}
return (<>
<hr />
<h3>Pool transaction</h3>
<h3>Pool transactions {pool.length}</h3>
<table style={{
width: "100%",
}}>
Expand Down Expand Up @@ -493,8 +493,19 @@ export default function Main() {
console.log("resp", resp);
});
};

const stopAll = () => {
jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

const startAll = () => {
jsonRpc("start_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

return (
<div className="main">
<button onClick={() => stopAll()}>Stop all VNs</button>
<button onClick={() => startAll()}>Start all VNs</button>
<button onClick={() => setShowLogs(!showLogs)}>{showLogs && "Hide" || "Show"} logs</button>
<button onClick={() => setAutoRefresh(!autoRefresh)}>{autoRefresh && "Disable" || "Enable"} autorefresh
</button>
Expand Down
3 changes: 3 additions & 0 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ pub async fn spawn_services(
state_store.clone(),
mempool.clone(),
virtual_substate_manager,
consensus_handle.clone(),
)
.await?;
// Save final node identity after comms has initialized. This is required because the public_address can be
Expand Down Expand Up @@ -447,6 +448,7 @@ async fn spawn_p2p_rpc(
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
) -> anyhow::Result<()> {
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(config.validator_node.rpc.max_simultaneous_sessions)
Expand All @@ -457,6 +459,7 @@ async fn spawn_p2p_rpc(
shard_store_store,
mempool,
virtual_substate_manager,
consensus,
));

let (notify_tx, notify_rx) = mpsc::unbounded_channel();
Expand Down
5 changes: 5 additions & 0 deletions applications/tari_validator_node/src/consensus/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent};
use tari_dan_common_types::Epoch;
use tari_transaction::Transaction;
use tokio::sync::{broadcast, mpsc, watch};

Expand Down Expand Up @@ -30,6 +31,10 @@ impl ConsensusHandle {
}
}

pub fn current_epoch(&self) -> Epoch {
self.current_view.get_epoch()
}

pub async fn notify_new_transaction(
&self,
transaction: Transaction,
Expand Down
8 changes: 7 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@ use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_state_store_sqlite::SqliteStateStore;
use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcServer;

use crate::{p2p::services::mempool::MempoolHandle, virtual_substate::VirtualSubstateManager};
use crate::{
consensus::ConsensusHandle,
p2p::services::mempool::MempoolHandle,
virtual_substate::VirtualSubstateManager,
};

pub fn create_tari_validator_node_rpc_service(
epoch_manager: EpochManagerHandle<PeerAddress>,
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
) -> ValidatorNodeRpcServer<ValidatorNodeRpcServiceImpl> {
ValidatorNodeRpcServer::new(ValidatorNodeRpcServiceImpl::new(
epoch_manager,
shard_store_store,
mempool,
virtual_substate_manager,
consensus,
))
}
10 changes: 5 additions & 5 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcService;
use tokio::{sync::mpsc, task};

use crate::{
consensus::ConsensusHandle,
p2p::{
rpc::{block_sync_task::BlockSyncTask, state_sync_task::StateSyncTask},
services::mempool::MempoolHandle,
Expand All @@ -80,6 +81,7 @@ pub struct ValidatorNodeRpcServiceImpl {
shard_state_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
consensus: ConsensusHandle,
}

impl ValidatorNodeRpcServiceImpl {
Expand All @@ -91,12 +93,14 @@ impl ValidatorNodeRpcServiceImpl {
SqliteStateStore<PeerAddress>,
EpochManagerHandle<PeerAddress>,
>,
consensus: ConsensusHandle,
) -> Self {
Self {
epoch_manager,
shard_state_store,
mempool,
virtual_substate_manager,
consensus,
}
}
}
Expand Down Expand Up @@ -340,11 +344,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
request: Request<GetCheckpointRequest>,
) -> Result<Response<GetCheckpointResponse>, RpcStatus> {
let msg = request.into_message();
let current_epoch = self
.epoch_manager
.current_epoch()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let current_epoch = self.consensus.current_epoch();
if msg.current_epoch != current_epoch {
// This may occur if one of the nodes has not fully scanned the base layer
return Err(RpcStatus::bad_request(format!(
Expand Down
12 changes: 0 additions & 12 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,18 +259,6 @@ impl CommitteeInfo {
.into_iter()
.filter(|substate_address| self.includes_substate_address(substate_address.borrow()))
}

/// Calculates the number of distinct shard groups for the given addresses
pub fn count_distinct_shard_groups<B: Borrow<SubstateAddress>, I: IntoIterator<Item = B>>(
&self,
addresses: I,
) -> usize {
addresses
.into_iter()
.map(|addr| addr.borrow().to_shard_group(self.num_shards, self.num_committees))
.collect::<std::collections::HashSet<_>>()
.len()
}
}

#[derive(Debug, Clone, Serialize)]
Expand Down
16 changes: 5 additions & 11 deletions dan_layer/common_types/src/versioned_substate_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ impl SubstateRequirement {
.map(|v| SubstateAddress::from_substate_id(self.substate_id(), v))
}

pub fn to_substate_address_zero_version(&self) -> SubstateAddress {
SubstateAddress::from_substate_id(self.substate_id(), 0)
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is a fixed division of the 256-bit shard space.
/// If the substate version is not known, None is returned.
Expand Down Expand Up @@ -118,7 +122,7 @@ impl Display for SubstateRequirement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.version {
Some(v) => write!(f, "{}:{}", self.substate_id, v),
None => write!(f, "{}", self.substate_id),
None => write!(f, "{}:?", self.substate_id),
}
}
}
Expand Down Expand Up @@ -180,16 +184,6 @@ impl VersionedSubstateId {
self.version
}

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is an equal division of the 256-bit shard space.
pub fn to_shard(&self, num_shards: NumPreshards) -> Shard {
self.to_substate_address().to_shard(num_shards)
}

pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup {
self.to_substate_address().to_shard_group(num_shards, num_committees)
}

pub fn to_previous_version(&self) -> Option<Self> {
self.version
.checked_sub(1)
Expand Down
7 changes: 4 additions & 3 deletions dan_layer/consensus/src/block_validations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub async fn check_proposal<TConsensusSpec: ConsensusSpec>(
check_sidechain_id(block, config)?;
check_hash_and_height(block)?;
let committee_for_block = epoch_manager
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
.get_committee_by_validator_public_key(block.epoch(), block.proposed_by().clone())
.await?;
check_proposed_by_leader(leader_strategy, &committee_for_block, block)?;
check_signature(block)?;
Expand Down Expand Up @@ -181,7 +181,7 @@ pub async fn check_quorum_certificate<TConsensusSpec: ConsensusSpec>(
let mut vns = vec![];
for signature in qc.signatures() {
let vn = epoch_manager
.get_validator_node_by_public_key(qc.epoch(), signature.public_key())
.get_validator_node_by_public_key(qc.epoch(), signature.public_key().clone())
.await?;
let committee_info = epoch_manager
.get_committee_info_for_substate(qc.epoch(), vn.shard_key)
Expand Down Expand Up @@ -209,7 +209,8 @@ pub async fn check_quorum_certificate<TConsensusSpec: ConsensusSpec>(
qc.signatures()
.first()
.ok_or::<HotStuffError>(ProposalValidationError::QuorumWasNotReached { qc: qc.clone() }.into())?
.public_key(),
.public_key()
.clone(),
)
.await?;

Expand Down
Loading

0 comments on commit 622718a

Please sign in to comment.