Lr/eqc voting (#3851)
* Initial commit

* Propose the same block at the end of the epoch

* No VID and DAC required for the additional last block proposals

* Guard against division by 0

* Traverse the leaves to check an eQC

* Remove unneeded async in function definition

* Remove trace

* Make sure proposal is for the same block if justify QC references the last block

* Initial commit

* Gate epoch proposal logic

* Gate epochs voting logic

* Create helper method to check if QC is part of 3-chain

* Update epoch on `ViewChange` event

* Adjust tests

* Update view and epoch when QC is formed

* Fixes after review

* Get rid of nasty nested if-elses

* Fix fmt

* Update VID that we reuse at the end of the epoch

* Fix fmt

* Do not create VID and DAC dependencies when voting for the last block

* Simplify how we get a header

* NetworkEventTaskState uses OuterConsensus

* Refactor some of the eQC related methods in `Consensus`

* Add back a debugging trace

* Last eQC vote is broadcast

* Do not check if we are the leader when receiving last votes for eQC

* Add more traces

* Correctly update epoch when voting and forming QC

* Send ViewChange event when voting after shared state's been updated

* Adjust tests

* Remove obsolete DA ViewChange code

* Fetch proposal if missing when changing view after forming QC

* Fix deadlock

* Update epoch only when receiving or forming an eQC

* update logging (#3844)

* [Tech debt] Remove `async-std` (#3845)

* tests and CI

* remove `async-std`

* `fmt`

* fix doc build

* remove counter tests

* remove counter tests

* build w/o lockfile

* `lock` -> `std`

* Revert "`lock` -> `std`"

This reverts commit 21ebf05.

* lock

* `async_broadcast`

* overflow

* Revert "`async_broadcast`"

This reverts commit f03bb57.

* `try_send`

* Fix fmt

* Simplify code

* Move a helper function to types crate

* Add epoch safety check

* Rename storage method

* Clean up the code

* Compare to genesis view

---------

Co-authored-by: ss-es <155648797+ss-es@users.noreply.github.com>
Co-authored-by: rob-maron <132852777+rob-maron@users.noreply.github.com>
3 people authored Nov 13, 2024
1 parent c04a16a commit 32e6970
Showing 14 changed files with 149 additions and 42 deletions.
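The mechanism behind several of the commit messages above ("Propose the same block at the end of the epoch", "Traverse the leaves to check an eQC", "Create helper method to check if QC is part of 3-chain") is that the epoch's last block is re-proposed and re-voted until three consecutive views certify the same block, forming an extended QC (eQC). The sketch below illustrates that leaf traversal under simplified, hypothetical types (a SimpleLeaf struct, a view-keyed map, a fixed chain length of 3); it is not the Consensus::is_leaf_extended code touched in this diff.

use std::collections::HashMap;

/// Illustrative stand-in for a HotShot leaf; field names are assumptions.
struct SimpleLeaf {
    view: u64,
    block_height: u64,
    parent: Option<u64>, // parent's view number, used as the map key here
}

/// Sketch of an eQC check: walk back from `leaf_view` and confirm that the
/// last three leaves certify the same block in directly consecutive views.
fn is_leaf_extended(leaves: &HashMap<u64, SimpleLeaf>, leaf_view: u64) -> bool {
    let Some(mut current) = leaves.get(&leaf_view) else {
        return false;
    };
    let height = current.block_height;
    for _ in 0..2 {
        let Some(parent_view) = current.parent else {
            return false;
        };
        let Some(parent) = leaves.get(&parent_view) else {
            return false;
        };
        // Same block, no view gaps: the chain over the last block is extended.
        if parent.block_height != height || parent.view + 1 != current.view {
            return false;
        }
        current = parent;
    }
    true
}

fn main() {
    let mut leaves = HashMap::new();
    leaves.insert(10, SimpleLeaf { view: 10, block_height: 100, parent: None });
    leaves.insert(11, SimpleLeaf { view: 11, block_height: 100, parent: Some(10) });
    leaves.insert(12, SimpleLeaf { view: 12, block_height: 100, parent: Some(11) });
    assert!(is_leaf_extended(&leaves, 12));
}

Because those extra last-block proposals carry no new payload, the commits above also drop the VID and DAC requirements for them.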
15 changes: 7 additions & 8 deletions crates/hotshot/src/tasks/mod.rs
@@ -10,6 +10,11 @@
pub mod task_state;
use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};

use crate::{
tasks::task_state::CreateTaskState, types::SystemContextHandle, ConsensusApi,
ConsensusMetricsValue, ConsensusTaskRegistry, HotShotConfig, HotShotInitializer,
MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, Versions,
};
use async_broadcast::{broadcast, RecvError};
use async_lock::RwLock;
use async_trait::async_trait;
@@ -32,7 +37,7 @@ use hotshot_task_impls::{
view_sync::ViewSyncTaskState,
};
use hotshot_types::{
consensus::Consensus,
consensus::{Consensus, OuterConsensus},
constants::EVENT_CHANNEL_SIZE,
message::{Message, UpgradeLock},
traits::{
@@ -43,12 +48,6 @@ use hotshot_types::{
use tokio::{spawn, time::sleep};
use vbs::version::StaticVersionType;

use crate::{
tasks::task_state::CreateTaskState, types::SystemContextHandle, ConsensusApi,
ConsensusMetricsValue, ConsensusTaskRegistry, HotShotConfig, HotShotInitializer,
MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, Versions,
};

/// event for global event stream
#[derive(Clone, Debug)]
pub enum GlobalEvent {
@@ -200,7 +199,7 @@ pub fn add_network_event_task<
quorum_membership,
da_membership,
storage: Arc::clone(&handle.storage()),
consensus: Arc::clone(&handle.consensus()),
consensus: OuterConsensus::new(handle.consensus()),
upgrade_lock: handle.hotshot.upgrade_lock.clone(),
transmit_tasks: BTreeMap::new(),
};
19 changes: 13 additions & 6 deletions crates/task-impls/src/consensus/handlers.rs
@@ -38,14 +38,19 @@ pub(crate) async fn handle_quorum_vote_recv<
sender: &Sender<Arc<HotShotEvent<TYPES>>>,
task_state: &mut ConsensusTaskState<TYPES, I, V>,
) -> Result<()> {
// Are we the leader for this view?
let is_vote_leaf_extended = task_state
.consensus
.read()
.await
.is_leaf_extended(vote.data.leaf_commit);
let we_are_leader = task_state
.quorum_membership
.leader(vote.view_number() + 1, task_state.cur_epoch)?
== task_state.public_key;
ensure!(
task_state
.quorum_membership
.leader(vote.view_number() + 1, task_state.cur_epoch)?
== task_state.public_key,
is_vote_leaf_extended || we_are_leader,
info!(
"We are not the leader for view {:?}",
"We are not the leader for view {:?} and this is not the last vote for eQC",
vote.view_number() + 1
)
);
@@ -60,6 +65,7 @@
&event,
sender,
&task_state.upgrade_lock,
!is_vote_leaf_extended,
)
.await?;

@@ -99,6 +105,7 @@ pub(crate) async fn handle_timeout_vote_recv<
&event,
sender,
&task_state.upgrade_lock,
true,
)
.await?;

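On the receiving side, the hunk above relaxes the leader check: an eQC vote for the epoch's last block is broadcast to everyone, so any node may process it, while an ordinary quorum vote is still only handled by the next view's leader. A minimal restatement of that acceptance rule, with illustrative function and parameter names rather than the task-state API:

/// Receiver-side gate from the hunk above: accept the vote if its leaf extends
/// the epoch's last block (a broadcast eQC vote) or if we lead the next view.
fn accept_quorum_vote(is_vote_leaf_extended: bool, we_are_leader: bool) -> Result<(), String> {
    if is_vote_leaf_extended || we_are_leader {
        Ok(())
    } else {
        Err("not the leader for this view and not the last vote for an eQC".to_string())
    }
}

fn main() {
    assert!(accept_quorum_vote(false, true).is_ok());   // ordinary vote, we are the next leader
    assert!(accept_quorum_vote(true, false).is_ok());   // broadcast eQC vote, any node accepts
    assert!(accept_quorum_vote(false, false).is_err()); // neither: reject
}

The same flag is threaded into the vote-handling call just below as `!is_vote_leaf_extended`, matching the commit message "Do not check if we are the leader when receiving last votes for eQC".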
1 change: 1 addition & 0 deletions crates/task-impls/src/da.rs
@@ -282,6 +282,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYP
&event,
&event_stream,
&self.upgrade_lock,
true,
)
.await?;
}
13 changes: 12 additions & 1 deletion crates/task-impls/src/events.rs
@@ -92,6 +92,8 @@
QuorumProposalSend(Proposal<TYPES, QuorumProposal<TYPES>>, TYPES::SignatureKey),
/// Send a quorum vote to the next leader; emitted by a replica in the consensus task after seeing a valid quorum proposal
QuorumVoteSend(QuorumVote<TYPES>),
/// Broadcast a quorum vote to form an eQC; emitted by a replica in the consensus task after seeing a valid quorum proposal
ExtendedQuorumVoteSend(QuorumVote<TYPES>),
/// A quorum proposal with the given parent leaf is validated.
/// The full validation checks include:
/// 1. The proposal is not for an old view
@@ -253,7 +255,9 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
| HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
Some(proposal.data.view_number())
}
HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()),
HotShotEvent::QuorumVoteSend(vote) | HotShotEvent::ExtendedQuorumVoteSend(vote) => {
Some(vote.view_number())
}
HotShotEvent::DaProposalRecv(proposal, _)
| HotShotEvent::DaProposalValidated(proposal, _)
| HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()),
@@ -324,6 +328,13 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
HotShotEvent::QuorumVoteRecv(v) => {
write!(f, "QuorumVoteRecv(view_number={:?})", v.view_number())
}
HotShotEvent::ExtendedQuorumVoteSend(v) => {
write!(
f,
"ExtendedQuorumVoteSend(view_number={:?})",
v.view_number()
)
}
HotShotEvent::TimeoutVoteRecv(v) => {
write!(f, "TimeoutVoteRecv(view_number={:?})", v.view_number())
}
23 changes: 22 additions & 1 deletion crates/task-impls/src/helpers.rs
@@ -13,6 +13,7 @@ use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender};
use async_lock::RwLock;
use committable::{Commitment, Committable};
use hotshot_task::dependency::{Dependency, EventDependency};
use hotshot_types::utils::epoch_from_block_number;
use hotshot_types::{
consensus::OuterConsensus,
data::{Leaf, QuorumProposal, ViewChangeEvidence},
@@ -481,9 +482,29 @@ pub async fn validate_proposal_safety_and_liveness<
// Create a positive vote if either liveness or safety check
// passes.

// Liveness check.
{
let consensus_reader = validation_info.consensus.read().await;
// Epoch safety check:
// The proposal is safe if
// 1. the proposed block and the justify QC block belong to the same epoch or
// 2. the justify QC is the eQC for the previous block
let proposal_epoch =
epoch_from_block_number(proposed_leaf.height(), validation_info.epoch_height);
let justify_qc_epoch =
epoch_from_block_number(parent_leaf.height(), validation_info.epoch_height);
ensure!(
proposal_epoch == justify_qc_epoch
|| consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
{
error!(
"Failed epoch safety check \n Proposed leaf is {:?} \n justify QC leaf is {:?}",
proposed_leaf.clone(),
parent_leaf.clone(),
)
}
);

// Liveness check.
let liveness_check = justify_qc.view_number() > consensus_reader.locked_view();

// Safety check.
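The new epoch safety check above uses `epoch_from_block_number` (a helper this PR moves into the types crate, per "Move a helper function to types crate") to map block heights to epochs. A minimal sketch of the condition follows; the ceiling-division mapping and the `epoch_height == 0` guard are assumptions suggested by the helper's name and the "Guard against division by 0" commit, not necessarily the exact formula in hotshot_types.

/// Assumed mapping from block height to epoch: ceiling division by
/// `epoch_height`, returning 0 when `epoch_height` is 0 to avoid dividing by zero.
fn epoch_from_block_number(block_number: u64, epoch_height: u64) -> u64 {
    if epoch_height == 0 {
        0
    } else if block_number % epoch_height == 0 {
        block_number / epoch_height
    } else {
        block_number / epoch_height + 1
    }
}

/// Sketch of the epoch safety condition: the proposal must stay in the justify
/// QC's epoch, unless the justify QC is the eQC for the previous epoch's last block.
fn epoch_safety_ok(
    proposed_height: u64,
    parent_height: u64,
    epoch_height: u64,
    justify_qc_is_eqc: bool,
) -> bool {
    epoch_from_block_number(proposed_height, epoch_height)
        == epoch_from_block_number(parent_height, epoch_height)
        || justify_qc_is_eqc
}

fn main() {
    // With an epoch height of 10, block 11 opens epoch 2, so extending block 10
    // is only safe when the justify QC is that block's eQC.
    assert!(!epoch_safety_ok(11, 10, 10, false));
    assert!(epoch_safety_ok(11, 10, 10, true));
}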
20 changes: 15 additions & 5 deletions crates/task-impls/src/network.rs
@@ -15,7 +15,7 @@ use async_lock::RwLock;
use async_trait::async_trait;
use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::Consensus,
consensus::OuterConsensus,
data::{VidDisperse, VidDisperseShare},
event::{Event, EventType, HotShotAction},
message::{
@@ -212,7 +212,7 @@ pub struct NetworkEventTaskState<
/// Storage to store actionable events
pub storage: Arc<RwLock<S>>,
/// Shared consensus state
pub consensus: Arc<RwLock<Consensus<TYPES>>>,
pub consensus: OuterConsensus<TYPES>,
/// Lock for a decided upgrade
pub upgrade_lock: UpgradeLock<TYPES, V>,
/// map view number to transmit tasks
@@ -294,7 +294,7 @@ impl<

let net = Arc::clone(&self.network);
let storage = Arc::clone(&self.storage);
let consensus = Arc::clone(&self.consensus);
let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
spawn(async move {
if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
Some(HotShotAction::VidDisperse),
@@ -320,7 +320,7 @@
async fn maybe_record_action(
maybe_action: Option<HotShotAction>,
storage: Arc<RwLock<S>>,
consensus: Arc<RwLock<Consensus<TYPES>>>,
consensus: OuterConsensus<TYPES>,
view: <TYPES as NodeType>::View,
) -> std::result::Result<(), ()> {
if let Some(mut action) = maybe_action {
@@ -408,6 +408,16 @@ impl<
TransmitType::Direct(leader),
))
}
HotShotEvent::ExtendedQuorumVoteSend(vote) => {
*maybe_action = Some(HotShotAction::Vote);
Some((
vote.signing_key(),
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
GeneralConsensusMessage::Vote(vote.clone()),
)),
TransmitType::Broadcast,
))
}
HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
req.key.clone(),
MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
@@ -669,7 +679,7 @@ impl<
.committee_members(view_number, self.epoch);
let network = Arc::clone(&self.network);
let storage = Arc::clone(&self.storage);
let consensus = Arc::clone(&self.consensus);
let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
let upgrade_lock = self.upgrade_lock.clone();
let handle = spawn(async move {
if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
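`NetworkEventTaskState` now holds an `OuterConsensus` instead of a bare `Arc<RwLock<Consensus>>`. Judging only from its use in this diff (`OuterConsensus::new(...)`, the public `inner_consensus` field, `.read().await`), it is a thin wrapper around the shared state; a hypothetical minimal shape is sketched below, with a placeholder `Consensus` so the example compiles. The real type in `hotshot_types::consensus` may carry additional bookkeeping not shown here.

use std::sync::Arc;

use async_lock::RwLock;

/// Placeholder for the shared consensus state; the real `Consensus<TYPES>` is
/// far richer, this exists only so the wrapper sketch compiles.
struct Consensus {
    locked_view: u64,
}

/// Hypothetical minimal shape of `OuterConsensus`, inferred from the calls in
/// the diff above: constructed with `new`, exposes the wrapped Arc as
/// `inner_consensus`, and forwards `read()`.
struct OuterConsensus {
    pub inner_consensus: Arc<RwLock<Consensus>>,
}

impl OuterConsensus {
    pub fn new(inner_consensus: Arc<RwLock<Consensus>>) -> Self {
        Self { inner_consensus }
    }

    pub async fn read(&self) -> async_lock::RwLockReadGuard<'_, Consensus> {
        self.inner_consensus.read().await
    }
}

#[tokio::main]
async fn main() {
    let consensus = OuterConsensus::new(Arc::new(RwLock::new(Consensus { locked_view: 0 })));
    // Re-wrapping the same Arc for a spawned task, as the network task does above.
    let for_task = OuterConsensus::new(Arc::clone(&consensus.inner_consensus));
    assert_eq!(for_task.read().await.locked_view, 0);
}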
12 changes: 11 additions & 1 deletion crates/task-impls/src/quorum_vote/handlers.rs
@@ -283,6 +283,7 @@ pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V
storage: Arc<RwLock<I::Storage>>,
leaf: Leaf<TYPES>,
vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
extended_vote: bool,
) -> Result<()> {
ensure!(
quorum_membership.has_stake(&public_key, epoch_number),
@@ -317,7 +318,16 @@
.await
.wrap()
.context(error!("Failed to store VID share"))?;
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;

if extended_vote {
broadcast_event(
Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
&sender,
)
.await;
} else {
broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
}

Ok(())
}
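On the sending side, the new `extended_vote` flag selects which event carries the freshly signed vote. Combined with the `ExtendedQuorumVoteSend` arm added to the network task earlier in this diff, the effect is that an ordinary quorum vote goes directly to the next leader while an eQC vote is broadcast, so every node can assemble the eQC locally ("Last eQC vote is broadcast"). A toy version of that routing decision, with simplified stand-in types rather than HotShot's:

/// Simplified stand-ins for the two transmit targets used by the network task.
#[derive(Debug, PartialEq)]
enum ToyTransmit {
    DirectToNextLeader,
    Broadcast,
}

/// Routing implied by `submit_vote` plus the network task's new event arm.
fn route_vote(extended_vote: bool) -> ToyTransmit {
    if extended_vote {
        ToyTransmit::Broadcast
    } else {
        ToyTransmit::DirectToNextLeader
    }
}

fn main() {
    assert_eq!(route_vote(true), ToyTransmit::Broadcast);
    assert_eq!(route_vote(false), ToyTransmit::DirectToNextLeader);
}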
7 changes: 7 additions & 0 deletions crates/task-impls/src/quorum_vote/mod.rs
@@ -222,6 +222,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
)
.await;

let is_vote_leaf_extended = self
.consensus
.read()
.await
.is_leaf_extended(leaf.commit(&self.upgrade_lock).await);
if let Err(e) = submit_vote::<TYPES, I, V>(
self.sender.clone(),
Arc::clone(&self.quorum_membership),
@@ -233,6 +238,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
Arc::clone(&self.storage),
leaf,
vid_share,
is_vote_leaf_extended,
)
.await
{
@@ -685,6 +691,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
Arc::clone(&self.storage),
proposed_leaf,
updated_vid,
false,
)
.await
{
1 change: 1 addition & 0 deletions crates/task-impls/src/upgrade.rs
@@ -246,6 +246,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> UpgradeTaskStat
&event,
&tx,
&self.upgrade_lock,
true,
)
.await?;
}
33 changes: 24 additions & 9 deletions crates/task-impls/src/view_sync.rs
@@ -332,9 +332,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ViewSyncTaskSta
epoch: self.cur_epoch,
id: self.id,
};
let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
.await?;
let vote_collector = create_vote_accumulator(
&info,
event,
&event_stream,
self.upgrade_lock.clone(),
true,
)
.await?;

relay_map.insert(relay, vote_collector);
}
@@ -373,9 +378,14 @@
id: self.id,
};

let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
.await?;
let vote_collector = create_vote_accumulator(
&info,
event,
&event_stream,
self.upgrade_lock.clone(),
true,
)
.await?;
relay_map.insert(relay, vote_collector);
}

@@ -412,9 +422,14 @@
epoch: self.cur_epoch,
id: self.id,
};
let vote_collector =
create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
.await;
let vote_collector = create_vote_accumulator(
&info,
event,
&event_stream,
self.upgrade_lock.clone(),
true,
)
.await;
if let Ok(vote_task) = vote_collector {
relay_map.insert(relay, vote_task);
}
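All three view-sync phases now pass an extra `true` to `create_vote_accumulator`; the only call site passing a different value is the quorum-vote path, which passes `!is_vote_leaf_extended`. The flag therefore appears to control whether the accumulator insists on the local node being the relevant leader or relay, a check skipped for broadcast eQC votes. As a reminder of what an accumulator does with the votes it is allowed to collect, here is a self-contained toy that tallies weighted votes until a quorum threshold is crossed; the threshold, key type, and duplicate handling are illustrative only.

use std::collections::HashMap;

/// Toy vote accumulator: collects per-signer stake for a single view and
/// reports success once the accumulated stake reaches the quorum threshold.
struct ToyVoteAccumulator {
    threshold: u64,
    stake_by_signer: HashMap<String, u64>,
}

impl ToyVoteAccumulator {
    fn new(threshold: u64) -> Self {
        Self { threshold, stake_by_signer: HashMap::new() }
    }

    /// Returns Some(total_stake) once a quorum is reached, None otherwise.
    fn accumulate(&mut self, signer: &str, stake: u64) -> Option<u64> {
        // Count each signer at most once.
        self.stake_by_signer.entry(signer.to_string()).or_insert(stake);
        let total: u64 = self.stake_by_signer.values().sum();
        (total >= self.threshold).then_some(total)
    }
}

fn main() {
    let mut acc = ToyVoteAccumulator::new(3);
    assert!(acc.accumulate("a", 1).is_none());
    assert!(acc.accumulate("a", 1).is_none()); // duplicate vote, not double counted
    assert!(acc.accumulate("b", 1).is_none());
    assert!(acc.accumulate("c", 1).is_some());
}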