From 32e697081c870e267c4c4489c15809e25a07bc86 Mon Sep 17 00:00:00 2001
From: lukaszrzasik
Date: Wed, 13 Nov 2024 20:50:51 +0100
Subject: [PATCH] Lr/eqc voting (#3851)

* Initial commit
* Propose the same block at the end of the epoch
* No VID and DAC required for the additional last block proposals
* Guard against division by 0
* Traverse the leaves to check an eQC
* Remove unneeded async in function definition
* Remove trace
* Make sure proposal is for the same block if justify QC references the last block
* Initial commit
* Gate epoch proposal logic
* Gate epochs voting logic
* Create helper method to check if QC is part of 3-chain
* Update epoch on `ViewChange` event
* Adjust tests
* Update view and epoch when QC is formed
* Fixes after review
* Get rid of nasty nested if-elses
* Fix fmt
* Update VID that we reuse at the end of the epoch
* Fix fmt
* Do not create VID and DAC dependencies when voting for the last block
* Simplify how we get a header
* NetworkEventTaskState uses OuterConsensus
* Refactor some of the eQC related methods in `Consensus`
* Add back a debugging trace
* Last eQC vote is broadcast
* Do not check if we are the leader when receiving last votes for eQC
* Add more traces
* Correctly update epoch when voting and forming QC
* Send ViewChange event when voting after shared state's been updated
* Adjust tests
* Remove an obsolete DA ViewChange code
* Fetch proposal if missing when changing view after forming QC
* Fix deadlock
* Update epoch only when receiving or forming an eQC
* update logging (#3844)
* [Tech debt] Remove `async-std` (#3845)
* tests and CI
* remove `async-std`
* `fmt`
* fix doc build
* remove counter tests
* remove counter tests
* build w/o lockfile
* `lock` -> `std`
* Revert "`lock` -> `std`"

This reverts commit 21ebf054f3dd862a70d6b776f4016647cfd481c4.

* lock
* `async_broadcast`
* overflow
* Revert "`async_broadcast`"

This reverts commit f03bb57fcc5fcdefe708c79fee75d3ae85ae04f1.

* `try_send`
* Fix fmt
* Simplify code
* Move a helper function to types crate
* Add epoch safety check
* Rename storage method
* Clean up the code
* Compare to genesis view

---------

Co-authored-by: ss-es <155648797+ss-es@users.noreply.github.com>
Co-authored-by: rob-maron <132852777+rob-maron@users.noreply.github.com>
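For orientation before the diff: HotShot epochs are derived from block height, and the commits above revolve around the last block of each epoch, which is re-proposed over consecutive views until it gathers an extended QC (eQC). Below is a minimal sketch of the boundary arithmetic, assuming the ceiling-division reading of `epoch_from_block_number` that the checks in this patch imply; the real helper lives in `hotshot_types::utils` and may differ in detail.

    /// Sketch: the epoch a block at `block_height` belongs to (1-based).
    fn epoch_from_block_number(block_height: u64, epoch_height: u64) -> u64 {
        // Guard against division by 0, as one of the commits above notes.
        if epoch_height == 0 {
            return 0;
        }
        // Ceiling division: heights 1..=epoch_height fall in epoch 1, and so on.
        (block_height + epoch_height - 1) / epoch_height
    }

    /// Sketch: the last block of an epoch sits at a multiple of `epoch_height`,
    /// mirroring `is_leaf_for_last_block` in crates/types/src/consensus.rs below.
    fn is_last_block_in_epoch(block_height: u64, epoch_height: u64) -> bool {
        epoch_height != 0 && block_height % epoch_height == 0
    }

With `epoch_height = 10`, heights 1 through 10 are epoch 1, and height 10 is the block that must earn an eQC before epoch 2 can begin.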
---
 crates/hotshot/src/tasks/mod.rs               | 15 ++++-----
 crates/task-impls/src/consensus/handlers.rs   | 19 +++++++----
 crates/task-impls/src/da.rs                   |  1 +
 crates/task-impls/src/events.rs               | 13 +++++++-
 crates/task-impls/src/helpers.rs              | 23 ++++++++++++-
 crates/task-impls/src/network.rs              | 20 ++++++++---
 crates/task-impls/src/quorum_vote/handlers.rs | 12 ++++++-
 crates/task-impls/src/quorum_vote/mod.rs      |  7 ++++
 crates/task-impls/src/upgrade.rs              |  1 +
 crates/task-impls/src/view_sync.rs            | 33 ++++++++++++++-----
 crates/task-impls/src/vote_collection.rs      | 17 +++++++---
 .../src/byzantine/byzantine_behaviour.rs      |  4 +--
 crates/testing/tests/tests_1/network_task.rs  |  5 +--
 crates/types/src/consensus.rs                 | 21 ++++++++++--
 14 files changed, 149 insertions(+), 42 deletions(-)

diff --git a/crates/hotshot/src/tasks/mod.rs b/crates/hotshot/src/tasks/mod.rs
index 037e72f227..6ddd00b252 100644
--- a/crates/hotshot/src/tasks/mod.rs
+++ b/crates/hotshot/src/tasks/mod.rs
@@ -10,6 +10,11 @@ pub mod task_state;
 
 use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};
 
+use crate::{
+    tasks::task_state::CreateTaskState, types::SystemContextHandle, ConsensusApi,
+    ConsensusMetricsValue, ConsensusTaskRegistry, HotShotConfig, HotShotInitializer,
+    MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, Versions,
+};
 use async_broadcast::{broadcast, RecvError};
 use async_lock::RwLock;
 use async_trait::async_trait;
@@ -32,7 +37,7 @@ use hotshot_task_impls::{
     view_sync::ViewSyncTaskState,
 };
 use hotshot_types::{
-    consensus::Consensus,
+    consensus::{Consensus, OuterConsensus},
     constants::EVENT_CHANNEL_SIZE,
     message::{Message, UpgradeLock},
     traits::{
@@ -43,12 +48,6 @@ use hotshot_types::{
 use tokio::{spawn, time::sleep};
 use vbs::version::StaticVersionType;
 
-use crate::{
-    tasks::task_state::CreateTaskState, types::SystemContextHandle, ConsensusApi,
-    ConsensusMetricsValue, ConsensusTaskRegistry, HotShotConfig, HotShotInitializer,
-    MarketplaceConfig, Memberships, NetworkTaskRegistry, SignatureKey, SystemContext, Versions,
-};
-
 /// event for global event stream
 #[derive(Clone, Debug)]
 pub enum GlobalEvent {
@@ -200,7 +199,7 @@ pub fn add_network_event_task<
         quorum_membership,
         da_membership,
         storage: Arc::clone(&handle.storage()),
-        consensus: Arc::clone(&handle.consensus()),
+        consensus: OuterConsensus::new(handle.consensus()),
         upgrade_lock: handle.hotshot.upgrade_lock.clone(),
         transmit_tasks: BTreeMap::new(),
     };
diff --git a/crates/task-impls/src/consensus/handlers.rs b/crates/task-impls/src/consensus/handlers.rs
index a148ec28a0..cfc399bd56 100644
--- a/crates/task-impls/src/consensus/handlers.rs
+++ b/crates/task-impls/src/consensus/handlers.rs
@@ -38,14 +38,19 @@ pub(crate) async fn handle_quorum_vote_recv<
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
     task_state: &mut ConsensusTaskState<TYPES, I, V>,
 ) -> Result<()> {
-    // Are we the leader for this view?
+    let is_vote_leaf_extended = task_state
+        .consensus
+        .read()
+        .await
+        .is_leaf_extended(vote.data.leaf_commit);
+    let we_are_leader = task_state
+        .quorum_membership
+        .leader(vote.view_number() + 1, task_state.cur_epoch)?
+        == task_state.public_key;
     ensure!(
-        task_state
-            .quorum_membership
-            .leader(vote.view_number() + 1, task_state.cur_epoch)?
-            == task_state.public_key,
+        is_vote_leaf_extended || we_are_leader,
         info!(
-            "We are not the leader for view {:?}",
+            "We are not the leader for view {:?} and this is not the last vote for eQC",
             vote.view_number() + 1
         )
     );
@@ -60,6 +65,7 @@ pub(crate) async fn handle_quorum_vote_recv<
         &event,
         sender,
         &task_state.upgrade_lock,
+        !is_vote_leaf_extended,
     )
     .await?;
@@ -99,6 +105,7 @@ pub(crate) async fn handle_timeout_vote_recv<
         &event,
         sender,
         &task_state.upgrade_lock,
+        true,
     )
     .await?;
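Condensing the guard above: a vote is handled when this node leads the next view, or when the vote extends the epoch's last leaf (eQC votes are broadcast, so every node may collect them), and the flag passed down to `handle_vote` is simply the negation of the eQC case. A sketch with the consensus and membership lookups stubbed out as booleans (names are illustrative, not the real API):

    /// Sketch of the acceptance gate in `handle_quorum_vote_recv`. Returns the
    /// `check_if_leader` flag that the vote accumulator will enforce.
    fn vote_recv_gate(
        is_vote_leaf_extended: bool,
        we_are_leader: bool,
    ) -> Result<bool, &'static str> {
        if !(is_vote_leaf_extended || we_are_leader) {
            return Err("we are not the leader and this is not the last vote for eQC");
        }
        // eQC votes skip the accumulator's leader check; everything else keeps it.
        Ok(!is_vote_leaf_extended)
    }

Timeout votes, by contrast, still pass `true` unconditionally: only quorum votes for an extended leaf relax the leader check.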
diff --git a/crates/task-impls/src/da.rs b/crates/task-impls/src/da.rs
index d0182460f2..edaa5123e5 100644
--- a/crates/task-impls/src/da.rs
+++ b/crates/task-impls/src/da.rs
@@ -282,6 +282,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> DaTaskState<TYPES, I, V> {
diff --git a/crates/task-impls/src/events.rs b/crates/task-impls/src/events.rs
--- a/crates/task-impls/src/events.rs
+++ b/crates/task-impls/src/events.rs
     QuorumProposalSend(Proposal<TYPES, QuorumProposal<TYPES>>, TYPES::SignatureKey),
     /// Send a quorum vote to the next leader; emitted by a replica in the consensus task after seeing a valid quorum proposal
     QuorumVoteSend(QuorumVote<TYPES>),
+    /// Broadcast a quorum vote to form an eQC; emitted by a replica in the consensus task after seeing a valid quorum proposal
+    ExtendedQuorumVoteSend(QuorumVote<TYPES>),
     /// A quorum proposal with the given parent leaf is validated.
     /// The full validation checks include:
     /// 1. The proposal is not for an old view
@@ -253,7 +255,9 @@ impl<TYPES: NodeType> HotShotEvent<TYPES> {
             | HotShotEvent::QuorumProposalPreliminarilyValidated(proposal) => {
                 Some(proposal.data.view_number())
             }
-            HotShotEvent::QuorumVoteSend(vote) => Some(vote.view_number()),
+            HotShotEvent::QuorumVoteSend(vote) | HotShotEvent::ExtendedQuorumVoteSend(vote) => {
+                Some(vote.view_number())
+            }
             HotShotEvent::DaProposalRecv(proposal, _)
             | HotShotEvent::DaProposalValidated(proposal, _)
             | HotShotEvent::DaProposalSend(proposal, _) => Some(proposal.data.view_number()),
@@ -324,6 +328,13 @@ impl<TYPES: NodeType> Display for HotShotEvent<TYPES> {
             HotShotEvent::QuorumVoteRecv(v) => {
                 write!(f, "QuorumVoteRecv(view_number={:?})", v.view_number())
             }
+            HotShotEvent::ExtendedQuorumVoteSend(v) => {
+                write!(
+                    f,
+                    "ExtendedQuorumVoteSend(view_number={:?})",
+                    v.view_number()
+                )
+            }
             HotShotEvent::TimeoutVoteRecv(v) => {
                 write!(f, "TimeoutVoteRecv(view_number={:?})", v.view_number())
             }
diff --git a/crates/task-impls/src/helpers.rs b/crates/task-impls/src/helpers.rs
index 5c91ca9adf..cce95b6be3 100644
--- a/crates/task-impls/src/helpers.rs
+++ b/crates/task-impls/src/helpers.rs
@@ -13,6 +13,7 @@ use async_broadcast::{InactiveReceiver, Receiver, SendError, Sender};
 use async_lock::RwLock;
 use committable::{Commitment, Committable};
 use hotshot_task::dependency::{Dependency, EventDependency};
+use hotshot_types::utils::epoch_from_block_number;
 use hotshot_types::{
     consensus::OuterConsensus,
     data::{Leaf, QuorumProposal, ViewChangeEvidence},
@@ -481,9 +482,29 @@ pub async fn validate_proposal_safety_and_liveness<
     // Create a positive vote if either liveness or safety check
     // passes.
-    // Liveness check.
     {
         let consensus_reader = validation_info.consensus.read().await;
+        // Epoch safety check:
+        // The proposal is safe if
+        // 1. the proposed block and the justify QC block belong to the same epoch or
+        // 2. the justify QC is the eQC for the previous block
+        let proposal_epoch =
+            epoch_from_block_number(proposed_leaf.height(), validation_info.epoch_height);
+        let justify_qc_epoch =
+            epoch_from_block_number(parent_leaf.height(), validation_info.epoch_height);
+        ensure!(
+            proposal_epoch == justify_qc_epoch
+                || consensus_reader.check_eqc(&proposed_leaf, &parent_leaf),
+            {
+                error!(
+                    "Failed epoch safety check \n Proposed leaf is {:?} \n justify QC leaf is {:?}",
+                    proposed_leaf.clone(),
+                    parent_leaf.clone(),
+                )
+            }
+        );
+
+        // Liveness check.
         let liveness_check = justify_qc.view_number() > consensus_reader.locked_view();
 
         // Safety check.
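Stated directly, the epoch safety check added above admits a proposal that stays in its parent's epoch, and otherwise demands that the justify QC be the eQC on the previous epoch's last block. A self-contained sketch of the decision, with `check_eqc` reduced to a caller-supplied predicate and ceiling division standing in for `epoch_from_block_number` (hypothetical names; assumes a nonzero `epoch_height`):

    /// Sketch of the epoch safety rule in `validate_proposal_safety_and_liveness`.
    fn epoch_safety_ok(
        proposed_height: u64,
        parent_height: u64,
        epoch_height: u64,
        justify_qc_is_eqc: impl Fn() -> bool,
    ) -> bool {
        // Ceiling division: the epoch a given block height belongs to.
        let epoch_of = |h: u64| (h + epoch_height - 1) / epoch_height;
        // Same epoch: the ordinary case. Crossing epochs: only over an eQC.
        epoch_of(proposed_height) == epoch_of(parent_height) || justify_qc_is_eqc()
    }

Note that the check runs under the same read lock as the liveness check that follows it, so the eQC lookup sees a consistent view of the saved leaves.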
diff --git a/crates/task-impls/src/network.rs b/crates/task-impls/src/network.rs
index 42c8a19748..d623962424 100644
--- a/crates/task-impls/src/network.rs
+++ b/crates/task-impls/src/network.rs
@@ -15,7 +15,7 @@ use async_lock::RwLock;
 use async_trait::async_trait;
 use hotshot_task::task::TaskState;
 use hotshot_types::{
-    consensus::Consensus,
+    consensus::OuterConsensus,
     data::{VidDisperse, VidDisperseShare},
     event::{Event, EventType, HotShotAction},
     message::{
@@ -212,7 +212,7 @@ pub struct NetworkEventTaskState<
     /// Storage to store actionable events
     pub storage: Arc<RwLock<S>>,
     /// Shared consensus state
-    pub consensus: Arc<RwLock<Consensus<TYPES>>>,
+    pub consensus: OuterConsensus<TYPES>,
     /// Lock for a decided upgrade
     pub upgrade_lock: UpgradeLock<TYPES, V>,
     /// map view number to transmit tasks
@@ -294,7 +294,7 @@ impl<
         let net = Arc::clone(&self.network);
         let storage = Arc::clone(&self.storage);
-        let consensus = Arc::clone(&self.consensus);
+        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
         spawn(async move {
             if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
                 Some(HotShotAction::VidDisperse),
@@ -320,7 +320,7 @@ impl<
     async fn maybe_record_action(
         maybe_action: Option<HotShotAction>,
         storage: Arc<RwLock<S>>,
-        consensus: Arc<RwLock<Consensus<TYPES>>>,
+        consensus: OuterConsensus<TYPES>,
         view: <TYPES as NodeType>::View,
     ) -> std::result::Result<(), ()> {
         if let Some(mut action) = maybe_action {
@@ -408,6 +408,16 @@ impl<
                     TransmitType::Direct(leader),
                 ))
             }
+            HotShotEvent::ExtendedQuorumVoteSend(vote) => {
+                *maybe_action = Some(HotShotAction::Vote);
+                Some((
+                    vote.signing_key(),
+                    MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
+                        GeneralConsensusMessage::Vote(vote.clone()),
+                    )),
+                    TransmitType::Broadcast,
+                ))
+            }
             HotShotEvent::QuorumProposalRequestSend(req, signature) => Some((
                 req.key.clone(),
                 MessageKind::<TYPES>::from_consensus_message(SequencingMessage::General(
@@ -669,7 +679,7 @@ impl<
             .committee_members(view_number, self.epoch);
         let network = Arc::clone(&self.network);
         let storage = Arc::clone(&self.storage);
-        let consensus = Arc::clone(&self.consensus);
+        let consensus = OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus));
         let upgrade_lock = self.upgrade_lock.clone();
         let handle = spawn(async move {
             if NetworkEventTaskState::<TYPES, V, NET, S>::maybe_record_action(
diff --git a/crates/task-impls/src/quorum_vote/handlers.rs b/crates/task-impls/src/quorum_vote/handlers.rs
index 2f78773f88..d4b9edc6c7 100644
--- a/crates/task-impls/src/quorum_vote/handlers.rs
+++ b/crates/task-impls/src/quorum_vote/handlers.rs
@@ -283,6 +283,7 @@ pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V
     storage: Arc<RwLock<I::Storage>>,
     leaf: Leaf<TYPES>,
     vid_share: Proposal<TYPES, VidDisperseShare<TYPES>>,
+    extended_vote: bool,
 ) -> Result<()> {
     ensure!(
         quorum_membership.has_stake(&public_key, epoch_number),
@@ -317,7 +318,16 @@ pub(crate) async fn submit_vote<TYPES: NodeType, I: NodeImplementation<TYPES>, V
         .await
         .wrap()
         .context(error!("Failed to store VID share"))?;
-    broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
+
+    if extended_vote {
+        broadcast_event(
+            Arc::new(HotShotEvent::ExtendedQuorumVoteSend(vote)),
+            &sender,
+        )
+        .await;
+    } else {
+        broadcast_event(Arc::new(HotShotEvent::QuorumVoteSend(vote)), &sender).await;
+    }
 
     Ok(())
 }
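The two diffs above are the send side of the same decision: `submit_vote` picks which event to emit, and the network task maps the event to a transmit type. Ordinary votes go directly to the next leader; eQC votes are broadcast so the whole network can watch the extended QC form. A sketch of the routing, with hypothetical stand-in types:

    /// Sketch of how the network task routes the two vote events.
    enum Transmit<K> {
        Direct(K), // QuorumVoteSend: straight to the next view's leader
        Broadcast, // ExtendedQuorumVoteSend: to everyone
    }

    fn route_vote<K>(extended_vote: bool, next_leader: K) -> Transmit<K> {
        if extended_vote {
            Transmit::Broadcast
        } else {
            Transmit::Direct(next_leader)
        }
    }

The new arm records `HotShotAction::Vote` before transmitting, matching the direct-vote path, so storage treats an eQC vote like any other vote.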
diff --git a/crates/task-impls/src/quorum_vote/mod.rs b/crates/task-impls/src/quorum_vote/mod.rs
index 053ef851f9..6ac5fe37dd 100644
--- a/crates/task-impls/src/quorum_vote/mod.rs
+++ b/crates/task-impls/src/quorum_vote/mod.rs
@@ -222,6 +222,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
         )
         .await;
 
+        let is_vote_leaf_extended = self
+            .consensus
+            .read()
+            .await
+            .is_leaf_extended(leaf.commit(&self.upgrade_lock).await);
         if let Err(e) = submit_vote::<TYPES, I, V>(
             self.sender.clone(),
             Arc::clone(&self.quorum_membership),
@@ -233,6 +238,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions> Handl
             Arc::clone(&self.storage),
             leaf,
             vid_share,
+            is_vote_leaf_extended,
         )
         .await
         {
@@ -685,6 +691,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> QuorumVoteTaskS
             Arc::clone(&self.storage),
             proposed_leaf,
             updated_vid,
+            false,
         )
         .await
         {
diff --git a/crates/task-impls/src/upgrade.rs b/crates/task-impls/src/upgrade.rs
index 71cd0a7ef0..c5e9735701 100644
--- a/crates/task-impls/src/upgrade.rs
+++ b/crates/task-impls/src/upgrade.rs
@@ -246,6 +246,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> UpgradeTaskStat
                     &event,
                     &tx,
                     &self.upgrade_lock,
+                    true,
                 )
                 .await?;
             }
diff --git a/crates/task-impls/src/view_sync.rs b/crates/task-impls/src/view_sync.rs
index 4b661a660a..9e8b852789 100644
--- a/crates/task-impls/src/view_sync.rs
+++ b/crates/task-impls/src/view_sync.rs
@@ -332,9 +332,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ViewSyncTaskSta
             epoch: self.cur_epoch,
             id: self.id,
         };
-        let vote_collector =
-            create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
-                .await?;
+        let vote_collector = create_vote_accumulator(
+            &info,
+            event,
+            &event_stream,
+            self.upgrade_lock.clone(),
+            true,
+        )
+        .await?;
 
         relay_map.insert(relay, vote_collector);
     }
@@ -373,9 +378,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ViewSyncTaskSta
             id: self.id,
         };
 
-        let vote_collector =
-            create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
-                .await?;
+        let vote_collector = create_vote_accumulator(
+            &info,
+            event,
+            &event_stream,
+            self.upgrade_lock.clone(),
+            true,
+        )
+        .await?;
 
         relay_map.insert(relay, vote_collector);
     }
@@ -412,9 +422,14 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> ViewSyncTaskSta
             epoch: self.cur_epoch,
             id: self.id,
         };
-        let vote_collector =
-            create_vote_accumulator(&info, event, &event_stream, self.upgrade_lock.clone())
-                .await;
+        let vote_collector = create_vote_accumulator(
+            &info,
+            event,
+            &event_stream,
+            self.upgrade_lock.clone(),
+            true,
+        )
+        .await;
         if let Ok(vote_task) = vote_collector {
             relay_map.insert(relay, vote_task);
         }
diff --git a/crates/task-impls/src/vote_collection.rs b/crates/task-impls/src/vote_collection.rs
index 4c685ca978..c9266ae808 100644
--- a/crates/task-impls/src/vote_collection.rs
+++ b/crates/task-impls/src/vote_collection.rs
@@ -62,6 +62,9 @@ pub struct VoteCollectionTaskState<
 
     /// Node id
     pub id: u64,
+
+    /// Whether we should check if we are the leader when handling a vote
+    pub check_if_leader: bool,
 }
 
 /// Describes the functions a vote must implement for it to be aggregatable by the generic vote collection task
@@ -103,10 +106,12 @@ impl<
         vote: &VOTE,
         event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
     ) -> Result<Option<CERT>> {
-        ensure!(
-            vote.leader(&self.membership, self.epoch)? == self.public_key,
-            info!("Received vote for a view in which we were not the leader.")
-        );
+        if self.check_if_leader {
+            ensure!(
+                vote.leader(&self.membership, self.epoch)? == self.public_key,
+                info!("Received vote for a view in which we were not the leader.")
+            );
+        }
         ensure!(
             vote.view_number() == self.view,
             error!(
@@ -189,6 +194,7 @@ pub async fn create_vote_accumulator<TYPES, VOTE, CERT, V>(
     event: Arc<HotShotEvent<TYPES>>,
     sender: &Sender<Arc<HotShotEvent<TYPES>>>,
     upgrade_lock: UpgradeLock<TYPES, V>,
+    check_if_leader: bool,
 ) -> Result<VoteCollectionTaskState<TYPES, VOTE, CERT, V>>
 where
     TYPES: NodeType,
@@ -219,6 +225,7 @@ where
         view: info.view,
         epoch: info.epoch,
         id: info.id,
+        check_if_leader,
     };
 
     state.handle_vote_event(Arc::clone(&event), sender).await?;
@@ -246,6 +253,7 @@ pub async fn handle_vote<
     event: &Arc<HotShotEvent<TYPES>>,
     event_stream: &Sender<Arc<HotShotEvent<TYPES>>>,
     upgrade_lock: &UpgradeLock<TYPES, V>,
+    check_if_leader: bool,
 ) -> Result<()>
 where
     VoteCollectionTaskState<TYPES, VOTE, CERT, V>: HandleVoteEvent<TYPES, VOTE, CERT>,
@@ -265,6 +273,7 @@ where
             Arc::clone(event),
             event_stream,
             upgrade_lock.clone(),
+            check_if_leader,
         )
         .await?;
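With `check_if_leader` threaded through, the accumulator's leader check becomes conditional: every caller shown in this patch passes `true` except the quorum-vote path, which passes `!is_vote_leaf_extended` so that broadcast eQC votes can be accumulated by non-leaders. A minimal sketch of the gate (illustrative types only):

    /// Sketch of the conditional leader check in the vote accumulator.
    struct Gate<K: PartialEq> {
        public_key: K,
        check_if_leader: bool,
    }

    impl<K: PartialEq> Gate<K> {
        fn admit(&self, leader_for_view: &K) -> Result<(), &'static str> {
            if self.check_if_leader && *leader_for_view != self.public_key {
                return Err("received vote for a view in which we were not the leader");
            }
            Ok(())
        }
    }

The view-number check that follows in `accumulate_vote` stays unconditional either way; only the leader identity test is relaxed.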
diff --git a/crates/testing/src/byzantine/byzantine_behaviour.rs b/crates/testing/src/byzantine/byzantine_behaviour.rs
index e21580a3a5..1ab38f1f29 100644
--- a/crates/testing/src/byzantine/byzantine_behaviour.rs
+++ b/crates/testing/src/byzantine/byzantine_behaviour.rs
@@ -18,7 +18,7 @@ use hotshot_task_impls::{
     },
 };
 use hotshot_types::{
-    consensus::Consensus,
+    consensus::{Consensus, OuterConsensus},
     data::QuorumProposal,
     message::{Proposal, UpgradeLock},
     simple_vote::QuorumVote,
@@ -349,7 +349,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + std::fmt::Debug, V: Version
         quorum_membership,
         da_membership,
         storage: Arc::clone(&handle.storage()),
-        consensus: Arc::clone(&handle.consensus()),
+        consensus: OuterConsensus::new(handle.consensus()),
         upgrade_lock: handle.hotshot.upgrade_lock.clone(),
         transmit_tasks: BTreeMap::new(),
     };
diff --git a/crates/testing/tests/tests_1/network_task.rs b/crates/testing/tests/tests_1/network_task.rs
index 540baaf447..9a539f89e4 100644
--- a/crates/testing/tests/tests_1/network_task.rs
+++ b/crates/testing/tests/tests_1/network_task.rs
@@ -17,6 +17,7 @@ use hotshot_testing::{
     test_task::add_network_message_test_task, view_generator::TestViewGenerator,
 };
 use hotshot_types::{
+    consensus::OuterConsensus,
     data::{EpochNumber, ViewNumber},
     message::UpgradeLock,
     traits::{
@@ -51,7 +52,7 @@ async fn test_network_task() {
     let network = (launcher.resource_generator.channel_generator)(node_id).await;
     let storage = Arc::new(RwLock::new((launcher.resource_generator.storage)(node_id)));
-    let consensus = handle.hotshot.consensus();
+    let consensus = OuterConsensus::new(handle.hotshot.consensus());
     let config = launcher.resource_generator.config.clone();
     let validator_config = launcher.resource_generator.validator_config.clone();
     let public_key = validator_config.public_key;
@@ -221,7 +222,7 @@ async fn test_network_storage_fail() {
     let network = (launcher.resource_generator.channel_generator)(node_id).await;
 
-    let consensus = handle.hotshot.consensus();
+    let consensus = OuterConsensus::new(handle.hotshot.consensus());
     let storage = Arc::new(RwLock::new((launcher.resource_generator.storage)(node_id)));
     storage.write().await.should_return_err = true;
     let config = launcher.resource_generator.config.clone();
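The task and test changes above all make the same substitution: a bare `Arc<RwLock<Consensus<TYPES>>>` becomes `OuterConsensus`, constructed via `OuterConsensus::new(...)` and re-wrapped through its `inner_consensus` field when a task needs its own handle. A minimal sketch of such a wrapper, assuming only what these diffs show (the real type may carry additional instrumentation):

    use std::sync::Arc;
    use async_lock::RwLock;

    /// Sketch of an `OuterConsensus`-style handle over shared consensus state.
    pub struct Outer<T> {
        pub inner_consensus: Arc<RwLock<T>>,
    }

    impl<T> Outer<T> {
        pub fn new(inner_consensus: Arc<RwLock<T>>) -> Self {
            Self { inner_consensus }
        }

        /// Read access delegates to the inner lock, as the call sites above do.
        pub async fn read(&self) -> async_lock::RwLockReadGuard<'_, T> {
            self.inner_consensus.read().await
        }
    }

In the network task the clone is spelled `OuterConsensus::new(Arc::clone(&self.consensus.inner_consensus))`, which keeps the re-wrap explicit at each spawn site.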
diff --git a/crates/types/src/consensus.rs b/crates/types/src/consensus.rs
index 77161111ae..eb8fe4ade2 100644
--- a/crates/types/src/consensus.rs
+++ b/crates/types/src/consensus.rs
@@ -14,7 +14,7 @@ use std::{
 };
 
 use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
-use committable::Commitment;
+use committable::{Commitment, Committable};
 use tracing::instrument;
 use utils::anytrace::*;
 use vec1::Vec1;
@@ -33,7 +33,9 @@ use crate::{
         signature_key::SignatureKey,
         BlockPayload, ValidatedState,
     },
-    utils::{BuilderCommitment, LeafCommitment, StateAndDelta, Terminator},
+    utils::{
+        epoch_from_block_number, BuilderCommitment, LeafCommitment, StateAndDelta, Terminator,
+    },
     vid::VidCommitment,
     vote::HasViewNumber,
 };
@@ -905,11 +907,12 @@ impl<TYPES: NodeType> Consensus<TYPES> {
     /// consecutive views for the last block in the epoch.
     pub fn is_leaf_extended(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
         if !self.is_leaf_for_last_block(leaf_commit) {
-            tracing::debug!("The given leaf is not for the last block in the epoch.");
+            tracing::trace!("The given leaf is not for the last block in the epoch.");
             return false;
         }
 
         let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
+            tracing::trace!("We don't have a leaf corresponding to the leaf commit");
             return false;
         };
         let leaf_view = leaf.view_number();
@@ -957,6 +960,7 @@ impl<TYPES: NodeType> Consensus<TYPES> {
     /// Returns true if a given leaf is for the last block in the epoch
     pub fn is_leaf_for_last_block(&self, leaf_commit: LeafCommitment<TYPES>) -> bool {
         let Some(leaf) = self.saved_leaves.get(&leaf_commit) else {
+            tracing::trace!("We don't have a leaf corresponding to the leaf commit");
             return false;
         };
         let block_height = leaf.height();
@@ -966,6 +970,17 @@ impl<TYPES: NodeType> Consensus<TYPES> {
             block_height % self.epoch_height == 0
         }
     }
+
+    /// Returns true if the `parent_leaf` formed an eQC for the previous epoch to the `proposed_leaf`
+    pub fn check_eqc(&self, proposed_leaf: &Leaf<TYPES>, parent_leaf: &Leaf<TYPES>) -> bool {
+        if parent_leaf.view_number() == TYPES::View::genesis() {
+            return true;
+        }
+        let new_epoch = epoch_from_block_number(proposed_leaf.height(), self.epoch_height);
+        let old_epoch = epoch_from_block_number(parent_leaf.height(), self.epoch_height);
+        let parent_leaf_commit = <Leaf<TYPES> as Committable>::commit(parent_leaf);
+        new_epoch - 1 == old_epoch && self.is_leaf_extended(parent_leaf_commit)
+    }
 }
 
 /// Alias for the block payload commitment and the associated metadata. The primary data
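Tying `check_eqc` back to the arithmetic: the parent must be the last block of the epoch immediately before the proposed leaf's epoch, and `is_leaf_extended` must confirm the QC chain over consecutive views. A worked example of just the height-level part (the leaf traversal is abstracted away; assumes a nonzero `epoch_height`):

    /// Sketch of the height-level part of `check_eqc`, with epoch_height = 10
    /// in the examples below.
    fn check_eqc_heights(proposed_height: u64, parent_height: u64, epoch_height: u64) -> bool {
        // Ceiling division: which epoch a height belongs to.
        let epoch_of = |h: u64| (h + epoch_height - 1) / epoch_height;
        // The parent must be its epoch's last block...
        let parent_is_last_block = parent_height % epoch_height == 0;
        // ...and that epoch must sit exactly one before the proposal's.
        parent_is_last_block && epoch_of(proposed_height) == epoch_of(parent_height) + 1
    }

    fn main() {
        // Height 10 closes epoch 1, so it can justify height 11 opening epoch 2.
        assert!(check_eqc_heights(11, 10, 10));
        // Height 9 is mid-epoch: even a QC on it cannot justify a new epoch.
        assert!(!check_eqc_heights(11, 9, 10));
    }

The genesis shortcut in `check_eqc` (a parent at the genesis view always passes) keeps the chain's very first proposal from tripping the epoch check before any eQC can exist.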