diff --git a/crates/orchestrator/src/benchmark.rs b/crates/orchestrator/src/benchmark.rs index f8687f4..b9ec6b3 100644 --- a/crates/orchestrator/src/benchmark.rs +++ b/crates/orchestrator/src/benchmark.rs @@ -38,7 +38,8 @@ pub struct BenchmarkParametersGeneric { /// paying for data sent between the nodes. pub use_internal_ip_address: bool, // Consensus protocol to deploy - // (starfish | starfish-speed | starfish-bls | mysticeti | cordial-miners) + // (starfish | starfish-speed | starfish-bls | mysticeti | + // cordial-miners | bluestreak) pub consensus_protocol: String, /// number Byzantine nodes pub byzantine_nodes: usize, diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 4a5ae98..8c5b581 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -115,7 +115,7 @@ pub enum Operation { /// Consensus to deploy. Available options: /// starfish | starfish-speed | starfish-bls | mysticeti | - /// cordial-miners + /// cordial-miners | bluestreak #[clap(long, value_name = "STRING", default_value = "starfish", global = true)] consensus: String, diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index d772410..b9cd166 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -85,7 +85,8 @@ impl BroadcasterParameters { ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls - | ConsensusProtocol::CordialMiners => Self { + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::Bluestreak => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -417,7 +418,8 @@ where } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -821,7 +823,8 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc | ConsensusProtocol::StarfishBls => PushOtherBlocksFormat::HeadersAndShards, ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti - | ConsensusProtocol::SailfishPlusPlus => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 1c7c40f..fddc58b 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -100,6 +100,7 @@ impl Linearizer { let mut to_commit = Vec::new(); let leader_block_ref = *leader_block.reference(); let min_round = leader_block_ref.round.saturating_sub(MAX_TRAVERSAL_DEPTH); + let follow_unprovable = dag_state.consensus_protocol == ConsensusProtocol::Bluestreak; assert!(self.committed.insert(leader_block_ref)); let mut current_level = vec![leader_block]; @@ -115,6 +116,13 @@ impl Linearizer { next_refs.push(*reference); } } + if follow_unprovable { + if let Some(cert_ref) = x.unprovable_certificate() { + if cert_ref.round >= min_round && self.committed.insert(*cert_ref) { + next_refs.push(*cert_ref); + } + } + } } if next_refs.is_empty() { break; @@ -259,7 +267,8 @@ impl Linearizer { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index bf4568f..636365a 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -43,6 +43,9 @@ impl UniversalCommitter { if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { return self.try_commit_sailfish(last_decided); } + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + return self.try_commit_bluestreak(last_decided); + } let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -180,6 +183,133 @@ impl UniversalCommitter { .collect() } + fn try_commit_bluestreak(&mut self, last_decided: BlockReference) -> Vec { + let highest_known_round = self.dag_state.highest_round(); + let last_decided_round = last_decided.round(); + let highest_anchor = highest_known_round.saturating_sub(2); + let mut committed = Vec::new(); + let mut newly_committed = AHashSet::new(); + + for round in last_decided_round + 1..=highest_anchor { + let leader = self.committee.elect_leader(round); + + if self.check_direct_skip_bluestreak(leader, round) { + let key = (leader, round); + if !self.decided.contains_key(&key) { + let status = LeaderStatus::Skip(leader, round); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, true); + } + committed.push(status); + } + continue; + } + + let Some(anchor) = self.try_direct_commit_bluestreak(leader, round) else { + continue; + }; + + let mut chain = vec![anchor.clone()]; + let mut current = anchor; + for prev_round in (last_decided_round + 1..current.round()).rev() { + let prev_leader = self.committee.elect_leader(prev_round); + let prev_key = (prev_leader, prev_round); + if newly_committed.contains(&prev_key) { + continue; + } + + let mut linked_leaders: Vec<_> = self + .dag_state + .get_blocks_at_authority_round(prev_leader, prev_round) + .into_iter() + .filter(|block| self.dag_state.has_vertex_certificate(block.reference())) + .filter(|block| self.dag_state.linked(¤t, block)) + .collect(); + + if linked_leaders.len() > 1 { + panic!( + "[Bluestreak] More than one linked leader for {}", + format_authority_round(prev_leader, prev_round) + ); + } + + if let Some(prev) = linked_leaders.pop() { + current = prev.clone(); + chain.push(prev); + } + } + + chain.reverse(); + for leader_block in chain { + let key = (leader_block.authority(), leader_block.round()); + if !newly_committed.insert(key) { + continue; + } + let direct_decide = key.1 == round; + let status = LeaderStatus::Commit(leader_block, None); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, direct_decide); + } + committed.push(status); + } + } + + committed.sort(); + committed + } + + fn try_direct_commit_bluestreak( + &self, + leader: AuthorityIndex, + leader_round: RoundNumber, + ) -> Option> { + let cert_round = leader_round + 2; + let leader_blocks = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round); + + for leader_block in leader_blocks { + if !self + .dag_state + .has_vertex_certificate(leader_block.reference()) + { + continue; + } + let leader_ref = *leader_block.reference(); + let cert_blocks = self.dag_state.get_blocks_by_round_cached(cert_round); + let mut supporters = StakeAggregator::::new(); + for block in cert_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) + && supporters.add(block.authority(), &self.committee) + { + return Some(leader_block); + } + } + } + + None + } + + fn check_direct_skip_bluestreak(&self, leader: AuthorityIndex, round: RoundNumber) -> bool { + let vote_round = round + 1; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut non_voters = StakeAggregator::::new(); + for block in vote_blocks.iter() { + let votes_for = block + .block_references() + .iter() + .any(|r| r.round == round && r.authority == leader); + if !votes_for && non_voters.add(block.authority(), &self.committee) { + return true; + } + } + false + } + fn try_commit_sailfish(&mut self, last_decided: BlockReference) -> Vec { let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -432,6 +562,13 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, + ConsensusProtocol::Bluestreak => Self { + committee, + dag_state, + metrics, + wave_length: WAVE_LENGTH, + pipeline: true, + }, ConsensusProtocol::CordialMiners => Self { committee, dag_state, @@ -566,4 +703,99 @@ mod tests { "expected round-1 leader to commit with f+1 certified supporters in round 2" ); } + + #[test] + fn bluestreak_direct_commit_uses_unprovable_certificates() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let committee = Committee::new_for_benchmarks(4); + let registry = Registry::new(); + let (metrics, _reporter) = Metrics::new( + ®istry, + Some(committee.as_ref()), + Some("bluestreak"), + None, + ); + + let leader = make_full_block(1, 1, vec![BlockReference::new_test(1, 0)]); + let leader_ref = *leader.reference(); + let vote_a = make_full_block(0, 2, vec![leader_ref]); + let vote_b = make_full_block(2, 2, vec![leader_ref]); + let vote_c = make_full_block(3, 2, vec![leader_ref]); + + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut cert_a = crate::types::VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*vote_a.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_a.preserialize(); + let cert_a = Data::new(cert_a); + + let mut cert_b = crate::types::VerifiedBlock::new_with_unprovable( + 2, + 3, + vec![*vote_b.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_b.preserialize(); + let cert_b = Data::new(cert_b); + + let mut cert_c = crate::types::VerifiedBlock::new_with_unprovable( + 3, + 3, + vec![*vote_c.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_c.preserialize(); + let cert_c = Data::new(cert_c); + + dag_state.insert_general_block(leader, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_c, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_c, DataSource::BlockBundleStreaming); + + let mut committer = UniversalCommitterBuilder::new(committee, dag_state, metrics).build(); + let decided = committer.try_commit(BlockReference::new_test(0, 0)); + + assert!( + decided.iter().any(|status| { + matches!( + status, + LeaderStatus::Commit(block, None) + if block.authority() == 1 && block.round() == 1 + ) + }), + "expected round-1 leader to commit from round-3 unprovable certificates" + ); + } } diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8834c4d..22af544 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -13,7 +13,7 @@ use crate::{ block_handler::BlockHandler, block_manager::BlockManager, bls_certificate_aggregator::{BlsCertificateAggregator, apply_certificate_events}, - committee::Committee, + committee::{Committee, QuorumThreshold, StakeAggregator, ValidityThreshold}, config::NodePrivateConfig, consensus::{ CommitMetastate, @@ -480,7 +480,7 @@ impl Core { } // SailfishPlusPlus: require certified parent quorum before creating a block. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && !self.dag_state.certified_parent_quorum(clock_round - 1) { @@ -523,9 +523,13 @@ impl Core { // set below threshold-clock quorum, we cannot build a valid block yet. // Requeue both transactions and include refs so the next attempt sees // them again. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let allow_own_prev_only = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round.saturating_sub(1)) == self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && block_references.is_empty() + && !allow_own_prev_only { for r in raw_refs { self.pending.push(MetaTransaction::Include(r)); @@ -688,15 +692,19 @@ impl Core { self.compress_pending_block_references(&pending_refs, block_round); // SailfishPlusPlus: filter parents to only include certified blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if self.dag_state.consensus_protocol.uses_dual_dag() { block_references.retain(|r| r.round == 0 || self.dag_state.has_vertex_certificate(r)); } // SailfishPlusPlus: verify the filtered parent set, together with the // creator's own previous block (always included by build_block), still // has quorum stake at round-1. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let compress_non_leader = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(block_round) != self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && block_round > 1 + && !compress_non_leader { let prev_round = block_round - 1; let mut prev_round_stake: u64 = 0; @@ -746,7 +754,8 @@ impl Core { )), ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => None, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => None, } } @@ -912,8 +921,17 @@ impl Core { } else { None }; + let unprovable_certificate = if self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + self.compute_unprovable_certificate(clock_round, &block_references) + } else { + None + }; - let mut block = VerifiedBlock::new_with_signer( + let mut block = VerifiedBlock::new_with_signer_and_unprovable( self.authority, clock_round, block_references, @@ -933,6 +951,7 @@ impl Core { precomputed_round_sig, precomputed_leader_sig, sailfish_fields, + unprovable_certificate, ); self.metrics @@ -1024,6 +1043,46 @@ impl Core { true } + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round.checked_sub(2)?; + let leader = self.committee.elect_leader(leader_round); + let leader_block = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round) + .into_iter() + .next()?; + let leader_ref = *leader_block.reference(); + + let vote_round = clock_round.checked_sub(1)?; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut vote_stake = StakeAggregator::::new(); + for block in vote_blocks.iter() { + if block.block_references().iter().any(|r| *r == leader_ref) { + vote_stake.add(block.authority(), &self.committee); + } + } + if vote_stake.is_quorum(&self.committee) { + return Some(leader_ref); + } + + let current_round_blocks = self.dag_state.get_blocks_by_round_cached(clock_round); + let mut propagation_stake = StakeAggregator::::new(); + for block in current_round_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) { + propagation_stake.add(block.authority(), &self.committee); + } + } + if propagation_stake.get_stake() >= self.committee.validity_threshold() { + return Some(leader_ref); + } + + None + } + fn prepare_last_blocks(&mut self) { let target = match self.dag_state.byzantine_strategy { Some( @@ -1059,6 +1118,30 @@ impl Core { pending_refs: &[BlockReference], block_round: RoundNumber, ) -> Vec { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round + }) + .collect(); + } + // SailfishPlusPlus: keep all previous-round references unconditionally // so that certified-parent filtering doesn't drop below quorum. // Only older-round references go through normal compression. @@ -1073,6 +1156,32 @@ impl Core { }) .collect(); } + // Bluestreak: non-leaders include only the leader at r-1 + // (own_prev is always prepended by build_block). Leaders keep the + // full clean-DAG frontier for the quorum requirement. + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + // Leaders: keep all previous-round refs (same as SailfishPP). + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round + }) + .collect(); + } if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { if self.committee.elect_leader(block_round) != self.authority { return Vec::new(); @@ -1300,7 +1409,7 @@ impl Core { } fn flush_pending_sailfish_certified_refs(&self) { - if self.dag_state.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.dag_state.consensus_protocol.uses_dual_dag() { return; } self.dag_state.flush_pending_sailfish_certified_refs(); diff --git a/crates/starfish-core/src/crypto.rs b/crates/starfish-core/src/crypto.rs index a1583c5..f5ca401 100644 --- a/crates/starfish-core/src/crypto.rs +++ b/crates/starfish-core/src/crypto.rs @@ -184,6 +184,30 @@ impl BlockDigest { signature: &SignatureBytes, merkle_root: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_without_transactions_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + merkle_root, + strong_vote, + None, + ) + } + + pub fn new_without_transactions_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + merkle_root: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -196,6 +220,7 @@ impl BlockDigest { merkle_root, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -209,6 +234,30 @@ impl BlockDigest { signature: &SignatureBytes, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -221,6 +270,7 @@ impl BlockDigest { transactions_commitment, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -250,6 +300,19 @@ impl BlockDigest { CryptoHash::crypto_hash(&mask, hasher); } } + + /// Extend a block digest hasher with the Bluestreak unprovable + /// certificate reference. Called after `digest_without_signature` and + /// before finalizing. No-op when `None`, preserving backward + /// compatibility. + pub(crate) fn hash_unprovable_certificate( + hasher: &mut Blake3Hasher, + unprovable_certificate: Option<&BlockReference>, + ) { + if let Some(cert_ref) = unprovable_certificate { + cert_ref.crypto_hash(hasher); + } + } } impl From<[u8; BLOCK_DIGEST_SIZE]> for BlockDigest { @@ -418,6 +481,7 @@ impl PublicKey { header.merkle_root(), header.strong_vote(), ); + BlockDigest::hash_unprovable_certificate(&mut hasher, header.unprovable_certificate()); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); self.0.verify(&signature, digest.as_ref()) } @@ -461,6 +525,29 @@ impl Signer { meta_creation_time_ns: TimestampNs, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> SignatureBytes { + self.sign_block_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn sign_block_with_unprovable( + &self, + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> SignatureBytes { let mut hasher = Blake3Hasher::new(); BlockDigest::digest_without_signature( @@ -473,6 +560,7 @@ impl Signer { transactions_commitment, strong_vote, ); + BlockDigest::hash_unprovable_certificate(&mut hasher, unprovable_certificate); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); let signature = self.0.sign(digest.as_ref()); SignatureBytes(signature.to_bytes()) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9ed13c..93fdfe2 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -101,6 +101,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, + Bluestreak, } impl ConsensusProtocol { @@ -112,6 +113,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, + "bluestreak" => ConsensusProtocol::Bluestreak, _ => ConsensusProtocol::Starfish, } } @@ -129,13 +131,27 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } + pub fn is_bluestreak(self) -> bool { + matches!(self, ConsensusProtocol::Bluestreak) + } + + /// Protocols that use a dual dirty/clean DAG with vertex certification. + pub fn uses_dual_dag(self) -> bool { + matches!( + self, + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::Bluestreak + ) + } + pub fn default_dissemination_mode(self) -> DisseminationMode { match self { - ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { + ConsensusProtocol::Mysticeti + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { DisseminationMode::Pull } - ConsensusProtocol::CordialMiners => DisseminationMode::PushCausal, - ConsensusProtocol::Starfish + ConsensusProtocol::CordialMiners + | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, } @@ -536,10 +552,10 @@ impl DagState { } } - // Recover persisted active Sailfish++ certifications. Only the active, + // Recover persisted active certified vertices. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. - if consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if consensus_protocol.uses_dual_dag() { let certified_from_round = if use_windowed { inner.evicted_rounds.iter().copied().min().unwrap_or(0) } else { @@ -562,6 +578,19 @@ impl DagState { } } } + if consensus_protocol == ConsensusProtocol::Bluestreak { + let mut recovered_blocks: Vec<_> = inner + .index + .iter() + .flat_map(|auth_map| auth_map.values()) + .flat_map(|blocks| blocks.values().cloned()) + .collect(); + recovered_blocks.sort_by_key(|block| *block.reference()); + let mut activated = Vec::new(); + for block in recovered_blocks { + inner.check_bluestreak_pre_clean(&block, &committee, &mut activated); + } + } metrics.dag_state_entries.inc_by(block_count); metrics.dag_highest_round.set(inner.highest_round as i64); @@ -611,6 +640,9 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } + ConsensusProtocol::Bluestreak => { + tracing::info!("Starting Bluestreak protocol") + } } let dag_state = Self { store: store.clone(), @@ -643,7 +675,7 @@ impl DagState { /// clock progress to `r` and quorum stake of RBC-certified vertices in /// round `r - 1`. Other protocols use the raw threshold clock directly. pub fn proposal_round(&self) -> RoundNumber { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.consensus_protocol.uses_dual_dag() { return self.threshold_clock_round(); } @@ -1479,7 +1511,7 @@ impl DagState { } } - if self.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.consensus_protocol.uses_dual_dag() && quorum_round > 1 && !self.certified_parent_quorum(quorum_round - 1) { @@ -2261,19 +2293,43 @@ impl DagStateInner { later_block: &Data, earlier_block: &Data, ) -> bool { - let mut parents = AHashSet::from([later_block.clone()]); - for _round_number in (earlier_block.round()..later_block.round()).rev() { - parents = parents - .iter() - .flat_map(|block| block.block_references()) - .map(|block_reference| { - self.get_storage_block(*block_reference) - .expect("Block should be in DagState") - }) - .filter(|included_block| included_block.round() >= earlier_block.round()) - .collect(); + let target = *earlier_block.reference(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block_ref == target { + return true; + } + if block.round() <= earlier_block.round() { + continue; + } + + for parent_ref in block.block_references() { + let parent = self + .get_storage_block(*parent_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + let parent = self + .get_storage_block(*cert_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + } } - parents.contains(earlier_block) + + false } /// Compute all block references reachable from `later_block` at @@ -2284,16 +2340,42 @@ impl DagStateInner { later_block: &Data, target_round: RoundNumber, ) -> AHashSet { - let mut frontier = AHashSet::from([later_block.clone()]); - for _ in (target_round..later_block.round()).rev() { - frontier = frontier - .iter() - .flat_map(|block| block.block_references()) - .filter_map(|r| self.get_storage_block(*r)) - .filter(|b| b.round() >= target_round) - .collect(); + let mut reachable = AHashSet::new(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block.round() == target_round { + reachable.insert(block_ref); + continue; + } + if block.round() < target_round { + continue; + } + + for reference in block.block_references() { + if let Some(parent) = self.get_storage_block(*reference) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(parent) = self.get_storage_block(*cert_ref) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + } } - frontier.iter().map(|b| *b.reference()).collect() + + reachable } /// Per-authority eviction using BTreeMap::split_off. @@ -2422,6 +2504,18 @@ impl DagStateInner { missing.push(*parent); } } + // Bluestreak: the unprovable_certificate target is also a + // causal dependency that must be ancestor-closed. + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) + && !missing.contains(cert_ref) + { + missing.push(*cert_ref); + } + } + } missing } @@ -2450,6 +2544,14 @@ impl DagStateInner { } } } + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(children) = self.pending_vertex_certificate_children.get_mut(cert_ref) { + children.remove(&block_ref); + if children.is_empty() { + self.pending_vertex_certificate_children.remove(cert_ref); + } + } + } activated.push(block_ref); self.pending_persisted_vertex_certificates[auth].insert(block_ref); @@ -2500,6 +2602,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); + self.check_bluestreak_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2508,7 +2611,7 @@ impl DagStateInner { committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus || block.round() <= 1 { + if !self.consensus_protocol.uses_dual_dag() || block.round() <= 1 { return; } @@ -2532,6 +2635,96 @@ impl DagStateInner { } } + /// Bluestreak pre-clean check: a block is pre-clean if it has no + /// `unprovable_certificate`, or its certificate is verified via the dirty + /// DAG. Pre-clean blocks feed into the vertex certification pipeline. + fn check_bluestreak_pre_clean( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + activated: &mut Vec, + ) { + if self.consensus_protocol != ConsensusProtocol::Bluestreak { + return; + } + let block_ref = *block.reference(); + let is_pre_clean = match block.unprovable_certificate() { + None => true, + Some(leader_ref) => { + self.verify_unprovable_certificate(block_ref.round, leader_ref, committee) + } + }; + if is_pre_clean { + self.note_vertex_certified(block_ref, activated); + } + self.reevaluate_bluestreak_pending(committee, activated); + } + + /// Verify an unprovable certificate against the local dirty DAG. + /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) + /// or f+1 blocks at r propagate the same certificate. + fn verify_unprovable_certificate( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + committee: &Committee, + ) -> bool { + if leader_ref.round + 2 != block_round { + return false; + } + + let vote_round = match block_round.checked_sub(1) { + Some(round) => round, + None => return false, + }; + let mut stake = StakeAggregator::::new(); + for block in self.get_blocks_by_round(vote_round) { + if block.block_references().iter().any(|r| r == leader_ref) { + stake.add(block.authority(), committee); + } + } + if stake.is_quorum(committee) { + return true; + } + + let mut propagation = StakeAggregator::::new(); + for block in self.get_blocks_by_round(block_round) { + if block.unprovable_certificate() == Some(leader_ref) { + propagation.add(block.authority(), committee); + } + } + propagation.get_stake() >= committee.validity_threshold() + } + + fn reevaluate_bluestreak_pending( + &mut self, + committee: &Committee, + activated: &mut Vec, + ) { + let mut ready = Vec::new(); + for auth_map in &self.index { + for round_blocks in auth_map.values() { + for block in round_blocks.values() { + let block_ref = *block.reference(); + if self.vertex_certificates[block_ref.authority as usize].contains(&block_ref) { + continue; + } + let Some(leader_ref) = block.unprovable_certificate() else { + continue; + }; + if self.verify_unprovable_certificate(block.round(), leader_ref, committee) { + ready.push(block_ref); + } + } + } + } + ready.sort(); + ready.dedup(); + for block_ref in ready { + self.note_vertex_certified(block_ref, activated); + } + } + fn infer_vertex_certificate_closure( &mut self, root: BlockReference, @@ -2702,6 +2895,7 @@ impl DagStateInner { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak ) && block.transactions().is_none() && block.merkle_root() == TransactionsCommitment::new_from_transactions(&Vec::new()); if block.has_empty_payload() || is_empty_full_block { @@ -3295,7 +3489,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3329,7 +3524,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3883,6 +4079,91 @@ mod tests { assert!(!dag_state.mark_dac_certified(rejected, cert)); } + #[test] + fn bluestreak_reachability_follows_unprovable_certificate() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut leader = VerifiedBlock::new( + 1, + 1, + vec![BlockReference::new_test(1, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + leader.preserialize(); + let leader = Data::new(leader); + let leader_ref = *leader.reference(); + + let mut own_round_one = VerifiedBlock::new( + 0, + 1, + vec![BlockReference::new_test(0, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_round_one.preserialize(); + let own_round_one = Data::new(own_round_one); + + let mut own_prev = VerifiedBlock::new( + 0, + 2, + vec![*own_round_one.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_prev.preserialize(); + let own_prev = Data::new(own_prev); + + let mut compress = VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*own_prev.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + compress.preserialize(); + let compress = Data::new(compress); + + dag_state.insert_general_block(leader.clone(), DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_round_one, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_prev, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(compress.clone(), DataSource::BlockBundleStreaming); + + assert!(dag_state.linked(&compress, &leader)); + assert!( + dag_state + .reachable_at_round(&compress, 1) + .contains(&leader_ref) + ); + } + #[test] fn consensus_protocol_resolves_dissemination_defaults() { assert_eq!( diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index 73c2c2a..929732b 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -504,7 +504,8 @@ impl ConnectionHandler { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish @@ -1605,6 +1606,7 @@ impl NetworkSyncer loop { let notified = inner.threshold_clock_notify.notified(); let round = if inner.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + || inner.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { inner.dag_state.proposal_round().saturating_sub(1) } else { diff --git a/crates/starfish-core/src/threshold_clock.rs b/crates/starfish-core/src/threshold_clock.rs index 7490458..288b7b1 100644 --- a/crates/starfish-core/src/threshold_clock.rs +++ b/crates/starfish-core/src/threshold_clock.rs @@ -116,6 +116,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, } } diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index f3e2331..1f465a6 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,6 +283,9 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, + /// Bluestreak unprovable certificate: optional reference to a + /// leader at round r-2 certified by 2f+1 votes at r-1. + pub(crate) unprovable_certificate: Option, // -- Cache (not serialized) ----------------------------------------------- /// Cached bincode-serialized bytes. Populated by `preserialize()` off the @@ -415,6 +418,10 @@ impl BlockHeader { .unwrap_or(&[]) } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.unprovable_certificate.as_ref() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.sailfish.as_deref() } @@ -694,6 +701,36 @@ impl VerifiedBlock { strong_vote: Option, bls: Option, sailfish: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions, + merkle_root, + strong_vote, + bls, + sailfish, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signature: SignatureBytes, + transactions: Vec, + merkle_root: TransactionsCommitment, + strong_vote: Option, + bls: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let (acknowledgment_intersection, acknowledgment_references) = compress_acknowledgments(&block_references, &acknowledgment_references); @@ -706,7 +743,7 @@ impl VerifiedBlock { reference: BlockReference { authority, round, - digest: BlockDigest::new_without_transactions( + digest: BlockDigest::new_without_transactions_with_unprovable( authority, round, &block_references, @@ -715,6 +752,7 @@ impl VerifiedBlock { &signature, merkle_root, strong_vote, + unprovable_certificate.as_ref(), ), }, block_references, @@ -728,6 +766,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), + unprovable_certificate, serialized: None, }; @@ -771,6 +810,7 @@ impl VerifiedBlock { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; let mut block = Self { @@ -803,6 +843,53 @@ impl VerifiedBlock { precomputed_round_sig: Option, precomputed_leader_sig: Option, sailfish: Option, + ) -> Self { + Self::new_with_signer_and_unprovable( + authority, + round, + block_references, + voted_leader_ref, + acknowledgment_references, + meta_creation_time_ns, + signer, + bls_signer, + committee_opt, + aggregate_dac_sigs, + transactions, + encoded_transactions, + consensus_protocol, + strong_vote, + aggregate_round_sig, + certified_leader, + precomputed_round_sig, + precomputed_leader_sig, + sailfish, + None, + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_with_signer_and_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + voted_leader_ref: Option, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signer: &Signer, + bls_signer: Option<&BlsSigner>, + committee_opt: Option<&Committee>, + aggregate_dac_sigs: Vec, + transactions: Vec, + encoded_transactions: Option>, + consensus_protocol: ConsensusProtocol, + strong_vote: Option, + aggregate_round_sig: Option, + certified_leader: Option<(BlockReference, BlsAggregateCertificate)>, + precomputed_round_sig: Option, + precomputed_leader_sig: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let transactions_commitment = match consensus_protocol { ConsensusProtocol::Starfish @@ -820,7 +907,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -836,7 +924,7 @@ impl VerifiedBlock { &acknowledgment_references, aggregate_dac_sigs, ); - let signature = signer.sign_block( + let signature = signer.sign_block_with_unprovable( authority, round, &block_references, @@ -844,6 +932,7 @@ impl VerifiedBlock { meta_creation_time_ns, transactions_commitment, strong_vote, + unprovable_certificate.as_ref(), ); // Build BLS fields when the StarfishBls path is active. Partial round @@ -872,7 +961,7 @@ impl VerifiedBlock { } }); - Self::new( + Self::new_with_unprovable( authority, round, block_references, @@ -884,6 +973,7 @@ impl VerifiedBlock { strong_vote, bls, sailfish, + unprovable_certificate, ) } @@ -965,6 +1055,10 @@ impl VerifiedBlock { self.header.is_strong_blame() } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.header.unprovable_certificate() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.header.sailfish() } @@ -1111,7 +1205,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1148,7 +1243,7 @@ impl VerifiedBlock { ); } let acknowledgments = self.header.acknowledgments(); - let digest = BlockDigest::new( + let digest = BlockDigest::new_with_unprovable( self.authority(), round, &self.header.block_references, @@ -1157,6 +1252,7 @@ impl VerifiedBlock { &self.header.signature, self.header.transactions_commitment, self.header.strong_vote, + self.header.unprovable_certificate(), ); ensure!( digest == self.digest(), @@ -1189,6 +1285,10 @@ impl VerifiedBlock { } match consensus_protocol { ConsensusProtocol::StarfishBls => { + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); let bls = self .header .bls() @@ -1296,6 +1396,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { ensure!( @@ -1310,6 +1414,56 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); + } + ConsensusProtocol::Bluestreak => { + ensure!( + acknowledgments.is_empty(), + "Bluestreak blocks must not carry acknowledgments" + ); + ensure!( + self.header.bls().is_none(), + "Only StarfishBls blocks may carry BLS fields" + ); + let is_leader = self.authority() == committee.elect_leader(round); + if !is_leader { + ensure!( + self.header.block_references.len() <= 2, + "Bluestreak non-leader may reference at most own_prev + leader" + ); + let _self_ref = *self + .header + .block_references + .iter() + .find(|r| r.authority == self.authority()) + .ok_or_else(|| { + eyre::eyre!( + "Bluestreak non-leader block must reference its own previous block" + ) + })?; + if let Some(cert_ref) = self.header.unprovable_certificate() { + ensure!( + round >= 3 && cert_ref.round + 2 == round, + "unprovable_certificate must reference leader at round r-2" + ); + ensure!( + cert_ref.authority == committee.elect_leader(cert_ref.round), + "unprovable_certificate must reference the elected leader" + ); + } + } else { + ensure!( + self.header.unprovable_certificate().is_none(), + "Bluestreak leaders must not carry unprovable_certificate" + ); + ensure!( + threshold_clock_valid_block_header(&self.header, committee), + "Bluestreak leader must reference 2f+1 blocks from previous round" + ); + } } ConsensusProtocol::SailfishPlusPlus => { ensure!( @@ -1324,6 +1478,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); if round > 1 { let prev_round = round - 1; let prev_leader = committee.elect_leader(prev_round); @@ -1971,6 +2129,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; @@ -2182,6 +2341,102 @@ mod tests { ); } + #[test] + fn verifies_bluestreak_non_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let authority = committee + .authorities() + .find(|authority| *authority != committee.elect_leader(3)) + .unwrap(); + let round_2_leader = committee.elect_leader(2); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + authority, + 3, + vec![ + BlockReference::new_test(authority, 2), + BlockReference::new_test(round_2_leader, 2), + ], + None, + vec![], + 0, + &signers[authority as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap(); + } + + #[test] + fn rejects_bluestreak_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let leader = committee.elect_leader(3); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + leader, + 3, + (0..committee.quorum_threshold() as AuthorityIndex) + .map(|authority| BlockReference::new_test(authority, 2)) + .collect(), + None, + vec![], + 0, + &signers[leader as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + let err = block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains("Bluestreak leaders must not carry unprovable_certificate") + ); + } + #[test] fn verifies_empty_mysticeti_block_without_transaction_data() { let committee = Committee::new_for_benchmarks(4); diff --git a/crates/starfish-core/src/validator.rs b/crates/starfish-core/src/validator.rs index 9e2d76d..90c24ef 100644 --- a/crates/starfish-core/src/validator.rs +++ b/crates/starfish-core/src/validator.rs @@ -310,11 +310,17 @@ mod smoke_tests { #[test_case("starfish-speed", 80)] #[test_case("starfish-bls", 100)] #[test_case("sailfish++", 120)] + #[test_case("bluestreak", 140)] #[tokio::test] async fn validator_commit(consensus: &str, port_offset: u16) { run_commit_test(consensus, port_offset).await; } + #[tokio::test] + async fn validator_commit_bluestreak_basic() { + run_commit_test("bluestreak", 150).await; + } + async fn run_sync_test(consensus: &str, port_offset: u16) { let committee_size = 4; let committee = Committee::new_for_benchmarks(committee_size); @@ -402,6 +408,7 @@ mod smoke_tests { #[test_case("starfish-speed", 180)] #[test_case("starfish-bls", 200)] #[test_case("sailfish++", 220)] + #[test_case("bluestreak", 260)] #[tokio::test] async fn validator_sync(consensus: &str, port_offset: u16) { run_sync_test(consensus, port_offset).await; @@ -465,6 +472,7 @@ mod smoke_tests { #[test_case("starfish-speed", 280)] #[test_case("starfish-bls", 300)] #[test_case("sailfish++", 320)] + #[test_case("bluestreak", 380)] #[tokio::test] async fn validator_crash_faults(consensus: &str, port_offset: u16) { run_crash_faults_test(consensus, port_offset).await; diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index aaf5115..6c48cb4 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -7,8 +7,8 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, -# cordial-miners, mysticeti -CONSENSUS=${CONSENSUS:-starfish-speed} +# cordial-miners, mysticeti, bluestreak +CONSENSUS=${CONSENSUS:-bluestreak} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, # equivocating-chains, equivocating-two-chains, @@ -24,7 +24,7 @@ STORAGE_BACKEND=${STORAGE_BACKEND:-rocksdb} TRANSACTION_MODE=${TRANSACTION_MODE:-random} # Dissemination mode: protocol-default (default) | pull | # push-causal | push-useful -DISSEMINATION_MODE=${DISSEMINATION_MODE:-push-causal} +DISSEMINATION_MODE=${DISSEMINATION_MODE:-protocol-default} # Enable lz4 network compression. # Auto-enabled for random transaction mode. # Set COMPRESS_NETWORK=1 or =0 to override.