diff --git a/crates/orchestrator/src/benchmark.rs b/crates/orchestrator/src/benchmark.rs index f8687f4..b9ec6b3 100644 --- a/crates/orchestrator/src/benchmark.rs +++ b/crates/orchestrator/src/benchmark.rs @@ -38,7 +38,8 @@ pub struct BenchmarkParametersGeneric { /// paying for data sent between the nodes. pub use_internal_ip_address: bool, // Consensus protocol to deploy - // (starfish | starfish-speed | starfish-bls | mysticeti | cordial-miners) + // (starfish | starfish-speed | starfish-bls | mysticeti | + // cordial-miners | bluestreak) pub consensus_protocol: String, /// number Byzantine nodes pub byzantine_nodes: usize, diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 4a5ae98..8c5b581 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -115,7 +115,7 @@ pub enum Operation { /// Consensus to deploy. Available options: /// starfish | starfish-speed | starfish-bls | mysticeti | - /// cordial-miners + /// cordial-miners | bluestreak #[clap(long, value_name = "STRING", default_value = "starfish", global = true)] consensus: String, diff --git a/crates/orchestrator/src/measurements.rs b/crates/orchestrator/src/measurements.rs index 918b699..cc4fcb3 100644 --- a/crates/orchestrator/src/measurements.rs +++ b/crates/orchestrator/src/measurements.rs @@ -926,10 +926,6 @@ utilization_timer{proc="Committer::indirect_decide"} 693165 utilization_timer{proc="Core::add_blocks"} 5694911 utilization_timer{proc="Core::run_block_handler"} 198119 utilization_timer{proc="Core::try_new_block"} 1285400 -utilization_timer{proc="Core::try_new_block::build block"} 674913 -utilization_timer{proc="Core::try_new_block::encoding"} 31602 -utilization_timer{proc="Core::try_new_block::serialize block"} 11766 -utilization_timer{proc="Core::try_new_block::writing to disk"} 188706 utilization_timer{proc="Core::try_new_commit"} 6288004 utilization_timer{proc="Network: verify blocks"} 41800099 utilization_timer{proc="Syncer::try_new_commit"} 8128094 diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index d772410..b9cd166 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -85,7 +85,8 @@ impl BroadcasterParameters { ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls - | ConsensusProtocol::CordialMiners => Self { + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::Bluestreak => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -417,7 +418,8 @@ where } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -821,7 +823,8 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc | ConsensusProtocol::StarfishBls => PushOtherBlocksFormat::HeadersAndShards, ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti - | ConsensusProtocol::SailfishPlusPlus => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 1c7c40f..fddc58b 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -100,6 +100,7 @@ impl Linearizer { let mut to_commit = Vec::new(); let leader_block_ref = *leader_block.reference(); let min_round = leader_block_ref.round.saturating_sub(MAX_TRAVERSAL_DEPTH); + let follow_unprovable = dag_state.consensus_protocol == ConsensusProtocol::Bluestreak; assert!(self.committed.insert(leader_block_ref)); let mut current_level = vec![leader_block]; @@ -115,6 +116,13 @@ impl Linearizer { next_refs.push(*reference); } } + if follow_unprovable { + if let Some(cert_ref) = x.unprovable_certificate() { + if cert_ref.round >= min_round && self.committed.insert(*cert_ref) { + next_refs.push(*cert_ref); + } + } + } } if next_refs.is_empty() { break; @@ -259,7 +267,8 @@ impl Linearizer { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index bf4568f..66e56a9 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -11,7 +11,7 @@ use crate::{ committee::{Committee, QuorumThreshold, StakeAggregator}, consensus::base_committer::BaseCommitterOptions, dag_state::{ConsensusProtocol, DagState}, - metrics::{Metrics, UtilizationTimerVecExt}, + metrics::Metrics, types::{AuthorityIndex, BlockReference, RoundNumber, Stake, format_authority_round}, }; @@ -43,6 +43,9 @@ impl UniversalCommitter { if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { return self.try_commit_sailfish(last_decided); } + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + return self.try_commit_bluestreak(last_decided); + } let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -122,20 +125,11 @@ impl UniversalCommitter { let voter_info = &self.voters_cache[&(leader, round)].1; // Try to directly decide the leader. - let timer_direct_decide = self - .metrics - .utilization_timer - .utilization_timer("Committer::direct_decide"); let mut status = committer.try_direct_decide(leader, round, voter_info); - drop(timer_direct_decide); tracing::debug!("Outcome of direct rule: {status}"); // If the leader is not final (undecided, or Commit(Pending) for StarfishSpeed), // try to resolve via indirect rule. - let timer_indirect_decide = self - .metrics - .utilization_timer - .utilization_timer("Committer::indirect_decide"); if !status.is_final() { status = committer.try_indirect_decide(leader, round, leaders.iter(), voter_info); @@ -144,7 +138,6 @@ impl UniversalCommitter { } tracing::debug!("Outcome of indirect rule: {status}"); } - drop(timer_indirect_decide); if status.is_final() { self.decided.insert((leader, round), status.clone()); @@ -180,6 +173,147 @@ impl UniversalCommitter { .collect() } + fn try_commit_bluestreak(&mut self, last_decided: BlockReference) -> Vec { + let highest_known_round = self.dag_state.highest_round(); + let last_decided_round = last_decided.round(); + let highest_anchor = highest_known_round.saturating_sub(2); + let mut committed = Vec::new(); + let mut newly_committed = AHashSet::new(); + + for round in last_decided_round + 1..=highest_anchor { + let leader = self.committee.elect_leader(round); + + // Fast path: reuse finalized decisions from previous calls. + // Only re-emit Commits; cached Skips still short-circuit evaluation + // but are not pushed to `committed` (matching the original guard + // behavior that never re-emitted Skips). + let key = (leader, round); + if let Some(cached) = self.decided.get(&key) { + if cached.is_final() { + if let LeaderStatus::Commit(..) = cached { + committed.push(cached.clone()); + newly_committed.insert(key); + } + continue; + } + } + + if self.check_direct_skip_bluestreak(leader, round) { + if !self.decided.contains_key(&key) { + let status = LeaderStatus::Skip(leader, round); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, true); + } + committed.push(status); + } + continue; + } + + let Some(anchor) = self.try_direct_commit_bluestreak(leader, round) else { + continue; + }; + + let mut chain = vec![anchor.clone()]; + let mut current = anchor; + for prev_round in (last_decided_round + 1..current.round()).rev() { + let prev_leader = self.committee.elect_leader(prev_round); + let prev_key = (prev_leader, prev_round); + if newly_committed.contains(&prev_key) { + continue; + } + + let mut linked_leaders: Vec<_> = self + .dag_state + .get_blocks_at_authority_round(prev_leader, prev_round) + .into_iter() + .filter(|block| self.dag_state.has_vertex_certificate(block.reference())) + .filter(|block| self.dag_state.linked(¤t, block)) + .collect(); + + if linked_leaders.len() > 1 { + panic!( + "[Bluestreak] More than one linked leader for {}", + format_authority_round(prev_leader, prev_round) + ); + } + + if let Some(prev) = linked_leaders.pop() { + current = prev.clone(); + chain.push(prev); + } + } + + chain.reverse(); + for leader_block in chain { + let key = (leader_block.authority(), leader_block.round()); + if !newly_committed.insert(key) { + continue; + } + let direct_decide = key.1 == round; + let status = LeaderStatus::Commit(leader_block, None); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, direct_decide); + } + committed.push(status); + } + } + + committed.sort(); + committed + } + + fn try_direct_commit_bluestreak( + &self, + leader: AuthorityIndex, + leader_round: RoundNumber, + ) -> Option> { + let cert_round = leader_round + 2; + let leader_blocks = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round); + + for leader_block in leader_blocks { + if !self + .dag_state + .has_vertex_certificate(leader_block.reference()) + { + continue; + } + let leader_ref = *leader_block.reference(); + let cert_blocks = self.dag_state.get_blocks_by_round_cached(cert_round); + let mut supporters = StakeAggregator::::new(); + for block in cert_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) + && supporters.add(block.authority(), &self.committee) + { + return Some(leader_block); + } + } + } + + None + } + + fn check_direct_skip_bluestreak(&self, leader: AuthorityIndex, round: RoundNumber) -> bool { + let vote_round = round + 1; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut non_voters = StakeAggregator::::new(); + for block in vote_blocks.iter() { + let votes_for = block + .block_references() + .iter() + .any(|r| r.round == round && r.authority == leader); + if !votes_for && non_voters.add(block.authority(), &self.committee) { + return true; + } + } + false + } + fn try_commit_sailfish(&mut self, last_decided: BlockReference) -> Vec { let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -432,6 +566,13 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, + ConsensusProtocol::Bluestreak => Self { + committee, + dag_state, + metrics, + wave_length: WAVE_LENGTH, + pipeline: true, + }, ConsensusProtocol::CordialMiners => Self { committee, dag_state, @@ -566,4 +707,99 @@ mod tests { "expected round-1 leader to commit with f+1 certified supporters in round 2" ); } + + #[test] + fn bluestreak_direct_commit_uses_unprovable_certificates() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let committee = Committee::new_for_benchmarks(4); + let registry = Registry::new(); + let (metrics, _reporter) = Metrics::new( + ®istry, + Some(committee.as_ref()), + Some("bluestreak"), + None, + ); + + let leader = make_full_block(1, 1, vec![BlockReference::new_test(1, 0)]); + let leader_ref = *leader.reference(); + let vote_a = make_full_block(0, 2, vec![leader_ref]); + let vote_b = make_full_block(2, 2, vec![leader_ref]); + let vote_c = make_full_block(3, 2, vec![leader_ref]); + + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut cert_a = crate::types::VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*vote_a.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_a.preserialize(); + let cert_a = Data::new(cert_a); + + let mut cert_b = crate::types::VerifiedBlock::new_with_unprovable( + 2, + 3, + vec![*vote_b.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_b.preserialize(); + let cert_b = Data::new(cert_b); + + let mut cert_c = crate::types::VerifiedBlock::new_with_unprovable( + 3, + 3, + vec![*vote_c.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_c.preserialize(); + let cert_c = Data::new(cert_c); + + dag_state.insert_general_block(leader, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_c, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_c, DataSource::BlockBundleStreaming); + + let mut committer = UniversalCommitterBuilder::new(committee, dag_state, metrics).build(); + let decided = committer.try_commit(BlockReference::new_test(0, 0)); + + assert!( + decided.iter().any(|status| { + matches!( + status, + LeaderStatus::Commit(block, None) + if block.authority() == 1 && block.round() == 1 + ) + }), + "expected round-1 leader to commit from round-3 unprovable certificates" + ); + } } diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8834c4d..3fe07be 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -232,10 +232,6 @@ impl Core { Vec, Vec>, ) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Core::add_blocks"); let mut block_shards = Vec::new(); let blocks: Vec<_> = blocks .into_iter() @@ -322,10 +318,6 @@ impl Core { Vec, Vec>, ) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Core::add_headers"); let (processed, _, missing_references) = timed!( self.metrics, "BlockManager::add_headers", @@ -466,7 +458,7 @@ impl Core { let _block_timer = self .metrics .utilization_timer - .utilization_timer("Core::new_block::try_new_block"); + .utilization_timer("Core::try_new_block"); // Check if we're ready for a new block let clock_round = self.dag_state.proposal_round(); @@ -480,7 +472,7 @@ impl Core { } // SailfishPlusPlus: require certified parent quorum before creating a block. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && !self.dag_state.certified_parent_quorum(clock_round - 1) { @@ -508,24 +500,21 @@ impl Core { None }; - let pending_transactions = timed!( - self.metrics, - "Core::new_block::get_pending_transactions", - self.get_pending_transactions(clock_round) - ); - let (mut transactions, block_references, raw_refs) = timed!( - self.metrics, - "Core::new_block::collect_transactions_and_references", - self.collect_transactions_and_references(pending_transactions, clock_round) - ); + let pending_transactions = self.get_pending_transactions(clock_round); + let (mut transactions, block_references, raw_refs) = + self.collect_transactions_and_references(pending_transactions, clock_round); // SailfishPlusPlus: if the certified-parent filter reduced the parent // set below threshold-clock quorum, we cannot build a valid block yet. // Requeue both transactions and include refs so the next attempt sees // them again. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let allow_own_prev_only = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round.saturating_sub(1)) == self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && block_references.is_empty() + && !allow_own_prev_only { for r in raw_refs { self.pending.push(MetaTransaction::Include(r)); @@ -552,23 +541,11 @@ impl Core { if starfish_speed_excluded_authors.contains(self.authority) { self.requeue_transactions(std::mem::take(&mut transactions)); } - timed!( - self.metrics, - "Core::new_block::prepare_last_blocks", - self.prepare_last_blocks() - ); - let mut encoded_transactions = timed!( - self.metrics, - "Core::new_block::prepare_encoded_transactions", - self.prepare_encoded_transactions(&transactions) - ); + self.prepare_last_blocks(); + let mut encoded_transactions = self.prepare_encoded_transactions(&transactions); let acknowledgment_references = if self.dag_state.consensus_protocol.supports_acknowledgments() { - timed!( - self.metrics, - "Core::new_block::get_pending_acknowledgment", - self.dag_state.get_pending_acknowledgment(clock_round) - ) + self.dag_state.get_pending_acknowledgment(clock_round) } else { Vec::new() }; @@ -577,11 +554,7 @@ impl Core { acknowledgment_references, ); let number_of_blocks_to_create = self.last_own_block.len(); - let authority_bounds = timed!( - self.metrics, - "Core::new_block::calculate_authority_bounds", - self.calculate_authority_bounds(number_of_blocks_to_create) - ); + let authority_bounds = self.calculate_authority_bounds(number_of_blocks_to_create); let certified_leader = if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { @@ -613,30 +586,22 @@ impl Core { transactions = vec![]; encoded_transactions = self.prepare_encoded_transactions(&transactions); } - let block_data = timed!( - self.metrics, - "Core::new_block::build_block", - self.build_block( - &block_references, - voted_leader_ref, - &transactions, - &encoded_transactions, - &acknowledgment_references, - clock_round, - block_id, - aggregate_round_sig, - certified_leader, - ) + let block_data = self.build_block( + &block_references, + voted_leader_ref, + &transactions, + &encoded_transactions, + &acknowledgment_references, + clock_round, + block_id, + aggregate_round_sig, + certified_leader, ); tracing::debug!("Created block {:?}", block_data); if first_block.is_none() { first_block = Some(block_data.clone()); } - timed!( - self.metrics, - "Core::new_block::store_block", - self.store_block(block_data, &authority_bounds, block_id) - ); + self.store_block(block_data, &authority_bounds, block_id); } self.metrics @@ -688,15 +653,19 @@ impl Core { self.compress_pending_block_references(&pending_refs, block_round); // SailfishPlusPlus: filter parents to only include certified blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if self.dag_state.consensus_protocol.uses_dual_dag() { block_references.retain(|r| r.round == 0 || self.dag_state.has_vertex_certificate(r)); } // SailfishPlusPlus: verify the filtered parent set, together with the // creator's own previous block (always included by build_block), still // has quorum stake at round-1. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let compress_non_leader = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(block_round) != self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && block_round > 1 + && !compress_non_leader { let prev_round = block_round - 1; let mut prev_round_stake: u64 = 0; @@ -746,7 +715,8 @@ impl Core { )), ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => None, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => None, } } @@ -912,8 +882,17 @@ impl Core { } else { None }; + let unprovable_certificate = if self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + self.compute_unprovable_certificate(clock_round, &block_references) + } else { + None + }; - let mut block = VerifiedBlock::new_with_signer( + let mut block = VerifiedBlock::new_with_signer_and_unprovable( self.authority, clock_round, block_references, @@ -933,6 +912,7 @@ impl Core { precomputed_round_sig, precomputed_leader_sig, sailfish_fields, + unprovable_certificate, ); self.metrics @@ -1024,6 +1004,31 @@ impl Core { true } + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round.checked_sub(2)?; + let leader = self.committee.elect_leader(leader_round); + let leader_blocks = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round) + .into_iter(); + + for leader_block in leader_blocks { + let leader_ref = *leader_block.reference(); + if self + .dag_state + .has_bluestreak_certificate_evidence(clock_round, &leader_ref) + { + return Some(leader_ref); + } + } + + None + } + fn prepare_last_blocks(&mut self) { let target = match self.dag_state.byzantine_strategy { Some( @@ -1059,6 +1064,27 @@ impl Core { pending_refs: &[BlockReference], block_round: RoundNumber, ) -> Vec { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority != self.authority && seen.insert(*r)) + .collect(); + } + // SailfishPlusPlus: keep all previous-round references unconditionally // so that certified-parent filtering doesn't drop below quorum. // Only older-round references go through normal compression. @@ -1194,6 +1220,10 @@ impl Core { #[allow(clippy::type_complexity)] pub fn try_commit(&mut self) -> (Vec<(Data, Option)>, bool) { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::try_commit"); let leaders = self.committer.try_commit(self.last_commit_leader); let any_decided = !leaders.is_empty(); let sequence: Vec<_> = leaders @@ -1209,6 +1239,10 @@ impl Core { } pub fn cleanup(&mut self) -> RoundNumber { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::cleanup"); self.dag_state.cleanup(); let threshold = self.dag_state.gc_round(); self.block_manager @@ -1265,6 +1299,10 @@ impl Core { } pub fn handle_committed_subdag(&mut self, committed: Vec, _any_decided: bool) { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::handle_committed_subdag"); let mut commit_data = vec![]; for commit in &committed { let committed_rounds = self.dag_state.update_commit_state(commit); @@ -1300,7 +1338,7 @@ impl Core { } fn flush_pending_sailfish_certified_refs(&self) { - if self.dag_state.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.dag_state.consensus_protocol.uses_dual_dag() { return; } self.dag_state.flush_pending_sailfish_certified_refs(); diff --git a/crates/starfish-core/src/crypto.rs b/crates/starfish-core/src/crypto.rs index a1583c5..f5ca401 100644 --- a/crates/starfish-core/src/crypto.rs +++ b/crates/starfish-core/src/crypto.rs @@ -184,6 +184,30 @@ impl BlockDigest { signature: &SignatureBytes, merkle_root: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_without_transactions_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + merkle_root, + strong_vote, + None, + ) + } + + pub fn new_without_transactions_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + merkle_root: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -196,6 +220,7 @@ impl BlockDigest { merkle_root, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -209,6 +234,30 @@ impl BlockDigest { signature: &SignatureBytes, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -221,6 +270,7 @@ impl BlockDigest { transactions_commitment, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -250,6 +300,19 @@ impl BlockDigest { CryptoHash::crypto_hash(&mask, hasher); } } + + /// Extend a block digest hasher with the Bluestreak unprovable + /// certificate reference. Called after `digest_without_signature` and + /// before finalizing. No-op when `None`, preserving backward + /// compatibility. + pub(crate) fn hash_unprovable_certificate( + hasher: &mut Blake3Hasher, + unprovable_certificate: Option<&BlockReference>, + ) { + if let Some(cert_ref) = unprovable_certificate { + cert_ref.crypto_hash(hasher); + } + } } impl From<[u8; BLOCK_DIGEST_SIZE]> for BlockDigest { @@ -418,6 +481,7 @@ impl PublicKey { header.merkle_root(), header.strong_vote(), ); + BlockDigest::hash_unprovable_certificate(&mut hasher, header.unprovable_certificate()); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); self.0.verify(&signature, digest.as_ref()) } @@ -461,6 +525,29 @@ impl Signer { meta_creation_time_ns: TimestampNs, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> SignatureBytes { + self.sign_block_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn sign_block_with_unprovable( + &self, + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> SignatureBytes { let mut hasher = Blake3Hasher::new(); BlockDigest::digest_without_signature( @@ -473,6 +560,7 @@ impl Signer { transactions_commitment, strong_vote, ); + BlockDigest::hash_unprovable_certificate(&mut hasher, unprovable_certificate); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); let signature = self.0.sign(digest.as_ref()); SignatureBytes(signature.to_bytes()) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9ed13c..91f06ae 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -43,6 +43,16 @@ type AuthorityBitmask = AuthoritySet; pub type PendingSubDag = (CommittedSubDag, Vec>); +#[derive(Default)] +struct BluestreakCertificateState { + /// f+1 propagation support from round r for the same unprovable + /// certificate reference. + propagation_support: StakeAggregator, + /// Blocks waiting for this certificate to become provable in the dirty + /// DAG before they can join the clean DAG. + waiting_blocks: BTreeSet, +} + /// Tracks where block headers and transaction data originate. /// Carried on the wire inside `BlockBatch` so the receiver can distinguish /// proactive dissemination from sync responses. @@ -101,6 +111,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, + Bluestreak, } impl ConsensusProtocol { @@ -112,6 +123,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, + "bluestreak" => ConsensusProtocol::Bluestreak, _ => ConsensusProtocol::Starfish, } } @@ -129,13 +141,25 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } + pub fn is_bluestreak(self) -> bool { + matches!(self, ConsensusProtocol::Bluestreak) + } + + /// Protocols that use a dual dirty/clean DAG with vertex certification. + pub fn uses_dual_dag(self) -> bool { + matches!( + self, + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::Bluestreak + ) + } + pub fn default_dissemination_mode(self) -> DisseminationMode { match self { - ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { - DisseminationMode::Pull - } - ConsensusProtocol::CordialMiners => DisseminationMode::PushCausal, - ConsensusProtocol::Starfish + ConsensusProtocol::Mysticeti + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => DisseminationMode::Pull, + ConsensusProtocol::CordialMiners + | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, } @@ -327,12 +351,36 @@ struct DagStateInner { /// Once this reaches f+1 distinct authorities, we can infer that some /// honest node had the referenced block in its active-certified view. inferred_vertex_support: Vec>>, + /// Direct next-round support for elected leader references. Kept generic + /// so other protocols can reuse leader vote aggregation without rescanning + /// dirty DAG rounds. + leader_vote_support: BTreeMap>, + /// Bluestreak-specific propagation support and blocks waiting for a given + /// unprovable certificate reference to become provable. + bluestreak_certificate_state: BTreeMap, /// Sailfish++ timeout certificates, indexed by round. sailfish_timeout_certs: BTreeMap, /// Sailfish++ no-vote certificates, indexed by (round, leader). sailfish_novote_certs: BTreeMap<(RoundNumber, AuthorityIndex), SailfishNoVoteCert>, } +/// O(log n) check whether `certs` contains any entry at the given round. +/// Exploits `BlockReference` ordering `(round, authority, digest)` to +/// binary-search instead of scanning the entire set. +fn has_certificate_at_round(certs: &BTreeSet, round: RoundNumber) -> bool { + let lo = BlockReference { + round, + authority: 0, + digest: BlockDigest::default(), + }; + let hi = BlockReference { + round: round + 1, + authority: 0, + digest: BlockDigest::default(), + }; + certs.range(lo..hi).next().is_some() +} + impl DagState { pub fn open( authority: AuthorityIndex, @@ -403,6 +451,8 @@ impl DagState { pending_vertex_certificate_children: BTreeMap::new(), pending_persisted_vertex_certificates: (0..n).map(|_| BTreeSet::new()).collect(), inferred_vertex_support: (0..n).map(|_| BTreeMap::new()).collect(), + leader_vote_support: BTreeMap::new(), + bluestreak_certificate_state: BTreeMap::new(), sailfish_timeout_certs: BTreeMap::new(), sailfish_novote_certs: BTreeMap::new(), }; @@ -536,10 +586,10 @@ impl DagState { } } - // Recover persisted active Sailfish++ certifications. Only the active, + // Recover persisted active certified vertices. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. - if consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if consensus_protocol.uses_dual_dag() { let certified_from_round = if use_windowed { inner.evicted_rounds.iter().copied().min().unwrap_or(0) } else { @@ -562,6 +612,19 @@ impl DagState { } } } + if consensus_protocol == ConsensusProtocol::Bluestreak { + let mut recovered_blocks: Vec<_> = inner + .index + .iter() + .flat_map(|auth_map| auth_map.values()) + .flat_map(|blocks| blocks.values().cloned()) + .collect(); + recovered_blocks.sort_by_key(|block| *block.reference()); + let mut activated = Vec::new(); + for block in recovered_blocks { + inner.check_bluestreak_pre_clean(&block, &committee, &mut activated); + } + } metrics.dag_state_entries.inc_by(block_count); metrics.dag_highest_round.set(inner.highest_round as i64); @@ -611,6 +674,9 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } + ConsensusProtocol::Bluestreak => { + tracing::info!("Starting Bluestreak protocol") + } } let dag_state = Self { store: store.clone(), @@ -643,7 +709,7 @@ impl DagState { /// clock progress to `r` and quorum stake of RBC-certified vertices in /// round `r - 1`. Other protocols use the raw threshold clock directly. pub fn proposal_round(&self) -> RoundNumber { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.consensus_protocol.uses_dual_dag() { return self.threshold_clock_round(); } @@ -657,10 +723,7 @@ impl DagState { let prev_round = round - 1; let mut stake: Stake = 0; for auth in 0..inner.committee_size { - if inner.vertex_certificates[auth] - .iter() - .any(|reference| reference.round == prev_round) - { + if has_certificate_at_round(&inner.vertex_certificates[auth], prev_round) { stake += self .committee .get_stake(auth as AuthorityIndex) @@ -1112,6 +1175,18 @@ impl DagState { .contains(block_ref) } + /// Check whether the local dirty DAG has enough evidence to prove a + /// Bluestreak unprovable certificate for `leader_ref` at `block_round`. + pub fn has_bluestreak_certificate_evidence( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + ) -> bool { + self.dag_state_inner + .read() + .has_bluestreak_certificate_evidence(block_round, leader_ref, &self.committee) + } + /// Check whether 2f+1 stake is certified at the given round /// (SailfishPlusPlus certified parent quorum gate). pub fn certified_parent_quorum(&self, round: RoundNumber) -> bool { @@ -1119,10 +1194,7 @@ impl DagState { let mut stake: Stake = 0; for auth in 0..inner.committee_size { // Check if this authority has any certified block at the round. - if inner.vertex_certificates[auth] - .iter() - .any(|r| r.round == round) - { + if has_certificate_at_round(&inner.vertex_certificates[auth], round) { stake += self .committee .get_stake(auth as AuthorityIndex) @@ -1479,7 +1551,7 @@ impl DagState { } } - if self.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.consensus_protocol.uses_dual_dag() && quorum_round > 1 && !self.certified_parent_quorum(quorum_round - 1) { @@ -2261,19 +2333,43 @@ impl DagStateInner { later_block: &Data, earlier_block: &Data, ) -> bool { - let mut parents = AHashSet::from([later_block.clone()]); - for _round_number in (earlier_block.round()..later_block.round()).rev() { - parents = parents - .iter() - .flat_map(|block| block.block_references()) - .map(|block_reference| { - self.get_storage_block(*block_reference) - .expect("Block should be in DagState") - }) - .filter(|included_block| included_block.round() >= earlier_block.round()) - .collect(); + let target = *earlier_block.reference(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block_ref == target { + return true; + } + if block.round() <= earlier_block.round() { + continue; + } + + for parent_ref in block.block_references() { + let parent = self + .get_storage_block(*parent_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + let parent = self + .get_storage_block(*cert_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + } } - parents.contains(earlier_block) + + false } /// Compute all block references reachable from `later_block` at @@ -2284,16 +2380,42 @@ impl DagStateInner { later_block: &Data, target_round: RoundNumber, ) -> AHashSet { - let mut frontier = AHashSet::from([later_block.clone()]); - for _ in (target_round..later_block.round()).rev() { - frontier = frontier - .iter() - .flat_map(|block| block.block_references()) - .filter_map(|r| self.get_storage_block(*r)) - .filter(|b| b.round() >= target_round) - .collect(); + let mut reachable = AHashSet::new(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block.round() == target_round { + reachable.insert(block_ref); + continue; + } + if block.round() < target_round { + continue; + } + + for reference in block.block_references() { + if let Some(parent) = self.get_storage_block(*reference) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(parent) = self.get_storage_block(*cert_ref) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + } } - frontier.iter().map(|b| *b.reference()).collect() + + reachable } /// Per-authority eviction using BTreeMap::split_off. @@ -2375,6 +2497,18 @@ impl DagStateInner { .retain(|child| child.round >= self.evicted_rounds[child.authority as usize]); !children.is_empty() }); + let split_ref = BlockReference { + authority: 0, + round: min_evicted, + digest: BlockDigest::default(), + }; + self.leader_vote_support = self.leader_vote_support.split_off(&split_ref); + self.bluestreak_certificate_state = self.bluestreak_certificate_state.split_off(&split_ref); + for state in self.bluestreak_certificate_state.values_mut() { + state.waiting_blocks.retain(|block_ref| { + block_ref.round >= self.evicted_rounds[block_ref.authority as usize] + }); + } // Prune Sailfish++ timeout and no-vote certificates. self.sailfish_timeout_certs = self.sailfish_timeout_certs.split_off(&min_evicted); self.sailfish_novote_certs = self.sailfish_novote_certs.split_off(&(min_evicted, 0)); @@ -2422,6 +2556,18 @@ impl DagStateInner { missing.push(*parent); } } + // Bluestreak: the unprovable_certificate target is also a + // causal dependency that must be ancestor-closed. + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) + && !missing.contains(cert_ref) + { + missing.push(*cert_ref); + } + } + } missing } @@ -2450,6 +2596,17 @@ impl DagStateInner { } } } + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(state) = self.bluestreak_certificate_state.get_mut(cert_ref) { + state.waiting_blocks.remove(&block_ref); + } + if let Some(children) = self.pending_vertex_certificate_children.get_mut(cert_ref) { + children.remove(&block_ref); + if children.is_empty() { + self.pending_vertex_certificate_children.remove(cert_ref); + } + } + } activated.push(block_ref); self.pending_persisted_vertex_certificates[auth].insert(block_ref); @@ -2500,6 +2657,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); + self.check_bluestreak_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2508,7 +2666,7 @@ impl DagStateInner { committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus || block.round() <= 1 { + if !self.consensus_protocol.uses_dual_dag() || block.round() <= 1 { return; } @@ -2532,6 +2690,111 @@ impl DagStateInner { } } + fn has_bluestreak_certificate_evidence( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + committee: &Committee, + ) -> bool { + if leader_ref.round + 2 != block_round { + return false; + } + if self + .leader_vote_support + .get(leader_ref) + .is_some_and(|support| support.is_quorum(committee)) + { + return true; + } + self.bluestreak_certificate_state + .get(leader_ref) + .is_some_and(|state| state.propagation_support.is_quorum(committee)) + } + + fn record_leader_vote_support( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + ready_certificates: &mut BTreeSet, + ) { + for parent in block.block_references() { + if parent.round == 0 || parent.round + 1 != block.round() { + continue; + } + if parent.authority != committee.elect_leader(parent.round) { + continue; + } + if self + .leader_vote_support + .entry(*parent) + .or_default() + .add(block.authority(), committee) + { + ready_certificates.insert(*parent); + } + } + } + + fn activate_bluestreak_waiters( + &mut self, + leader_ref: BlockReference, + activated: &mut Vec, + ) { + let waiting_blocks = self + .bluestreak_certificate_state + .get_mut(&leader_ref) + .map(|state| std::mem::take(&mut state.waiting_blocks)) + .unwrap_or_default(); + for block_ref in waiting_blocks { + self.note_vertex_certified(block_ref, activated); + } + } + + /// Bluestreak pre-clean check: a block is pre-clean if it has no + /// `unprovable_certificate`, or its certificate is verified via the dirty + /// DAG. Pre-clean blocks feed into the vertex certification pipeline. + fn check_bluestreak_pre_clean( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + activated: &mut Vec, + ) { + if self.consensus_protocol != ConsensusProtocol::Bluestreak { + return; + } + let mut ready_certificates = BTreeSet::new(); + self.record_leader_vote_support(block, committee, &mut ready_certificates); + + let block_ref = *block.reference(); + match block.unprovable_certificate() { + None => { + self.note_vertex_certified(block_ref, activated); + } + Some(leader_ref) + if self.has_bluestreak_certificate_evidence( + block_ref.round, + leader_ref, + committee, + ) => + { + self.note_vertex_certified(block_ref, activated); + } + Some(leader_ref) => { + let state = self + .bluestreak_certificate_state + .entry(*leader_ref) + .or_default(); + state.waiting_blocks.insert(block_ref); + if state.propagation_support.add(block.authority(), committee) { + ready_certificates.insert(*leader_ref); + } + } + } + for leader_ref in ready_certificates { + self.activate_bluestreak_waiters(leader_ref, activated); + } + } + fn infer_vertex_certificate_closure( &mut self, root: BlockReference, @@ -2565,6 +2828,15 @@ impl DagStateInner { stack.push(*parent); } } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) + { + stack.push(*cert_ref); + } + } + } } for block_ref in to_activate { @@ -2702,6 +2974,7 @@ impl DagStateInner { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak ) && block.transactions().is_none() && block.merkle_root() == TransactionsCommitment::new_from_transactions(&Vec::new()); if block.has_empty_payload() || is_empty_full_block { @@ -3295,7 +3568,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3329,7 +3603,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3883,6 +4158,91 @@ mod tests { assert!(!dag_state.mark_dac_certified(rejected, cert)); } + #[test] + fn bluestreak_reachability_follows_unprovable_certificate() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut leader = VerifiedBlock::new( + 1, + 1, + vec![BlockReference::new_test(1, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + leader.preserialize(); + let leader = Data::new(leader); + let leader_ref = *leader.reference(); + + let mut own_round_one = VerifiedBlock::new( + 0, + 1, + vec![BlockReference::new_test(0, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_round_one.preserialize(); + let own_round_one = Data::new(own_round_one); + + let mut own_prev = VerifiedBlock::new( + 0, + 2, + vec![*own_round_one.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_prev.preserialize(); + let own_prev = Data::new(own_prev); + + let mut compress = VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*own_prev.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + compress.preserialize(); + let compress = Data::new(compress); + + dag_state.insert_general_block(leader.clone(), DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_round_one, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_prev, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(compress.clone(), DataSource::BlockBundleStreaming); + + assert!(dag_state.linked(&compress, &leader)); + assert!( + dag_state + .reachable_at_round(&compress, 1) + .contains(&leader_ref) + ); + } + #[test] fn consensus_protocol_resolves_dissemination_defaults() { assert_eq!( diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index 73c2c2a..929732b 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -504,7 +504,8 @@ impl ConnectionHandler { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish @@ -1605,6 +1606,7 @@ impl NetworkSyncer loop { let notified = inner.threshold_clock_notify.notified(); let round = if inner.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + || inner.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { inner.dag_state.proposal_round().saturating_sub(1) } else { diff --git a/crates/starfish-core/src/syncer.rs b/crates/starfish-core/src/syncer.rs index 578cb67..e2030b0 100644 --- a/crates/starfish-core/src/syncer.rs +++ b/crates/starfish-core/src/syncer.rs @@ -16,7 +16,7 @@ use crate::{ core::Core, dag_state::{ConsensusProtocol, DagState, DataSource}, data::Data, - metrics::{Metrics, UtilizationTimerVecExt}, + metrics::Metrics, runtime::timestamp_utc, sailfish_service::SailfishServiceMessage, types::{ @@ -388,16 +388,7 @@ impl Syncer { } pub fn try_new_commit(&mut self) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Syncer::try_new_commit"); - let timer_core_commit = self - .metrics - .utilization_timer - .utilization_timer("Core::try_new_commit"); let (newly_committed, any_decided) = self.core.try_commit(); - drop(timer_core_commit); let utc_now = timestamp_utc(); if !newly_committed.is_empty() { let committed_refs: Vec<_> = newly_committed diff --git a/crates/starfish-core/src/threshold_clock.rs b/crates/starfish-core/src/threshold_clock.rs index 7490458..288b7b1 100644 --- a/crates/starfish-core/src/threshold_clock.rs +++ b/crates/starfish-core/src/threshold_clock.rs @@ -116,6 +116,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, } } diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index f3e2331..1f465a6 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,6 +283,9 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, + /// Bluestreak unprovable certificate: optional reference to a + /// leader at round r-2 certified by 2f+1 votes at r-1. + pub(crate) unprovable_certificate: Option, // -- Cache (not serialized) ----------------------------------------------- /// Cached bincode-serialized bytes. Populated by `preserialize()` off the @@ -415,6 +418,10 @@ impl BlockHeader { .unwrap_or(&[]) } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.unprovable_certificate.as_ref() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.sailfish.as_deref() } @@ -694,6 +701,36 @@ impl VerifiedBlock { strong_vote: Option, bls: Option, sailfish: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions, + merkle_root, + strong_vote, + bls, + sailfish, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signature: SignatureBytes, + transactions: Vec, + merkle_root: TransactionsCommitment, + strong_vote: Option, + bls: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let (acknowledgment_intersection, acknowledgment_references) = compress_acknowledgments(&block_references, &acknowledgment_references); @@ -706,7 +743,7 @@ impl VerifiedBlock { reference: BlockReference { authority, round, - digest: BlockDigest::new_without_transactions( + digest: BlockDigest::new_without_transactions_with_unprovable( authority, round, &block_references, @@ -715,6 +752,7 @@ impl VerifiedBlock { &signature, merkle_root, strong_vote, + unprovable_certificate.as_ref(), ), }, block_references, @@ -728,6 +766,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), + unprovable_certificate, serialized: None, }; @@ -771,6 +810,7 @@ impl VerifiedBlock { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; let mut block = Self { @@ -803,6 +843,53 @@ impl VerifiedBlock { precomputed_round_sig: Option, precomputed_leader_sig: Option, sailfish: Option, + ) -> Self { + Self::new_with_signer_and_unprovable( + authority, + round, + block_references, + voted_leader_ref, + acknowledgment_references, + meta_creation_time_ns, + signer, + bls_signer, + committee_opt, + aggregate_dac_sigs, + transactions, + encoded_transactions, + consensus_protocol, + strong_vote, + aggregate_round_sig, + certified_leader, + precomputed_round_sig, + precomputed_leader_sig, + sailfish, + None, + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_with_signer_and_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + voted_leader_ref: Option, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signer: &Signer, + bls_signer: Option<&BlsSigner>, + committee_opt: Option<&Committee>, + aggregate_dac_sigs: Vec, + transactions: Vec, + encoded_transactions: Option>, + consensus_protocol: ConsensusProtocol, + strong_vote: Option, + aggregate_round_sig: Option, + certified_leader: Option<(BlockReference, BlsAggregateCertificate)>, + precomputed_round_sig: Option, + precomputed_leader_sig: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let transactions_commitment = match consensus_protocol { ConsensusProtocol::Starfish @@ -820,7 +907,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -836,7 +924,7 @@ impl VerifiedBlock { &acknowledgment_references, aggregate_dac_sigs, ); - let signature = signer.sign_block( + let signature = signer.sign_block_with_unprovable( authority, round, &block_references, @@ -844,6 +932,7 @@ impl VerifiedBlock { meta_creation_time_ns, transactions_commitment, strong_vote, + unprovable_certificate.as_ref(), ); // Build BLS fields when the StarfishBls path is active. Partial round @@ -872,7 +961,7 @@ impl VerifiedBlock { } }); - Self::new( + Self::new_with_unprovable( authority, round, block_references, @@ -884,6 +973,7 @@ impl VerifiedBlock { strong_vote, bls, sailfish, + unprovable_certificate, ) } @@ -965,6 +1055,10 @@ impl VerifiedBlock { self.header.is_strong_blame() } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.header.unprovable_certificate() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.header.sailfish() } @@ -1111,7 +1205,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1148,7 +1243,7 @@ impl VerifiedBlock { ); } let acknowledgments = self.header.acknowledgments(); - let digest = BlockDigest::new( + let digest = BlockDigest::new_with_unprovable( self.authority(), round, &self.header.block_references, @@ -1157,6 +1252,7 @@ impl VerifiedBlock { &self.header.signature, self.header.transactions_commitment, self.header.strong_vote, + self.header.unprovable_certificate(), ); ensure!( digest == self.digest(), @@ -1189,6 +1285,10 @@ impl VerifiedBlock { } match consensus_protocol { ConsensusProtocol::StarfishBls => { + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); let bls = self .header .bls() @@ -1296,6 +1396,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { ensure!( @@ -1310,6 +1414,56 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); + } + ConsensusProtocol::Bluestreak => { + ensure!( + acknowledgments.is_empty(), + "Bluestreak blocks must not carry acknowledgments" + ); + ensure!( + self.header.bls().is_none(), + "Only StarfishBls blocks may carry BLS fields" + ); + let is_leader = self.authority() == committee.elect_leader(round); + if !is_leader { + ensure!( + self.header.block_references.len() <= 2, + "Bluestreak non-leader may reference at most own_prev + leader" + ); + let _self_ref = *self + .header + .block_references + .iter() + .find(|r| r.authority == self.authority()) + .ok_or_else(|| { + eyre::eyre!( + "Bluestreak non-leader block must reference its own previous block" + ) + })?; + if let Some(cert_ref) = self.header.unprovable_certificate() { + ensure!( + round >= 3 && cert_ref.round + 2 == round, + "unprovable_certificate must reference leader at round r-2" + ); + ensure!( + cert_ref.authority == committee.elect_leader(cert_ref.round), + "unprovable_certificate must reference the elected leader" + ); + } + } else { + ensure!( + self.header.unprovable_certificate().is_none(), + "Bluestreak leaders must not carry unprovable_certificate" + ); + ensure!( + threshold_clock_valid_block_header(&self.header, committee), + "Bluestreak leader must reference 2f+1 blocks from previous round" + ); + } } ConsensusProtocol::SailfishPlusPlus => { ensure!( @@ -1324,6 +1478,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); if round > 1 { let prev_round = round - 1; let prev_leader = committee.elect_leader(prev_round); @@ -1971,6 +2129,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; @@ -2182,6 +2341,102 @@ mod tests { ); } + #[test] + fn verifies_bluestreak_non_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let authority = committee + .authorities() + .find(|authority| *authority != committee.elect_leader(3)) + .unwrap(); + let round_2_leader = committee.elect_leader(2); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + authority, + 3, + vec![ + BlockReference::new_test(authority, 2), + BlockReference::new_test(round_2_leader, 2), + ], + None, + vec![], + 0, + &signers[authority as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap(); + } + + #[test] + fn rejects_bluestreak_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let leader = committee.elect_leader(3); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + leader, + 3, + (0..committee.quorum_threshold() as AuthorityIndex) + .map(|authority| BlockReference::new_test(authority, 2)) + .collect(), + None, + vec![], + 0, + &signers[leader as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + let err = block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains("Bluestreak leaders must not carry unprovable_certificate") + ); + } + #[test] fn verifies_empty_mysticeti_block_without_transaction_data() { let committee = Committee::new_for_benchmarks(4); diff --git a/crates/starfish-core/src/validator.rs b/crates/starfish-core/src/validator.rs index 9e2d76d..90c24ef 100644 --- a/crates/starfish-core/src/validator.rs +++ b/crates/starfish-core/src/validator.rs @@ -310,11 +310,17 @@ mod smoke_tests { #[test_case("starfish-speed", 80)] #[test_case("starfish-bls", 100)] #[test_case("sailfish++", 120)] + #[test_case("bluestreak", 140)] #[tokio::test] async fn validator_commit(consensus: &str, port_offset: u16) { run_commit_test(consensus, port_offset).await; } + #[tokio::test] + async fn validator_commit_bluestreak_basic() { + run_commit_test("bluestreak", 150).await; + } + async fn run_sync_test(consensus: &str, port_offset: u16) { let committee_size = 4; let committee = Committee::new_for_benchmarks(committee_size); @@ -402,6 +408,7 @@ mod smoke_tests { #[test_case("starfish-speed", 180)] #[test_case("starfish-bls", 200)] #[test_case("sailfish++", 220)] + #[test_case("bluestreak", 260)] #[tokio::test] async fn validator_sync(consensus: &str, port_offset: u16) { run_sync_test(consensus, port_offset).await; @@ -465,6 +472,7 @@ mod smoke_tests { #[test_case("starfish-speed", 280)] #[test_case("starfish-bls", 300)] #[test_case("sailfish++", 320)] + #[test_case("bluestreak", 380)] #[tokio::test] async fn validator_crash_faults(consensus: &str, port_offset: u16) { run_crash_faults_test(consensus, port_offset).await; diff --git a/monitoring/grafana/grafana-dashboard.json b/monitoring/grafana/grafana-dashboard.json index 85215c7..c783ab3 100644 --- a/monitoring/grafana/grafana-dashboard.json +++ b/monitoring/grafana/grafana-dashboard.json @@ -222,8 +222,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 0", - "legendFormat": "AllZero", + "expr": "max by (mode) (dissemination_mode_info{node=~\"$node\"})", + "legendFormat": "{{mode}}", "refId": "B", "instant": true }, @@ -232,8 +232,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 1", - "legendFormat": "Random", + "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 0", + "legendFormat": "AllZero", "refId": "C", "instant": true }, @@ -242,8 +242,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 0", - "legendFormat": "RocksDB", + "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 1", + "legendFormat": "Random", "refId": "D", "instant": true }, @@ -252,8 +252,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 1", - "legendFormat": "TideHunter", + "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 0", + "legendFormat": "RocksDB", "refId": "E", "instant": true }, @@ -262,8 +262,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "max by (mode) (dissemination_mode_info{node=~\"$node\"})", - "legendFormat": "{{mode}}", + "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 1", + "legendFormat": "TideHunter", "refId": "F", "instant": true } @@ -1500,7 +1500,7 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 0, + "fillOpacity": 20, "gradientMode": "none", "hideFrom": { "legend": false, @@ -1518,7 +1518,7 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" @@ -1570,10 +1570,10 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*|Committer::.*|Syncer::.*\"}[1m]) / 1000", + "expr": "avg by (proc) (rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*\"}[1m])) / 1000", "fullMetaSearch": false, "includeNullMetadata": true, - "legendFormat": "{{node}} - {{proc}}", + "legendFormat": "{{proc}}", "range": true, "refId": "A", "useBackend": false diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index aaf5115..6c48cb4 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -7,8 +7,8 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, -# cordial-miners, mysticeti -CONSENSUS=${CONSENSUS:-starfish-speed} +# cordial-miners, mysticeti, bluestreak +CONSENSUS=${CONSENSUS:-bluestreak} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, # equivocating-chains, equivocating-two-chains, @@ -24,7 +24,7 @@ STORAGE_BACKEND=${STORAGE_BACKEND:-rocksdb} TRANSACTION_MODE=${TRANSACTION_MODE:-random} # Dissemination mode: protocol-default (default) | pull | # push-causal | push-useful -DISSEMINATION_MODE=${DISSEMINATION_MODE:-push-causal} +DISSEMINATION_MODE=${DISSEMINATION_MODE:-protocol-default} # Enable lz4 network compression. # Auto-enabled for random transaction mode. # Set COMPRESS_NETWORK=1 or =0 to override.