
Commit

fixes
sdbondi committed Sep 23, 2024
1 parent 217d9f1 commit 9439b69
Showing 18 changed files with 128 additions and 42 deletions.
42 changes: 42 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/rpc/instances.rs
@@ -8,6 +8,27 @@ use serde::{Deserialize, Serialize};

use crate::{config::InstanceType, process_manager::InstanceId, webserver::context::HandlerContext};

#[derive(Debug, Clone, Deserialize)]
pub struct StartAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StartAllResponse {
pub num_instances: u32,
}

pub async fn start_all(context: &HandlerContext, req: StartAllRequest) -> Result<StartAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().start_instance(instance.id).await?;
}

Ok(StartAllResponse { num_instances })
}

pub type StartInstanceRequest = String;

#[derive(Debug, Clone, Serialize)]
@@ -65,6 +86,27 @@ pub async fn stop(context: &HandlerContext, req: StopInstanceRequest) -> Result<
Ok(StopInstanceResponse { success: true })
}

#[derive(Debug, Clone, Deserialize)]
pub struct StopAllRequest {
instance_type: Option<InstanceType>,
}

#[derive(Debug, Clone, Serialize)]
pub struct StopAllResponse {
pub num_instances: u32,
}

pub async fn stop_all(context: &HandlerContext, req: StopAllRequest) -> Result<StopAllResponse, anyhow::Error> {
let instances = context.process_manager().list_instances(req.instance_type).await?;

let num_instances = instances.len() as u32;
for instance in instances {
context.process_manager().stop_instance(instance.id).await?;
}

Ok(StopAllResponse { num_instances })
}

#[derive(Debug, Clone, Deserialize)]
pub struct ListInstancesRequest {
pub by_type: Option<InstanceType>,
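
A minimal sketch (not part of this commit) of the request shape the new start_all/stop_all handlers deserialize. The InstanceType enum below is a stand-in for the real crate type; the only assumptions taken from this diff are that it has a TariValidatorNode variant and that the field is optional, so an empty object targets every managed instance.

use serde::Deserialize;

#[derive(Debug, Clone, Deserialize)]
enum InstanceType {
    TariValidatorNode,
    // other variants omitted in this sketch
}

#[derive(Debug, Clone, Deserialize)]
struct StartAllRequest {
    instance_type: Option<InstanceType>,
}

fn main() -> Result<(), serde_json::Error> {
    // Mirrors the webui call further down: only validator nodes are targeted.
    let req: StartAllRequest = serde_json::from_str(r#"{ "instance_type": "TariValidatorNode" }"#)?;
    assert!(req.instance_type.is_some());

    // Omitting the field deserializes to None, i.e. "all instance types".
    let req: StartAllRequest = serde_json::from_str("{}")?;
    assert!(req.instance_type.is_none());
    Ok(())
}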
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/server.rs
@@ -111,7 +111,9 @@ async fn json_rpc_handler(Extension(context): Extension<Arc<HandlerContext>>, va
"add_indexer" => call_handler(context, value, rpc::indexers::create).await,
"add_validator_node" => call_handler(context, value, rpc::validator_nodes::create).await,
"start" => call_handler(context, value, rpc::instances::start).await,
"start_all" => call_handler(context, value, rpc::instances::start_all).await,
"stop" => call_handler(context, value, rpc::instances::stop).await,
"stop_all" => call_handler(context, value, rpc::instances::stop_all).await,
"list_instances" => call_handler(context, value, rpc::instances::list).await,
"delete_data" => call_handler(context, value, rpc::instances::delete_data).await,
"burn_funds" => call_handler(context, value, rpc::minotari_wallets::burn_funds).await,
11 changes: 11 additions & 0 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
@@ -493,8 +493,19 @@ export default function Main() {
console.log("resp", resp);
});
};

const stopAll = () => {
jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

const startAll = () => {
jsonRpc("start_all", { instance_type: "TariValidatorNode" }).then(getInfo);
};

return (
<div className="main">
<button onClick={() => stopAll()}>Stop all VNs</button>
<button onClick={() => startAll()}>Start all VNs</button>
<button onClick={() => setShowLogs(!showLogs)}>{showLogs && "Hide" || "Show"} logs</button>
<button onClick={() => setAutoRefresh(!autoRefresh)}>{autoRefresh && "Disable" || "Enable"} autorefresh
</button>
3 changes: 2 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs
@@ -41,6 +41,7 @@ impl<TConsensusSpec: ConsensusSpec> OnCatchUpSync<TConsensusSpec> {
from,
self.pacemaker.current_view()
);

// Reset leader timeout to previous height since we're behind and need to process catch up blocks. This is the
// only case where the view is non-monotonic. TODO: is this correct?
self.pacemaker
@@ -52,7 +53,7 @@ impl<TConsensusSpec: ConsensusSpec> OnCatchUpSync<TConsensusSpec> {
.outbound_messaging
.send(
from,
HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { epoch, high_qc }),
HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { high_qc }),
)
.await
.is_err()
4 changes: 2 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
@@ -409,13 +409,13 @@ where TConsensusSpec: ConsensusSpec

let justifies_parent = high_qc_certificate.block_id() == parent_block.block_id();
let next_height = parent_block.height() + NodeHeight(1);
let start_of_chain_id = if justifies_parent || parent_block.height() == NodeHeight(1) {
let start_of_chain_id = if justifies_parent || high_qc_certificate.is_zero() {
// Parent is justified - we can include its state in the MR calc, foreign propose etc
parent_block.block_id()
} else {
// Parent is not justified which means we have dummy blocks between the parent and the justified block so we
// can exclude them from the query. Also note that the query will fail if we used the parent
// block id, since the dummy blocks does not exist yet.
// block id, since the dummy blocks do not exist yet.
high_qc_certificate.block_id()
};

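
Restated as a small, self-contained sketch (stand-in types, not the crate's API): the chain-start block is the parent when the high QC justifies it, or when the QC is the zero QC; otherwise it falls back to the justified block, because the dummy blocks between the justified block and the parent are not stored yet.

fn start_of_chain_id<'a>(
    justifies_parent: bool,
    high_qc_is_zero: bool,
    parent_block_id: &'a str,
    justified_block_id: &'a str,
) -> &'a str {
    if justifies_parent || high_qc_is_zero {
        // Parent is justified (or we are at the zero QC), so build on the parent directly.
        parent_block_id
    } else {
        // Dummy blocks sit between the justified block and the parent, so start from the justified block.
        justified_block_id
    }
}

fn main() {
    assert_eq!(start_of_chain_id(true, false, "parent", "justified"), "parent");
    assert_eq!(start_of_chain_id(false, false, "parent", "justified"), "justified");
}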
23 changes: 14 additions & 9 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -109,11 +109,16 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
) -> Result<(), HotStuffError> {
let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveLocalProposalHandler");

let exists = self.store.with_read_tx(|tx| msg.block.exists(tx))?;
if exists {
info!(target: LOG_TARGET, "🧊 Block {} already exists", msg.block);
let is_justified = self
.store
.with_read_tx(|tx| Block::has_been_justified(tx, msg.block.id()))
.optional()?
.unwrap_or(false);
if is_justified {
info!(target: LOG_TARGET, "🧊 Block {} has already been processed", msg.block);
return Ok(());
}

// Do not trigger leader failures while processing a proposal.
// Leader failures will be resumed after the proposal has been processed.
// If we vote ACCEPT for the proposal, the leader failure timer will be reset and resume, otherwise (no vote)
@@ -431,7 +436,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
valid_block.block().save_foreign_send_counters(tx)?;
valid_block.block().justify().save(tx)?;
valid_block.save_all_dummy_blocks(tx)?;
valid_block.block().insert(tx)?;
valid_block.block().save(tx)?;

let (_, high_qc) = valid_block.block().justify().check_high_qc(tx)?;
Ok(high_qc)
@@ -584,14 +589,16 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
local_committee: &Committee<TConsensusSpec::Addr>,
_local_committee_info: &CommitteeInfo,
) -> Result<ValidBlock, HotStuffError> {
if Block::has_been_processed(tx, candidate_block.id())? {
if Block::has_been_justified(tx, candidate_block.id())? {
return Err(ProposalValidationError::BlockAlreadyProcessed {
block_id: *candidate_block.id(),
height: candidate_block.height(),
}
.into());
}

let high_qc = HighQc::get(tx, candidate_block.epoch())?;

// Check that details included in the justify match previously added blocks
let Some(justify_block) = candidate_block.justify().get_block(tx).optional()? else {
// This will trigger a sync
@@ -635,7 +642,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec

// if the block parent is not the justify parent, then we have experienced a leader failure
// and should make dummy blocks to fill in the gaps.
if !justify_block.is_zero() && !candidate_block.justifies_parent() {
if !high_qc.block_id().is_zero() && !candidate_block.justifies_parent() {
let dummy_blocks = calculate_dummy_blocks_from_justify(
&candidate_block,
&justify_block,
@@ -670,9 +677,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
return Ok(ValidBlock::with_dummy_blocks(candidate_block, dummy_blocks));
}

// Now that we have all dummy blocks (if any) in place, we can check if the candidate block is safe.
// Specifically, it should extend the locked block via the dummy blocks.
if !candidate_block.is_safe(tx)? {
if !high_qc.block_id().is_zero() && !candidate_block.is_safe(tx)? {
return Err(ProposalValidationError::NotSafeBlock {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
11 changes: 7 additions & 4 deletions dan_layer/consensus/src/hotstuff/on_receive_new_view.rs
@@ -148,7 +148,7 @@ where TConsensusSpec: ConsensusSpec
if let Some(vote) = last_vote {
debug!(
target: LOG_TARGET,
"🔥 Receive VOTE with NEWVIEW for node {} from {}", vote.block_id, from,
"🔥 Receive VOTE with NEWVIEW for node {} {} from {}", vote.block_height, vote.block_id, from,
);
self.vote_receiver
.handle(from.clone(), vote, false, local_committee_info)
@@ -185,16 +185,19 @@

info!(
target: LOG_TARGET,
"🌟 Received NEWVIEW {} (QC: {}) has {} votes out of {}",
new_height,
latest_high_qc,
"🌟 Received NEWVIEW (QUORUM: {}/{}) {} (QC: {})",
newview_count,
threshold,
new_height,
latest_high_qc,
);
// Once we have received enough (quorum) NEWVIEWS, we can create the dummy block(s) and propose the next block.
// Any subsequent NEWVIEWs for this height/view are ignored.
if newview_count == threshold {
info!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, latest_high_qc.as_high_qc(), newview_count, threshold);
self.pacemaker
.update_view(epoch, new_height, high_qc.block_height())
.await?;
if latest_high_qc.block_height() + NodeHeight(1) > new_height {
// CASE: the votes received from NEWVIEWS created a new high QC, so there are no dummy blocks to create
// We can force beat with our current leaf and the justified block is the parent.
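
An illustrative sketch (hypothetical types, not the handler's real state) of why the diff compares newview_count with == threshold rather than >=: the view update and dummy-block proposal should fire exactly once, when quorum is first reached, and later NEWVIEWs for the same height are ignored.

use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct NewViewCollector {
    // height -> addresses that have sent a NEWVIEW for that height
    by_height: HashMap<u64, HashSet<String>>,
}

impl NewViewCollector {
    // Returns true exactly once: on the message that brings the count up to the threshold.
    fn add(&mut self, height: u64, from: &str, threshold: usize) -> bool {
        let senders = self.by_height.entry(height).or_default();
        let newly_added = senders.insert(from.to_string());
        newly_added && senders.len() == threshold
    }
}

fn main() {
    let mut c = NewViewCollector::default();
    assert!(!c.add(5, "a", 3));
    assert!(!c.add(5, "b", 3));
    assert!(c.add(5, "c", 3));  // quorum reached: update the view and propose once
    assert!(!c.add(5, "d", 3)); // subsequent NEWVIEWs for this height are ignored
}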
12 changes: 8 additions & 4 deletions dan_layer/consensus/src/hotstuff/on_sync_request.rs
@@ -4,7 +4,7 @@
use log::*;
use tari_dan_common_types::{committee::CommitteeInfo, optional::Optional, Epoch};
use tari_dan_storage::{
consensus_models::{Block, LastSentVote, LeafBlock},
consensus_models::{Block, LastProposed, LastSentVote, LeafBlock},
StateStore,
};
use tokio::task;
@@ -39,12 +39,12 @@ impl<TConsensusSpec: ConsensusSpec> OnSyncRequest<TConsensusSpec> {
epoch: Epoch,
msg: SyncRequestMessage,
) {
if msg.epoch != epoch {
if msg.high_qc.epoch() != epoch {
warn!(
target: LOG_TARGET,
"Received SyncRequest from {} for epoch {} but our epoch is {}. Ignoring request.",
from,
msg.epoch,
msg.high_qc.epoch(),
epoch
);
return;
@@ -55,7 +55,11 @@ impl<TConsensusSpec: ConsensusSpec> OnSyncRequest<TConsensusSpec> {

task::spawn(async move {
let result = store.with_read_tx(|tx| {
let leaf_block = LeafBlock::get(tx, epoch)?;
let mut leaf_block = LeafBlock::get(tx, epoch)?;
let last_proposed = LastProposed::get(tx)?;
if last_proposed.height > leaf_block.height() {
leaf_block = last_proposed.as_leaf_block();
}

if leaf_block.height() < msg.high_qc.block_height() {
return Err(HotStuffError::InvalidSyncRequest {
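
The change above, restated as a tiny sketch (field names are stand-ins): the responder now answers a sync request starting from its last proposed block whenever that block is ahead of the recorded leaf block, so blocks it has proposed but that have not yet become the leaf are still included in the response.

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct NodeHeight(u64);

#[derive(Debug, Clone, Copy)]
struct BlockSummary {
    height: NodeHeight,
}

// Pick the higher of the two starting points, mirroring the if-block in the diff.
fn sync_from(leaf_block: BlockSummary, last_proposed: BlockSummary) -> BlockSummary {
    if last_proposed.height > leaf_block.height {
        last_proposed
    } else {
        leaf_block
    }
}

fn main() {
    let leaf = BlockSummary { height: NodeHeight(10) };
    let proposed = BlockSummary { height: NodeHeight(12) };
    assert_eq!(sync_from(leaf, proposed).height, NodeHeight(12));
}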
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
@@ -522,7 +522,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
// }

// If we can propose a block end, let's not wait for the block time to do it
self.pacemaker.beat();
// self.pacemaker.beat();
},
EpochManagerEvent::ThisValidatorIsRegistered { .. } => {},
}
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/messages/message.rs
@@ -45,7 +45,7 @@ impl HotstuffMessage {
Self::Vote(msg) => msg.epoch,
Self::MissingTransactionsRequest(msg) => msg.epoch,
Self::MissingTransactionsResponse(msg) => msg.epoch,
Self::CatchUpSyncRequest(msg) => msg.epoch,
Self::CatchUpSyncRequest(msg) => msg.high_qc.epoch(),
Self::SyncResponse(msg) => msg.epoch,
}
}
1 change: 0 additions & 1 deletion dan_layer/consensus/src/messages/sync.rs
@@ -8,7 +8,6 @@ use tari_transaction::Transaction;

#[derive(Debug, Clone, Serialize)]
pub struct SyncRequestMessage {
pub epoch: Epoch,
pub high_qc: HighQc,
}

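
A minimal sketch (stand-in types) of the net effect of dropping the separate epoch field: the request's epoch is now read from the embedded high QC, which is what msg.high_qc.epoch() refers to in message.rs and on_sync_request.rs above.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Epoch(u64);

#[derive(Debug, Clone)]
struct HighQc {
    epoch: Epoch,
    block_height: u64,
}

impl HighQc {
    // Accessor mirroring the msg.high_qc.epoch() call sites in the diff.
    fn epoch(&self) -> Epoch {
        self.epoch
    }
}

#[derive(Debug, Clone)]
struct SyncRequestMessage {
    high_qc: HighQc,
}

fn main() {
    let msg = SyncRequestMessage {
        high_qc: HighQc { epoch: Epoch(3), block_height: 42 },
    };
    assert_eq!(msg.high_qc.epoch(), Epoch(3));
    assert_eq!(msg.high_qc.block_height, 42);
}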
4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
@@ -476,7 +476,6 @@ async fn multishard_local_inputs_foreign_outputs() {
async fn multishard_local_inputs_and_outputs_foreign_outputs() {
setup_logger();
let mut test = Test::builder()
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1", "2"])
.add_committee(1, vec!["3", "4"])
.add_committee(2, vec!["5", "6"])
@@ -894,6 +893,7 @@ async fn leader_failure_node_goes_down() {
if committed_height == NodeHeight(1) {
// This allows a few more leader failures to occur
test.send_transaction_to_all(Decision::Commit, 1, 2, 1).await;
test.wait_for_pool_count(TestVnDestination::All, 1).await;
}

if test.validators().filter(|vn| vn.address != failure_node).all(|v| {
@@ -904,7 +904,7 @@
break;
}

if committed_height > NodeHeight(100) {
if committed_height > NodeHeight(50) {
panic!("Not all transaction committed after {} blocks", committed_height);
}
}
12 changes: 12 additions & 0 deletions dan_layer/consensus_tests/src/support/harness.rs
@@ -335,6 +335,18 @@ impl Test {
.await
}

pub async fn wait_for_pool_count(&self, dest: TestVnDestination, count: usize) {
self.wait_all_for_predicate("waiting for pool count", |vn| {
if !dest.is_for_vn(vn) {
return true;
}
let c = vn.get_transaction_pool_count();
log::info!("{} has {} transactions in pool", vn.address, c);
c >= count
})
.await
}

pub fn with_all_validators(&self, f: impl FnMut(&Validator)) {
self.validators.values().for_each(f);
}
3 changes: 1 addition & 2 deletions dan_layer/p2p/proto/consensus.proto
@@ -240,8 +240,7 @@ message SubstateDestroyed {
}

message SyncRequest {
uint64 epoch = 1;
HighQc high_qc = 2;
HighQc high_qc = 1;
}

message HighQc {
4 changes: 1 addition & 3 deletions dan_layer/p2p/src/conversions/consensus.rs
@@ -888,11 +888,10 @@ impl From<SubstateDestroyed> for proto::consensus::SubstateDestroyed {
impl From<&SyncRequestMessage> for proto::consensus::SyncRequest {
fn from(value: &SyncRequestMessage) -> Self {
Self {
epoch: value.epoch.as_u64(),
high_qc: Some(proto::consensus::HighQc {
block_id: value.high_qc.block_id.as_bytes().to_vec(),
block_height: value.high_qc.block_height.as_u64(),
epoch: value.epoch.as_u64(),
epoch: value.high_qc.epoch.as_u64(),
qc_id: value.high_qc.qc_id.as_bytes().to_vec(),
}),
}
@@ -904,7 +903,6 @@ impl TryFrom<proto::consensus::SyncRequest> for SyncRequestMessage {

fn try_from(value: proto::consensus::SyncRequest) -> Result<Self, Self::Error> {
Ok(Self {
epoch: Epoch(value.epoch),
high_qc: value
.high_qc
.map(|value| {