fix leader fail before block+votes
sdbondi committed Sep 23, 2024
1 parent 217d9f1 commit 05ea983
Showing 17 changed files with 94 additions and 42 deletions.
4 changes: 0 additions & 4 deletions dan_layer/consensus/src/hotstuff/current_view.rs
@@ -22,10 +22,6 @@ impl CurrentView {
Self::default()
}

pub(crate) fn set_next_height(&self) {
self.height.fetch_add(1, atomic::Ordering::SeqCst);
}

pub fn get_epoch(&self) -> Epoch {
self.epoch.load(atomic::Ordering::SeqCst).into()
}

3 changes: 2 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_catch_up_sync.rs
@@ -41,6 +41,7 @@ impl<TConsensusSpec: ConsensusSpec> OnCatchUpSync<TConsensusSpec> {
from,
self.pacemaker.current_view()
);

// Reset leader timeout to previous height since we're behind and need to process catch up blocks. This is the
// only case where the view is non-monotonic. TODO: is this correct?
self.pacemaker
@@ -52,7 +53,7 @@ impl<TConsensusSpec: ConsensusSpec> OnCatchUpSync<TConsensusSpec> {
.outbound_messaging
.send(
from,
HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { epoch, high_qc }),
HotstuffMessage::CatchUpSyncRequest(SyncRequestMessage { high_qc }),
)
.await
.is_err()

22 changes: 21 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_next_sync_view.rs
@@ -10,7 +10,7 @@ use tari_dan_storage::{
use tari_epoch_manager::EpochManagerReader;

use crate::{
hotstuff::HotStuffError,
hotstuff::{pacemaker_handle::PaceMakerHandle, HotStuffError},
messages::{HotstuffMessage, NewViewMessage, VoteMessage},
traits::{ConsensusSpec, LeaderStrategy, OutboundMessaging},
};
@@ -22,6 +22,8 @@ pub struct OnNextSyncViewHandler<TConsensusSpec: ConsensusSpec> {
outbound_messaging: TConsensusSpec::OutboundMessaging,
leader_strategy: TConsensusSpec::LeaderStrategy,
epoch_manager: TConsensusSpec::EpochManager,
local_validator_addr: TConsensusSpec::Addr,
pacemaker: PaceMakerHandle,
}

impl<TConsensusSpec: ConsensusSpec> OnNextSyncViewHandler<TConsensusSpec> {
@@ -30,12 +32,16 @@ impl<TConsensusSpec: ConsensusSpec> OnNextSyncViewHandler<TConsensusSpec> {
outbound_messaging: TConsensusSpec::OutboundMessaging,
leader_strategy: TConsensusSpec::LeaderStrategy,
epoch_manager: TConsensusSpec::EpochManager,
local_validator_addr: TConsensusSpec::Addr,
pacemaker: PaceMakerHandle,
) -> Self {
Self {
store,
outbound_messaging,
leader_strategy,
epoch_manager,
local_validator_addr,
pacemaker,
}
}

@@ -56,6 +62,20 @@ impl<TConsensusSpec: ConsensusSpec> OnNextSyncViewHandler<TConsensusSpec> {
}

let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
let current_leader = self.leader_strategy.get_leader(&local_committee, new_height);

// If we're the leader, do not send a NEWVIEW for ourselves or update the view. We may receive the block/votes
// shortly after leader failure is triggered. In this case we propose the next block, which may be
// accepted by other nodes before they trigger leader failure.
if *current_leader == self.local_validator_addr {
info!(target: LOG_TARGET, "🌟❓️ I am the next leader for height {new_height}. Not sending NEWVIEW.");
return Ok(());
}

self.pacemaker
.update_view(epoch, new_height, high_qc.block_height())
.await?;

let next_leader = self
.leader_strategy
.get_leader_for_next_block(&local_committee, new_height);
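
For context, here is a minimal standalone sketch of the leader-failure flow this file now implements: resolve the leader for the failed height, skip the NEWVIEW (and the view update) when we are that leader ourselves, and otherwise advance the pacemaker before addressing the next leader. The types and helpers below are simplified stand-ins, not the real `ConsensusSpec` machinery.

```rust
// Simplified stand-ins for the real consensus types; only the control flow is illustrative.
#[derive(Clone, Copy, Debug)]
struct NodeHeight(u64);

#[derive(Clone, PartialEq, Eq, Debug)]
struct Addr(&'static str);

struct Pacemaker {
    view_height: NodeHeight,
}

impl Pacemaker {
    // Hypothetical: the real handle is async and also takes the epoch and high-QC height.
    fn update_view(&mut self, new_height: NodeHeight) {
        self.view_height = new_height;
    }
}

// Round-robin stand-in for the leader strategy.
fn leader_for(committee: &[Addr], height: NodeHeight) -> &Addr {
    &committee[(height.0 as usize) % committee.len()]
}

/// Returns the address to send a NEWVIEW to, or None if this node is itself the leader
/// for the failed height and should wait to propose instead.
fn on_leader_timeout<'a>(
    committee: &'a [Addr],
    local_addr: &Addr,
    new_height: NodeHeight,
    pacemaker: &mut Pacemaker,
) -> Option<&'a Addr> {
    let current_leader = leader_for(committee, new_height);
    if current_leader == local_addr {
        // We are the leader: don't send a NEWVIEW to ourselves or advance the view, since a
        // block/votes for the previous view may still arrive and let us propose normally.
        return None;
    }
    // Only advance the view once we know the NEWVIEW is actually going out.
    pacemaker.update_view(new_height);
    Some(leader_for(committee, NodeHeight(new_height.0 + 1)))
}

fn main() {
    let committee = [Addr("vn1"), Addr("vn2"), Addr("vn3")];
    let mut pacemaker = Pacemaker { view_height: NodeHeight(9) };
    match on_leader_timeout(&committee, &Addr("vn1"), NodeHeight(10), &mut pacemaker) {
        Some(next) => println!("send NEWVIEW for height 10 to {next:?}"),
        None => println!("we are the leader for height 10; waiting to propose"),
    }
    println!("pacemaker view height is now {}", pacemaker.view_height.0);
}
```

Deferring the view update until the NEWVIEW is actually sent is what allows a late block or votes to still be processed by the would-be leader, which is the failure mode named in the commit title.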

2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/on_propose.rs
@@ -415,7 +415,7 @@ where TConsensusSpec: ConsensusSpec
} else {
// Parent is not justified which means we have dummy blocks between the parent and the justified block so we
// can exclude them from the query. Also note that the query will fail if we used the parent
// block id, since the dummy blocks does not exist yet.
// block id, since the dummy blocks do not exist yet.
high_qc_certificate.block_id()
};


21 changes: 13 additions & 8 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -109,11 +109,16 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
) -> Result<(), HotStuffError> {
let _timer = TraceTimer::debug(LOG_TARGET, "OnReceiveLocalProposalHandler");

let exists = self.store.with_read_tx(|tx| msg.block.exists(tx))?;
if exists {
info!(target: LOG_TARGET, "🧊 Block {} already exists", msg.block);
let is_processed = self
.store
.with_read_tx(|tx| Block::has_been_processed(tx, msg.block.id()))
.optional()?
.unwrap_or(false);
if is_processed {
info!(target: LOG_TARGET, "🧊 Block {} has already been processed", msg.block);
return Ok(());
}

// Do not trigger leader failures while processing a proposal.
// Leader failures will be resumed after the proposal has been processed.
// If we vote ACCEPT for the proposal, the leader failure timer will be reset and resume, otherwise (no vote)
@@ -431,7 +436,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
valid_block.block().save_foreign_send_counters(tx)?;
valid_block.block().justify().save(tx)?;
valid_block.save_all_dummy_blocks(tx)?;
valid_block.block().insert(tx)?;
valid_block.block().save(tx)?;

let (_, high_qc) = valid_block.block().justify().check_high_qc(tx)?;
Ok(high_qc)
@@ -592,6 +597,8 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
.into());
}

let high_qc = HighQc::get(tx, candidate_block.epoch())?;

// Check that details included in the justify match previously added blocks
let Some(justify_block) = candidate_block.justify().get_block(tx).optional()? else {
// This will trigger a sync
@@ -635,7 +642,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec

// if the block parent is not the justify parent, then we have experienced a leader failure
// and should make dummy blocks to fill in the gaps.
if !justify_block.is_zero() && !candidate_block.justifies_parent() {
if !high_qc.block_id().is_zero() && !candidate_block.justifies_parent() {
let dummy_blocks = calculate_dummy_blocks_from_justify(
&candidate_block,
&justify_block,
@@ -670,9 +677,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
return Ok(ValidBlock::with_dummy_blocks(candidate_block, dummy_blocks));
}

// Now that we have all dummy blocks (if any) in place, we can check if the candidate block is safe.
// Specifically, it should extend the locked block via the dummy blocks.
if !candidate_block.is_safe(tx)? {
if !high_qc.block_id().is_zero() && !candidate_block.is_safe(tx)? {
return Err(ProposalValidationError::NotSafeBlock {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
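
The dummy-block logic above relies on the gap between the justified block and the candidate: every height in between is assumed to have had a failed leader and is filled with a deterministic dummy block before the safety check runs. A rough standalone sketch of that gap-filling step, with invented types standing in for `calculate_dummy_blocks_from_justify`:

```rust
// Invented, simplified types: the real code hashes real block contents and tracks proposers.
#[derive(Debug, Clone)]
struct DummyBlock {
    height: u64,
    parent_id: String,
    id: String,
}

/// Fill the heights strictly between the justified block and the candidate with deterministic
/// dummy blocks, so the candidate can chain back to the justified block.
fn calculate_dummy_blocks(justify_height: u64, justify_id: &str, candidate_height: u64) -> Vec<DummyBlock> {
    let mut dummies = Vec::new();
    let mut parent_id = justify_id.to_string();
    // Heights justify_height+1 .. candidate_height-1 had no accepted proposal.
    for height in (justify_height + 1)..candidate_height {
        // Deterministic id so every honest node derives the same dummy chain.
        let id = format!("dummy({height},{parent_id})");
        dummies.push(DummyBlock { height, parent_id: parent_id.clone(), id: id.clone() });
        parent_id = id;
    }
    dummies
}

fn main() {
    // A candidate at height 13 justifying a block at height 10 needs dummies at heights 11 and 12.
    for b in calculate_dummy_blocks(10, "b10", 13) {
        println!("dummy block {} at height {} -> parent {}", b.id, b.height, b.parent_id);
    }
}
```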

11 changes: 7 additions & 4 deletions dan_layer/consensus/src/hotstuff/on_receive_new_view.rs
@@ -148,7 +148,7 @@ where TConsensusSpec: ConsensusSpec
if let Some(vote) = last_vote {
debug!(
target: LOG_TARGET,
"🔥 Receive VOTE with NEWVIEW for node {} from {}", vote.block_id, from,
"🔥 Receive VOTE with NEWVIEW for node {} {} from {}", vote.block_height, vote.block_id, from,
);
self.vote_receiver
.handle(from.clone(), vote, false, local_committee_info)
@@ -185,16 +185,19 @@

info!(
target: LOG_TARGET,
"🌟 Received NEWVIEW {} (QC: {}) has {} votes out of {}",
new_height,
latest_high_qc,
"🌟 Received NEWVIEW (QUORUM: {}/{}) {} (QC: {})",
newview_count,
threshold,
new_height,
latest_high_qc,
);
// Once we have received enough (quorum) NEWVIEWS, we can create the dummy block(s) and propose the next block.
// Any subsequent NEWVIEWs for this height/view are ignored.
if newview_count == threshold {
info!(target: LOG_TARGET, "🌟✅ NEWVIEW for block {} (high_qc: {}) has reached quorum ({}/{})", new_height, latest_high_qc.as_high_qc(), newview_count, threshold);
self.pacemaker
.update_view(epoch, new_height, high_qc.block_height())
.await?;
if latest_high_qc.block_height() + NodeHeight(1) > new_height {
// CASE: the votes received from NEWVIEWS created a new high QC, so there are no dummy blocks to create
// We can force beat with our current leaf and the justified block is the parent.
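
As a rough illustration of the quorum trigger above — counting distinct NEWVIEW senders for a height and acting exactly once when the threshold is reached — with plain collections standing in for the store, pacemaker and vote handling:

```rust
use std::collections::{HashMap, HashSet};

/// Tracks NEWVIEW senders per height and reports when the quorum threshold is first reached.
/// A simplified stand-in; the real handler also merges high QCs and any attached votes.
struct NewViewTracker {
    threshold: usize,
    senders_by_height: HashMap<u64, HashSet<String>>,
}

impl NewViewTracker {
    fn new(threshold: usize) -> Self {
        Self { threshold, senders_by_height: HashMap::new() }
    }

    /// Returns true only on the message that completes the quorum, so the view update and
    /// forced proposal happen once; later NEWVIEWs for the same height are ignored.
    fn on_newview(&mut self, height: u64, from: &str) -> bool {
        let senders = self.senders_by_height.entry(height).or_default();
        if !senders.insert(from.to_string()) {
            return false; // duplicate NEWVIEW from the same sender
        }
        senders.len() == self.threshold
    }
}

fn main() {
    // With 4 validators tolerating 1 fault, 3 NEWVIEWs form a quorum.
    let mut tracker = NewViewTracker::new(3);
    for from in ["vn1", "vn2", "vn2", "vn3", "vn4"] {
        if tracker.on_newview(10, from) {
            println!("NEWVIEW quorum for height 10 reached on message from {from}: update view and propose");
        }
    }
}
```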

12 changes: 8 additions & 4 deletions dan_layer/consensus/src/hotstuff/on_sync_request.rs
@@ -4,7 +4,7 @@
use log::*;
use tari_dan_common_types::{committee::CommitteeInfo, optional::Optional, Epoch};
use tari_dan_storage::{
consensus_models::{Block, LastSentVote, LeafBlock},
consensus_models::{Block, LastProposed, LastSentVote, LeafBlock},
StateStore,
};
use tokio::task;
@@ -39,12 +39,12 @@ impl<TConsensusSpec: ConsensusSpec> OnSyncRequest<TConsensusSpec> {
epoch: Epoch,
msg: SyncRequestMessage,
) {
if msg.epoch != epoch {
if msg.high_qc.epoch() != epoch {
warn!(
target: LOG_TARGET,
"Received SyncRequest from {} for epoch {} but our epoch is {}. Ignoring request.",
from,
msg.epoch,
msg.high_qc.epoch(),
epoch
);
return;
@@ -55,7 +55,11 @@ impl<TConsensusSpec: ConsensusSpec> OnSyncRequest<TConsensusSpec> {

task::spawn(async move {
let result = store.with_read_tx(|tx| {
let leaf_block = LeafBlock::get(tx, epoch)?;
let mut leaf_block = LeafBlock::get(tx, epoch)?;
let last_proposed = LastProposed::get(tx)?;
if last_proposed.height > leaf_block.height() {
leaf_block = last_proposed.as_leaf_block();
}

if leaf_block.height() < msg.high_qc.block_height() {
return Err(HotStuffError::InvalidSyncRequest {
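
A small sketch of the tip selection the sync responder now performs, with simplified stand-ins for the stored `LeafBlock` and `LastProposed` records: blocks the node has proposed but not yet seen voted on are still served to the requester.

```rust
// Simplified stand-ins for the stored LeafBlock / LastProposed records.
#[derive(Debug, Clone, Copy)]
struct BlockPointer {
    height: u64,
}

/// Pick the tip to serve a catch-up sync from: the last proposed block may be ahead of the
/// leaf block (e.g. the local node proposed right before the requester timed out).
fn sync_tip(leaf_block: BlockPointer, last_proposed: Option<BlockPointer>) -> BlockPointer {
    match last_proposed {
        Some(lp) if lp.height > leaf_block.height => lp,
        _ => leaf_block,
    }
}

fn main() {
    let leaf = BlockPointer { height: 12 };
    let proposed = Some(BlockPointer { height: 13 });
    let tip = sync_tip(leaf, proposed);
    println!("serving catch-up blocks up to height {}", tip.height);
    assert_eq!(tip.height, 13);
}
```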

13 changes: 9 additions & 4 deletions dan_layer/consensus/src/hotstuff/pacemaker.rs
@@ -73,6 +73,7 @@ impl PaceMaker {
});
}

#[allow(clippy::too_many_lines)]
pub async fn run(
&mut self,
on_beat: OnBeat,
@@ -88,6 +89,7 @@
let mut started = false;
let mut leader_failure_suspended = false;
let mut leader_failure_triggered_during_suspension = false;
let mut leader_failure_height_offset = NodeHeight(0);

loop {
tokio::select! {
@@ -108,6 +110,7 @@
let delta = self.delta_time();
info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_view, delta);
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
leader_failure_height_offset = NodeHeight(0);
// set a timer for when we must send a block...
block_timer.as_mut().reset(self.block_time());
},
@@ -122,6 +125,7 @@
let delta = self.delta_time();
info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_view, delta);
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
leader_failure_height_offset = NodeHeight(0);
block_timer.as_mut().reset(self.block_time());
on_beat.beat();
started = true;
@@ -132,6 +136,7 @@
leader_failure_suspended = false;
leader_failure_triggered_during_suspension = false;
// TODO: we could use futures-rs Either
leader_failure_height_offset = NodeHeight(0);
leader_timeout.as_mut().reset(far_future());
block_timer.as_mut().reset(far_future());
},
@@ -152,8 +157,8 @@
let delta = self.delta_time();
leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
info!(target: LOG_TARGET, "⚠️ Resumed leader timeout! Current view: {}, Delta: {:.2?}", self.current_view, delta);
self.current_view.set_next_height();
on_leader_timeout.leader_timed_out(self.current_view.get_height());
leader_failure_height_offset += NodeHeight(1);
on_leader_timeout.leader_timed_out(self.current_view.get_height() + leader_failure_height_offset);
}
debug!(target: LOG_TARGET, "🧿 Pacemaker resume leader failure");
}
@@ -179,8 +184,8 @@
leader_failure_triggered_during_suspension = true;
} else {
info!(target: LOG_TARGET, "⚠️ Leader timeout! Current view: {}, Delta: {:.2?}", self.current_view, delta);
self.current_view.set_next_height();
on_leader_timeout.leader_timed_out(self.current_view.get_height());
leader_failure_height_offset += NodeHeight(1);
on_leader_timeout.leader_timed_out(self.current_view.get_height() + NodeHeight(1));
}
},
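
Taken together, the pacemaker hunks stop mutating the shared current view on timeout (the removed `set_next_height()`); instead a local offset is tracked, reset whenever the view genuinely advances, and used to derive the height reported to the leader-failure handler. A compact sketch of that idea, with invented names and no timers:

```rust
/// Minimal model of the pacemaker's consecutive-leader-failure counter: the shared view is
/// no longer bumped on timeout; an offset is added only when reporting the timed-out height.
struct LeaderFailureTracker {
    current_view_height: u64,
    failure_offset: u64,
}

impl LeaderFailureTracker {
    fn new(height: u64) -> Self {
        Self { current_view_height: height, failure_offset: 0 }
    }

    /// The view actually moved (a block was processed or a reset occurred): clear the offset.
    fn on_view_advanced(&mut self, new_height: u64) {
        self.current_view_height = new_height;
        self.failure_offset = 0;
    }

    /// The leader timer fired: report a leader failure one height further along each time,
    /// without touching the shared view (that only moves once a NEWVIEW/QC justifies it).
    fn on_leader_timeout(&mut self) -> u64 {
        self.failure_offset += 1;
        self.current_view_height + self.failure_offset
    }
}

fn main() {
    let mut tracker = LeaderFailureTracker::new(10);
    assert_eq!(tracker.on_leader_timeout(), 11); // first failed leader
    assert_eq!(tracker.on_leader_timeout(), 12); // next leader also failed
    tracker.on_view_advanced(12); // a proposal finally landed
    assert_eq!(tracker.on_leader_timeout(), 13);
    println!("leader-failure heights advance without mutating the shared view");
}
```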


4 changes: 3 additions & 1 deletion dan_layer/consensus/src/hotstuff/worker.rs
@@ -116,7 +116,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
let transaction_manager = ConsensusTransactionManager::new(transaction_executor.clone());

Self {
local_validator_addr,
local_validator_addr: local_validator_addr.clone(),
config: config.clone(),
tx_events: tx_events.clone(),
rx_new_transactions,
@@ -138,6 +138,8 @@
outbound_messaging.clone(),
leader_strategy.clone(),
epoch_manager.clone(),
local_validator_addr,
pacemaker.clone_handle(),
),
on_receive_local_proposal: OnReceiveLocalProposalHandler::new(
state_store.clone(),

2 changes: 1 addition & 1 deletion dan_layer/consensus/src/messages/message.rs
@@ -45,7 +45,7 @@ impl HotstuffMessage {
Self::Vote(msg) => msg.epoch,
Self::MissingTransactionsRequest(msg) => msg.epoch,
Self::MissingTransactionsResponse(msg) => msg.epoch,
Self::CatchUpSyncRequest(msg) => msg.epoch,
Self::CatchUpSyncRequest(msg) => msg.high_qc.epoch(),
Self::SyncResponse(msg) => msg.epoch,
}
}
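
`CatchUpSyncRequest` now reports its epoch via the embedded high QC rather than a separate field, so the two can never disagree. A tiny sketch of that shape, using hypothetical trimmed-down types:

```rust
// Hypothetical, trimmed-down message types mirroring the new shape: the epoch lives only
// inside the high QC, so a sync request cannot carry a conflicting epoch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Epoch(u64);

#[derive(Debug, Clone)]
struct HighQc {
    epoch: Epoch,
    block_height: u64,
}

#[derive(Debug, Clone)]
struct SyncRequest {
    high_qc: HighQc,
}

impl SyncRequest {
    /// The message's epoch is, by construction, the epoch of its high QC.
    fn epoch(&self) -> Epoch {
        self.high_qc.epoch
    }
}

fn main() {
    let req = SyncRequest { high_qc: HighQc { epoch: Epoch(4), block_height: 100 } };
    // The receiving side compares against its own current epoch before serving blocks.
    let local_epoch = Epoch(4);
    if req.epoch() == local_epoch {
        println!("serving sync from height {} for epoch {:?}", req.high_qc.block_height, req.epoch());
    }
}
```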

1 change: 0 additions & 1 deletion dan_layer/consensus/src/messages/sync.rs
@@ -8,7 +8,6 @@ use tari_transaction::Transaction;

#[derive(Debug, Clone, Serialize)]
pub struct SyncRequestMessage {
pub epoch: Epoch,
pub high_qc: HighQc,
}


4 changes: 2 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
@@ -476,7 +476,6 @@ async fn multishard_local_inputs_foreign_outputs() {
async fn multishard_local_inputs_and_outputs_foreign_outputs() {
setup_logger();
let mut test = Test::builder()
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1", "2"])
.add_committee(1, vec!["3", "4"])
.add_committee(2, vec!["5", "6"])
@@ -894,6 +893,7 @@ async fn leader_failure_node_goes_down() {
if committed_height == NodeHeight(1) {
// This allows a few more leader failures to occur
test.send_transaction_to_all(Decision::Commit, 1, 2, 1).await;
test.wait_for_pool_count(TestVnDestination::All, 1).await;
}

if test.validators().filter(|vn| vn.address != failure_node).all(|v| {
Expand All @@ -904,7 +904,7 @@ async fn leader_failure_node_goes_down() {
break;
}

if committed_height > NodeHeight(100) {
if committed_height > NodeHeight(50) {
panic!("Not all transactions committed after {} blocks", committed_height);
}
}

12 changes: 12 additions & 0 deletions dan_layer/consensus_tests/src/support/harness.rs
@@ -335,6 +335,18 @@ impl Test {
.await
}

pub async fn wait_for_pool_count(&self, dest: TestVnDestination, count: usize) {
self.wait_all_for_predicate("waiting for pool count", |vn| {
if !dest.is_for_vn(vn) {
return true;
}
let c = vn.get_transaction_pool_count();
log::info!("{} has {} transactions in pool", vn.address, c);
c >= count
})
.await
}

pub fn with_all_validators(&self, f: impl FnMut(&Validator)) {
self.validators.values().for_each(f);
}

3 changes: 1 addition & 2 deletions dan_layer/p2p/proto/consensus.proto
@@ -240,8 +240,7 @@ message SubstateDestroyed {
}

message SyncRequest {
uint64 epoch = 1;
HighQc high_qc = 2;
HighQc high_qc = 1;
}

message HighQc {

4 changes: 1 addition & 3 deletions dan_layer/p2p/src/conversions/consensus.rs
@@ -888,11 +888,10 @@ impl From<SubstateDestroyed> for proto::consensus::SubstateDestroyed {
impl From<&SyncRequestMessage> for proto::consensus::SyncRequest {
fn from(value: &SyncRequestMessage) -> Self {
Self {
epoch: value.epoch.as_u64(),
high_qc: Some(proto::consensus::HighQc {
block_id: value.high_qc.block_id.as_bytes().to_vec(),
block_height: value.high_qc.block_height.as_u64(),
epoch: value.epoch.as_u64(),
epoch: value.high_qc.epoch.as_u64(),
qc_id: value.high_qc.qc_id.as_bytes().to_vec(),
}),
}
@@ -904,7 +903,6 @@ impl TryFrom<proto::consensus::SyncRequest> for SyncRequestMessage {

fn try_from(value: proto::consensus::SyncRequest) -> Result<Self, Self::Error> {
Ok(Self {
epoch: Epoch(value.epoch),
high_qc: value
.high_qc
.map(|value| {