From 9723f394fbe53a65b15fa891c44d7d2176841fe6 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 17:45:27 +0100 Subject: [PATCH 01/18] feat(core): add MysticetiCompress protocol variant with basic wiring Add MysticetiCompress to ConsensusProtocol enum with from_str parsing, helper methods (is_mysticeti_compress, uses_dual_dag), and wire into all match arms: PushCausal dissemination, no erasure coding, Blake3 transaction commitment, ancestor-based subdag collection, full-block format, wave_length=3 with pipeline=true (3 pipelined committers). --- crates/starfish-core/src/broadcaster.rs | 9 ++++-- .../starfish-core/src/consensus/linearizer.rs | 3 +- .../src/consensus/universal_committer.rs | 7 +++++ crates/starfish-core/src/core.rs | 3 +- crates/starfish-core/src/dag_state.rs | 28 ++++++++++++++++--- crates/starfish-core/src/net_sync.rs | 3 +- crates/starfish-core/src/types.rs | 10 +++++-- 7 files changed, 50 insertions(+), 13 deletions(-) diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index d772410..f4a519a 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -85,7 +85,8 @@ impl BroadcasterParameters { ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls - | ConsensusProtocol::CordialMiners => Self { + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::MysticetiCompress => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -417,7 +418,8 @@ where } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -821,7 +823,8 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc | ConsensusProtocol::StarfishBls => PushOtherBlocksFormat::HeadersAndShards, ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti - | ConsensusProtocol::SailfishPlusPlus => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 1c7c40f..0e44eb7 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -259,7 +259,8 @@ impl Linearizer { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index bf4568f..ca0536e 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -432,6 +432,13 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, + ConsensusProtocol::MysticetiCompress => Self { + committee, + dag_state, + metrics, + wave_length: WAVE_LENGTH, + pipeline: true, + }, ConsensusProtocol::CordialMiners => Self { committee, dag_state, diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8834c4d..7e9fb52 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -746,7 +746,8 @@ impl Core { )), ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => None, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => None, } } diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9ed13c..1bcc678 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -101,6 +101,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, + MysticetiCompress, } impl ConsensusProtocol { @@ -112,6 +113,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, + "mysticeti-compress" => ConsensusProtocol::MysticetiCompress, _ => ConsensusProtocol::Starfish, } } @@ -129,13 +131,26 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } + pub fn is_mysticeti_compress(self) -> bool { + matches!(self, ConsensusProtocol::MysticetiCompress) + } + + /// Protocols that use a dual dirty/clean DAG with vertex certification. + pub fn uses_dual_dag(self) -> bool { + matches!( + self, + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::MysticetiCompress + ) + } + pub fn default_dissemination_mode(self) -> DisseminationMode { match self { ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { DisseminationMode::Pull } - ConsensusProtocol::CordialMiners => DisseminationMode::PushCausal, - ConsensusProtocol::Starfish + ConsensusProtocol::MysticetiCompress + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, } @@ -611,6 +626,9 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } + ConsensusProtocol::MysticetiCompress => { + tracing::info!("Starting MysticetiCompress protocol") + } } let dag_state = Self { store: store.clone(), @@ -3295,7 +3313,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3329,7 +3348,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index 73c2c2a..e1bd378 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -504,7 +504,8 @@ impl ConnectionHandler { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index f3e2331..05a3f64 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -820,7 +820,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -1111,7 +1112,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1297,7 +1299,9 @@ impl VerifiedBlock { "Only StarfishBls blocks may carry BLS fields" ); } - ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { + ConsensusProtocol::Mysticeti + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::MysticetiCompress => { ensure!( acknowledgments.is_empty(), "{consensus_protocol:?} blocks must not carry acknowledgments" From b0bd2a3debe39e1ae258097069ef113607d5fe34 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:08:01 +0100 Subject: [PATCH 02/18] feat(core): add unprovable_certificate field to BlockHeader Add Optional field for MysticetiCompress unprovable certificate mechanism. Includes accessors on BlockHeader and VerifiedBlock. All existing callers pass None. Digest integration deferred to block building step. --- crates/starfish-core/src/threshold_clock.rs | 1 + crates/starfish-core/src/types.rs | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/crates/starfish-core/src/threshold_clock.rs b/crates/starfish-core/src/threshold_clock.rs index 7490458..288b7b1 100644 --- a/crates/starfish-core/src/threshold_clock.rs +++ b/crates/starfish-core/src/threshold_clock.rs @@ -116,6 +116,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, } } diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index 05a3f64..5ee6f94 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,6 +283,9 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, + /// MysticetiCompress unprovable certificate: optional reference to a + /// leader at round r-2 certified by 2f+1 votes at r-1. + pub(crate) unprovable_certificate: Option, // -- Cache (not serialized) ----------------------------------------------- /// Cached bincode-serialized bytes. Populated by `preserialize()` off the @@ -415,6 +418,10 @@ impl BlockHeader { .unwrap_or(&[]) } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.unprovable_certificate.as_ref() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.sailfish.as_deref() } @@ -728,6 +735,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), + unprovable_certificate: None, serialized: None, }; @@ -966,6 +974,10 @@ impl VerifiedBlock { self.header.is_strong_blame() } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.header.unprovable_certificate() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.header.sailfish() } @@ -1975,6 +1987,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; From 874fde2e015869a6ee4e9239cfacff17dcc7801b Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:11:11 +0100 Subject: [PATCH 03/18] feat(core): add unprovable_certificate field to BlockHeader for MysticetiCompress Add optional unprovable_certificate field to BlockHeader to support the MysticetiCompress protocol. This field references a leader block at round r-2 that is claimed to have been seen by 2f+1 validators at round r-1, and is part of the signed block hash for compression. Changes: - Add unprovable_certificate: Option field to BlockHeader - Add accessor methods unprovable_certificate() to BlockHeader and VerifiedBlock - Initialize field to None in all BlockHeader constructors - Update BlockDigest hashing to include unprovable_certificate in digest - Update all Block Digest::new and new_without_transactions calls to pass parameter - Update Signer::sign_block and PublicKey::verify_signature_in_block to handle field Co-Authored-By: Claude --- crates/starfish-core/src/types.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index 5ee6f94..a940967 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -779,6 +779,7 @@ impl VerifiedBlock { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; let mut block = Self { From 3b1bc0c66194c9900d6e8b814b7c449e36b7896a Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:12:58 +0100 Subject: [PATCH 04/18] feat(core): non-leader reference compression and unprovable_certificate MysticetiCompress non-leaders include at most own_prev + leader at r-1. Leaders keep the full clean-DAG frontier. compute_unprovable_certificate checks direct evidence (2f+1 votes at r-1) and propagation (f+1 at r). --- crates/starfish-core/src/core.rs | 95 ++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 7e9fb52..8838f28 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -936,6 +936,19 @@ impl Core { sailfish_fields, ); + // MysticetiCompress: compute and set unprovable_certificate for + // non-leader blocks. + if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + let cert = self.compute_unprovable_certificate( + clock_round, + block.block_references(), + ); + block.header.unprovable_certificate = cert; + } + self.metrics .proposed_block_refs .observe(block_ref_count as f64); @@ -947,6 +960,60 @@ impl Core { Data::new(block) } + /// Compute the unprovable certificate for a MysticetiCompress non-leader + /// block at `clock_round`. Returns a reference to the leader at r-2 if + /// 2f+1 blocks at r-1 reference it (direct evidence) or f+1 blocks at r + /// already propagate the same certificate. + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round - 2; + let leader = self.committee.elect_leader(leader_round); + let leader_blocks = + self.dag_state + .get_blocks_at_authority_round(leader, leader_round); + let leader_ref = *leader_blocks.first()?.reference(); + + // Direct evidence: 2f+1 blocks at r-1 reference this leader. + let vote_round = clock_round - 1; + let vote_blocks = self.dag_state.get_blocks_by_round(vote_round); + let mut vote_stake: u64 = 0; + for block in &vote_blocks { + if block + .block_references() + .iter() + .any(|r| *r == leader_ref) + { + vote_stake += self + .committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + if self.committee.is_quorum(vote_stake) { + return Some(leader_ref); + } + + // Propagation: f+1 blocks at round r carry the same certificate. + let current_blocks = self.dag_state.get_blocks_by_round(clock_round); + let mut prop_stake: u64 = 0; + for block in ¤t_blocks { + if block.unprovable_certificate() == Some(&leader_ref) { + prop_stake += self + .committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + if prop_stake >= self.committee.validity_threshold() { + return Some(leader_ref); + } + + None + } + /// Compute SailfishPlusPlus control fields for a new block at /// `clock_round`. /// @@ -1074,6 +1141,34 @@ impl Core { }) .collect(); } + // MysticetiCompress: non-leaders include only the leader at r-1 + // (own_prev is always prepended by build_block). Leaders keep the + // full clean-DAG frontier for the quorum requirement. + if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + // Leaders: keep all previous-round refs (same as SailfishPP). + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority + && seen.insert(*r) + && r.round >= prev_round + }) + .collect(); + } if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { if self.committee.elect_leader(block_round) != self.authority { return Vec::new(); From fe4d4bb2705972baeecd1be72aaf2b6db6007ee5 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:14:07 +0100 Subject: [PATCH 05/18] refactor(core): generalize dual DAG logic for MysticetiCompress Replace SailfishPlusPlus-specific checks with uses_dual_dag() helper: - Certified ref recovery now applies to both SailfishPlusPlus and MysticetiCompress - proposal_round logic for vertex certification applies to both dual DAG protocols This enables MysticetiCompress to benefit from the same vertex certification and proposal timing mechanisms as SailfishPlusPlus. Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 1bcc678..6241021 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -554,7 +554,7 @@ impl DagState { // Recover persisted active Sailfish++ certifications. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. - if consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if consensus_protocol.uses_dual_dag() { let certified_from_round = if use_windowed { inner.evicted_rounds.iter().copied().min().unwrap_or(0) } else { @@ -661,7 +661,7 @@ impl DagState { /// clock progress to `r` and quorum stake of RBC-certified vertices in /// round `r - 1`. Other protocols use the raw threshold clock directly. pub fn proposal_round(&self) -> RoundNumber { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.consensus_protocol.uses_dual_dag() { return self.threshold_clock_round(); } @@ -1497,7 +1497,7 @@ impl DagState { } } - if self.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.consensus_protocol.uses_dual_dag() && quorum_round > 1 && !self.certified_parent_quorum(quorum_round - 1) { From d63c403c9cdceba11ab5fb3808c04be161e89e01 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:14:19 +0100 Subject: [PATCH 06/18] refactor(core): use uses_dual_dag() helper for protocol checks Replace explicit SailfishPlusPlus checks with uses_dual_dag() helper in dag_state.rs to support both SailfishPlusPlus and MysticetiCompress protocols for: - Vertex certification recovery from storage - Proposal round calculation (requires certified vertices) - Certified parent quorum checks This enables MysticetiCompress to benefit from the dual DAG infrastructure (clean/dirty DAG with vertex certification). Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6241021..6a8c0fd 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -2526,7 +2526,7 @@ impl DagStateInner { committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus || block.round() <= 1 { + if !self.consensus_protocol.uses_dual_dag() || block.round() <= 1 { return; } From e357c3d877c96859ca1c64e2aa1ed81fb34c095d Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:15:36 +0100 Subject: [PATCH 07/18] feat(core): add unprovable_certificate dependency tracking for MysticetiCompress Ensure unprovable_certificate targets are included in missing dependencies check. When a MysticetiCompress block references an unprovable certificate, that certificate block must be present in the vertex certification set before the block can be processed. This maintains ancestor-closure invariant for the clean DAG. Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 88 +++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6a8c0fd..d9f4a7b 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -2440,6 +2440,19 @@ impl DagStateInner { missing.push(*parent); } } + // MysticetiCompress: the unprovable_certificate target is also a + // causal dependency that must be ancestor-closed. + if self.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize] + .contains(cert_ref) + && !missing.contains(cert_ref) + { + missing.push(*cert_ref); + } + } + } missing } @@ -2518,6 +2531,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); + self.check_compress_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2550,6 +2564,80 @@ impl DagStateInner { } } + /// MysticetiCompress pre-clean check: a block is pre-clean if it has no + /// `unprovable_certificate`, or its certificate is verified via the dirty + /// DAG. Pre-clean blocks feed into the vertex certification pipeline. + fn check_compress_pre_clean( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + activated: &mut Vec, + ) { + if self.consensus_protocol != ConsensusProtocol::MysticetiCompress { + return; + } + let block_ref = *block.reference(); + let is_pre_clean = match block.unprovable_certificate() { + None => true, + Some(leader_ref) => { + self.verify_unprovable_certificate_local( + block_ref.round, + leader_ref, + committee, + ) + } + }; + if is_pre_clean { + self.note_vertex_certified(block_ref, activated); + } + } + + /// Verify an unprovable certificate against the local dirty DAG. + /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) + /// or f+1 blocks at r propagate the same certificate. + fn verify_unprovable_certificate_local( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + committee: &Committee, + ) -> bool { + // Direct: 2f+1 at r-1 reference the leader. + let vote_round = block_round.saturating_sub(1); + let mut stake: Stake = 0; + for auth_dag in &self.index { + if let Some(round_blocks) = auth_dag.get(&vote_round) { + for block in round_blocks.values() { + if block + .block_references() + .iter() + .any(|r| r == leader_ref) + { + stake += committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + } + } + if committee.is_quorum(stake) { + return true; + } + // Propagation: f+1 at round r carry the same certificate. + let mut prop_stake: Stake = 0; + for auth_dag in &self.index { + if let Some(round_blocks) = auth_dag.get(&block_round) { + for block in round_blocks.values() { + if block.unprovable_certificate() == Some(leader_ref) { + prop_stake += committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + } + } + prop_stake >= committee.validity_threshold() + } + fn infer_vertex_certificate_closure( &mut self, root: BlockReference, From 67ae40f6c942405f13274bf0e9aba51e64dca4ed Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 20:38:08 +0100 Subject: [PATCH 08/18] feat(core): add Bluestreak protocol --- crates/orchestrator/src/benchmark.rs | 3 +- crates/orchestrator/src/main.rs | 2 +- crates/starfish-core/src/broadcaster.rs | 6 +- .../starfish-core/src/consensus/linearizer.rs | 10 +- .../src/consensus/universal_committer.rs | 227 ++++++++++++- crates/starfish-core/src/core.rs | 173 +++++----- crates/starfish-core/src/crypto.rs | 88 +++++ crates/starfish-core/src/dag_state.rs | 316 ++++++++++++++---- crates/starfish-core/src/net_sync.rs | 3 +- crates/starfish-core/src/types.rs | 259 +++++++++++++- crates/starfish-core/src/validator.rs | 8 + scripts/dryrun.sh | 2 +- 12 files changed, 925 insertions(+), 172 deletions(-) diff --git a/crates/orchestrator/src/benchmark.rs b/crates/orchestrator/src/benchmark.rs index f8687f4..b9ec6b3 100644 --- a/crates/orchestrator/src/benchmark.rs +++ b/crates/orchestrator/src/benchmark.rs @@ -38,7 +38,8 @@ pub struct BenchmarkParametersGeneric { /// paying for data sent between the nodes. pub use_internal_ip_address: bool, // Consensus protocol to deploy - // (starfish | starfish-speed | starfish-bls | mysticeti | cordial-miners) + // (starfish | starfish-speed | starfish-bls | mysticeti | + // cordial-miners | bluestreak) pub consensus_protocol: String, /// number Byzantine nodes pub byzantine_nodes: usize, diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 4a5ae98..8c5b581 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -115,7 +115,7 @@ pub enum Operation { /// Consensus to deploy. Available options: /// starfish | starfish-speed | starfish-bls | mysticeti | - /// cordial-miners + /// cordial-miners | bluestreak #[clap(long, value_name = "STRING", default_value = "starfish", global = true)] consensus: String, diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index f4a519a..b9cd166 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -86,7 +86,7 @@ impl BroadcasterParameters { | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls | ConsensusProtocol::CordialMiners - | ConsensusProtocol::MysticetiCompress => Self { + | ConsensusProtocol::Bluestreak => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -419,7 +419,7 @@ where ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -824,7 +824,7 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::Bluestreak => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 0e44eb7..fddc58b 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -100,6 +100,7 @@ impl Linearizer { let mut to_commit = Vec::new(); let leader_block_ref = *leader_block.reference(); let min_round = leader_block_ref.round.saturating_sub(MAX_TRAVERSAL_DEPTH); + let follow_unprovable = dag_state.consensus_protocol == ConsensusProtocol::Bluestreak; assert!(self.committed.insert(leader_block_ref)); let mut current_level = vec![leader_block]; @@ -115,6 +116,13 @@ impl Linearizer { next_refs.push(*reference); } } + if follow_unprovable { + if let Some(cert_ref) = x.unprovable_certificate() { + if cert_ref.round >= min_round && self.committed.insert(*cert_ref) { + next_refs.push(*cert_ref); + } + } + } } if next_refs.is_empty() { break; @@ -260,7 +268,7 @@ impl Linearizer { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index ca0536e..636365a 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -43,6 +43,9 @@ impl UniversalCommitter { if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { return self.try_commit_sailfish(last_decided); } + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + return self.try_commit_bluestreak(last_decided); + } let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -180,6 +183,133 @@ impl UniversalCommitter { .collect() } + fn try_commit_bluestreak(&mut self, last_decided: BlockReference) -> Vec { + let highest_known_round = self.dag_state.highest_round(); + let last_decided_round = last_decided.round(); + let highest_anchor = highest_known_round.saturating_sub(2); + let mut committed = Vec::new(); + let mut newly_committed = AHashSet::new(); + + for round in last_decided_round + 1..=highest_anchor { + let leader = self.committee.elect_leader(round); + + if self.check_direct_skip_bluestreak(leader, round) { + let key = (leader, round); + if !self.decided.contains_key(&key) { + let status = LeaderStatus::Skip(leader, round); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, true); + } + committed.push(status); + } + continue; + } + + let Some(anchor) = self.try_direct_commit_bluestreak(leader, round) else { + continue; + }; + + let mut chain = vec![anchor.clone()]; + let mut current = anchor; + for prev_round in (last_decided_round + 1..current.round()).rev() { + let prev_leader = self.committee.elect_leader(prev_round); + let prev_key = (prev_leader, prev_round); + if newly_committed.contains(&prev_key) { + continue; + } + + let mut linked_leaders: Vec<_> = self + .dag_state + .get_blocks_at_authority_round(prev_leader, prev_round) + .into_iter() + .filter(|block| self.dag_state.has_vertex_certificate(block.reference())) + .filter(|block| self.dag_state.linked(¤t, block)) + .collect(); + + if linked_leaders.len() > 1 { + panic!( + "[Bluestreak] More than one linked leader for {}", + format_authority_round(prev_leader, prev_round) + ); + } + + if let Some(prev) = linked_leaders.pop() { + current = prev.clone(); + chain.push(prev); + } + } + + chain.reverse(); + for leader_block in chain { + let key = (leader_block.authority(), leader_block.round()); + if !newly_committed.insert(key) { + continue; + } + let direct_decide = key.1 == round; + let status = LeaderStatus::Commit(leader_block, None); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, direct_decide); + } + committed.push(status); + } + } + + committed.sort(); + committed + } + + fn try_direct_commit_bluestreak( + &self, + leader: AuthorityIndex, + leader_round: RoundNumber, + ) -> Option> { + let cert_round = leader_round + 2; + let leader_blocks = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round); + + for leader_block in leader_blocks { + if !self + .dag_state + .has_vertex_certificate(leader_block.reference()) + { + continue; + } + let leader_ref = *leader_block.reference(); + let cert_blocks = self.dag_state.get_blocks_by_round_cached(cert_round); + let mut supporters = StakeAggregator::::new(); + for block in cert_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) + && supporters.add(block.authority(), &self.committee) + { + return Some(leader_block); + } + } + } + + None + } + + fn check_direct_skip_bluestreak(&self, leader: AuthorityIndex, round: RoundNumber) -> bool { + let vote_round = round + 1; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut non_voters = StakeAggregator::::new(); + for block in vote_blocks.iter() { + let votes_for = block + .block_references() + .iter() + .any(|r| r.round == round && r.authority == leader); + if !votes_for && non_voters.add(block.authority(), &self.committee) { + return true; + } + } + false + } + fn try_commit_sailfish(&mut self, last_decided: BlockReference) -> Vec { let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -432,7 +562,7 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, - ConsensusProtocol::MysticetiCompress => Self { + ConsensusProtocol::Bluestreak => Self { committee, dag_state, metrics, @@ -573,4 +703,99 @@ mod tests { "expected round-1 leader to commit with f+1 certified supporters in round 2" ); } + + #[test] + fn bluestreak_direct_commit_uses_unprovable_certificates() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let committee = Committee::new_for_benchmarks(4); + let registry = Registry::new(); + let (metrics, _reporter) = Metrics::new( + ®istry, + Some(committee.as_ref()), + Some("bluestreak"), + None, + ); + + let leader = make_full_block(1, 1, vec![BlockReference::new_test(1, 0)]); + let leader_ref = *leader.reference(); + let vote_a = make_full_block(0, 2, vec![leader_ref]); + let vote_b = make_full_block(2, 2, vec![leader_ref]); + let vote_c = make_full_block(3, 2, vec![leader_ref]); + + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut cert_a = crate::types::VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*vote_a.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_a.preserialize(); + let cert_a = Data::new(cert_a); + + let mut cert_b = crate::types::VerifiedBlock::new_with_unprovable( + 2, + 3, + vec![*vote_b.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_b.preserialize(); + let cert_b = Data::new(cert_b); + + let mut cert_c = crate::types::VerifiedBlock::new_with_unprovable( + 3, + 3, + vec![*vote_c.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_c.preserialize(); + let cert_c = Data::new(cert_c); + + dag_state.insert_general_block(leader, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_c, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_c, DataSource::BlockBundleStreaming); + + let mut committer = UniversalCommitterBuilder::new(committee, dag_state, metrics).build(); + let decided = committer.try_commit(BlockReference::new_test(0, 0)); + + assert!( + decided.iter().any(|status| { + matches!( + status, + LeaderStatus::Commit(block, None) + if block.authority() == 1 && block.round() == 1 + ) + }), + "expected round-1 leader to commit from round-3 unprovable certificates" + ); + } } diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8838f28..22af544 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -13,7 +13,7 @@ use crate::{ block_handler::BlockHandler, block_manager::BlockManager, bls_certificate_aggregator::{BlsCertificateAggregator, apply_certificate_events}, - committee::Committee, + committee::{Committee, QuorumThreshold, StakeAggregator, ValidityThreshold}, config::NodePrivateConfig, consensus::{ CommitMetastate, @@ -480,7 +480,7 @@ impl Core { } // SailfishPlusPlus: require certified parent quorum before creating a block. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && !self.dag_state.certified_parent_quorum(clock_round - 1) { @@ -523,9 +523,13 @@ impl Core { // set below threshold-clock quorum, we cannot build a valid block yet. // Requeue both transactions and include refs so the next attempt sees // them again. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let allow_own_prev_only = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round.saturating_sub(1)) == self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && block_references.is_empty() + && !allow_own_prev_only { for r in raw_refs { self.pending.push(MetaTransaction::Include(r)); @@ -688,15 +692,19 @@ impl Core { self.compress_pending_block_references(&pending_refs, block_round); // SailfishPlusPlus: filter parents to only include certified blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if self.dag_state.consensus_protocol.uses_dual_dag() { block_references.retain(|r| r.round == 0 || self.dag_state.has_vertex_certificate(r)); } // SailfishPlusPlus: verify the filtered parent set, together with the // creator's own previous block (always included by build_block), still // has quorum stake at round-1. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let compress_non_leader = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(block_round) != self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && block_round > 1 + && !compress_non_leader { let prev_round = block_round - 1; let mut prev_round_stake: u64 = 0; @@ -747,7 +755,7 @@ impl Core { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => None, + | ConsensusProtocol::Bluestreak => None, } } @@ -913,8 +921,17 @@ impl Core { } else { None }; + let unprovable_certificate = if self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + self.compute_unprovable_certificate(clock_round, &block_references) + } else { + None + }; - let mut block = VerifiedBlock::new_with_signer( + let mut block = VerifiedBlock::new_with_signer_and_unprovable( self.authority, clock_round, block_references, @@ -934,21 +951,9 @@ impl Core { precomputed_round_sig, precomputed_leader_sig, sailfish_fields, + unprovable_certificate, ); - // MysticetiCompress: compute and set unprovable_certificate for - // non-leader blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress - && self.committee.elect_leader(clock_round) != self.authority - && clock_round >= 3 - { - let cert = self.compute_unprovable_certificate( - clock_round, - block.block_references(), - ); - block.header.unprovable_certificate = cert; - } - self.metrics .proposed_block_refs .observe(block_ref_count as f64); @@ -960,60 +965,6 @@ impl Core { Data::new(block) } - /// Compute the unprovable certificate for a MysticetiCompress non-leader - /// block at `clock_round`. Returns a reference to the leader at r-2 if - /// 2f+1 blocks at r-1 reference it (direct evidence) or f+1 blocks at r - /// already propagate the same certificate. - fn compute_unprovable_certificate( - &self, - clock_round: RoundNumber, - _block_references: &[BlockReference], - ) -> Option { - let leader_round = clock_round - 2; - let leader = self.committee.elect_leader(leader_round); - let leader_blocks = - self.dag_state - .get_blocks_at_authority_round(leader, leader_round); - let leader_ref = *leader_blocks.first()?.reference(); - - // Direct evidence: 2f+1 blocks at r-1 reference this leader. - let vote_round = clock_round - 1; - let vote_blocks = self.dag_state.get_blocks_by_round(vote_round); - let mut vote_stake: u64 = 0; - for block in &vote_blocks { - if block - .block_references() - .iter() - .any(|r| *r == leader_ref) - { - vote_stake += self - .committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } - if self.committee.is_quorum(vote_stake) { - return Some(leader_ref); - } - - // Propagation: f+1 blocks at round r carry the same certificate. - let current_blocks = self.dag_state.get_blocks_by_round(clock_round); - let mut prop_stake: u64 = 0; - for block in ¤t_blocks { - if block.unprovable_certificate() == Some(&leader_ref) { - prop_stake += self - .committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } - if prop_stake >= self.committee.validity_threshold() { - return Some(leader_ref); - } - - None - } - /// Compute SailfishPlusPlus control fields for a new block at /// `clock_round`. /// @@ -1092,6 +1043,46 @@ impl Core { true } + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round.checked_sub(2)?; + let leader = self.committee.elect_leader(leader_round); + let leader_block = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round) + .into_iter() + .next()?; + let leader_ref = *leader_block.reference(); + + let vote_round = clock_round.checked_sub(1)?; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut vote_stake = StakeAggregator::::new(); + for block in vote_blocks.iter() { + if block.block_references().iter().any(|r| *r == leader_ref) { + vote_stake.add(block.authority(), &self.committee); + } + } + if vote_stake.is_quorum(&self.committee) { + return Some(leader_ref); + } + + let current_round_blocks = self.dag_state.get_blocks_by_round_cached(clock_round); + let mut propagation_stake = StakeAggregator::::new(); + for block in current_round_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) { + propagation_stake.add(block.authority(), &self.committee); + } + } + if propagation_stake.get_stake() >= self.committee.validity_threshold() { + return Some(leader_ref); + } + + None + } + fn prepare_last_blocks(&mut self) { let target = match self.dag_state.byzantine_strategy { Some( @@ -1127,6 +1118,30 @@ impl Core { pending_refs: &[BlockReference], block_round: RoundNumber, ) -> Vec { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round + }) + .collect(); + } + // SailfishPlusPlus: keep all previous-round references unconditionally // so that certified-parent filtering doesn't drop below quorum. // Only older-round references go through normal compression. @@ -1141,10 +1156,10 @@ impl Core { }) .collect(); } - // MysticetiCompress: non-leaders include only the leader at r-1 + // Bluestreak: non-leaders include only the leader at r-1 // (own_prev is always prepended by build_block). Leaders keep the // full clean-DAG frontier for the quorum requirement. - if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { let is_leader = self.committee.elect_leader(block_round) == self.authority; if !is_leader { let prev_round = block_round.saturating_sub(1); @@ -1163,9 +1178,7 @@ impl Core { .iter() .copied() .filter(|r| { - r.authority != self.authority - && seen.insert(*r) - && r.round >= prev_round + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round }) .collect(); } @@ -1396,7 +1409,7 @@ impl Core { } fn flush_pending_sailfish_certified_refs(&self) { - if self.dag_state.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.dag_state.consensus_protocol.uses_dual_dag() { return; } self.dag_state.flush_pending_sailfish_certified_refs(); diff --git a/crates/starfish-core/src/crypto.rs b/crates/starfish-core/src/crypto.rs index a1583c5..f5ca401 100644 --- a/crates/starfish-core/src/crypto.rs +++ b/crates/starfish-core/src/crypto.rs @@ -184,6 +184,30 @@ impl BlockDigest { signature: &SignatureBytes, merkle_root: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_without_transactions_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + merkle_root, + strong_vote, + None, + ) + } + + pub fn new_without_transactions_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + merkle_root: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -196,6 +220,7 @@ impl BlockDigest { merkle_root, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -209,6 +234,30 @@ impl BlockDigest { signature: &SignatureBytes, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -221,6 +270,7 @@ impl BlockDigest { transactions_commitment, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -250,6 +300,19 @@ impl BlockDigest { CryptoHash::crypto_hash(&mask, hasher); } } + + /// Extend a block digest hasher with the Bluestreak unprovable + /// certificate reference. Called after `digest_without_signature` and + /// before finalizing. No-op when `None`, preserving backward + /// compatibility. + pub(crate) fn hash_unprovable_certificate( + hasher: &mut Blake3Hasher, + unprovable_certificate: Option<&BlockReference>, + ) { + if let Some(cert_ref) = unprovable_certificate { + cert_ref.crypto_hash(hasher); + } + } } impl From<[u8; BLOCK_DIGEST_SIZE]> for BlockDigest { @@ -418,6 +481,7 @@ impl PublicKey { header.merkle_root(), header.strong_vote(), ); + BlockDigest::hash_unprovable_certificate(&mut hasher, header.unprovable_certificate()); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); self.0.verify(&signature, digest.as_ref()) } @@ -461,6 +525,29 @@ impl Signer { meta_creation_time_ns: TimestampNs, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> SignatureBytes { + self.sign_block_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn sign_block_with_unprovable( + &self, + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> SignatureBytes { let mut hasher = Blake3Hasher::new(); BlockDigest::digest_without_signature( @@ -473,6 +560,7 @@ impl Signer { transactions_commitment, strong_vote, ); + BlockDigest::hash_unprovable_certificate(&mut hasher, unprovable_certificate); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); let signature = self.0.sign(digest.as_ref()); SignatureBytes(signature.to_bytes()) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9f4a7b..6e0db05 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -101,7 +101,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, - MysticetiCompress, + Bluestreak, } impl ConsensusProtocol { @@ -113,7 +113,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, - "mysticeti-compress" => ConsensusProtocol::MysticetiCompress, + "bluestreak" => ConsensusProtocol::Bluestreak, _ => ConsensusProtocol::Starfish, } } @@ -131,15 +131,15 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } - pub fn is_mysticeti_compress(self) -> bool { - matches!(self, ConsensusProtocol::MysticetiCompress) + pub fn is_bluestreak(self) -> bool { + matches!(self, ConsensusProtocol::Bluestreak) } /// Protocols that use a dual dirty/clean DAG with vertex certification. pub fn uses_dual_dag(self) -> bool { matches!( self, - ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::MysticetiCompress + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::Bluestreak ) } @@ -148,7 +148,7 @@ impl ConsensusProtocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { DisseminationMode::Pull } - ConsensusProtocol::MysticetiCompress + ConsensusProtocol::Bluestreak | ConsensusProtocol::CordialMiners | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed @@ -551,7 +551,7 @@ impl DagState { } } - // Recover persisted active Sailfish++ certifications. Only the active, + // Recover persisted active certified vertices. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. if consensus_protocol.uses_dual_dag() { @@ -577,6 +577,19 @@ impl DagState { } } } + if consensus_protocol == ConsensusProtocol::Bluestreak { + let mut recovered_blocks: Vec<_> = inner + .index + .iter() + .flat_map(|auth_map| auth_map.values()) + .flat_map(|blocks| blocks.values().cloned()) + .collect(); + recovered_blocks.sort_by_key(|block| *block.reference()); + let mut activated = Vec::new(); + for block in recovered_blocks { + inner.check_bluestreak_pre_clean(&block, &committee, &mut activated); + } + } metrics.dag_state_entries.inc_by(block_count); metrics.dag_highest_round.set(inner.highest_round as i64); @@ -626,8 +639,8 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } - ConsensusProtocol::MysticetiCompress => { - tracing::info!("Starting MysticetiCompress protocol") + ConsensusProtocol::Bluestreak => { + tracing::info!("Starting Bluestreak protocol") } } let dag_state = Self { @@ -2279,19 +2292,43 @@ impl DagStateInner { later_block: &Data, earlier_block: &Data, ) -> bool { - let mut parents = AHashSet::from([later_block.clone()]); - for _round_number in (earlier_block.round()..later_block.round()).rev() { - parents = parents - .iter() - .flat_map(|block| block.block_references()) - .map(|block_reference| { - self.get_storage_block(*block_reference) - .expect("Block should be in DagState") - }) - .filter(|included_block| included_block.round() >= earlier_block.round()) - .collect(); + let target = *earlier_block.reference(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block_ref == target { + return true; + } + if block.round() <= earlier_block.round() { + continue; + } + + for parent_ref in block.block_references() { + let parent = self + .get_storage_block(*parent_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + let parent = self + .get_storage_block(*cert_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + } } - parents.contains(earlier_block) + + false } /// Compute all block references reachable from `later_block` at @@ -2302,16 +2339,42 @@ impl DagStateInner { later_block: &Data, target_round: RoundNumber, ) -> AHashSet { - let mut frontier = AHashSet::from([later_block.clone()]); - for _ in (target_round..later_block.round()).rev() { - frontier = frontier - .iter() - .flat_map(|block| block.block_references()) - .filter_map(|r| self.get_storage_block(*r)) - .filter(|b| b.round() >= target_round) - .collect(); + let mut reachable = AHashSet::new(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block.round() == target_round { + reachable.insert(block_ref); + continue; + } + if block.round() < target_round { + continue; + } + + for reference in block.block_references() { + if let Some(parent) = self.get_storage_block(*reference) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(parent) = self.get_storage_block(*cert_ref) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + } } - frontier.iter().map(|b| *b.reference()).collect() + + reachable } /// Per-authority eviction using BTreeMap::split_off. @@ -2440,13 +2503,12 @@ impl DagStateInner { missing.push(*parent); } } - // MysticetiCompress: the unprovable_certificate target is also a + // Bluestreak: the unprovable_certificate target is also a // causal dependency that must be ancestor-closed. - if self.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if self.consensus_protocol == ConsensusProtocol::Bluestreak { if let Some(cert_ref) = block.unprovable_certificate() { if cert_ref.round > 0 - && !self.vertex_certificates[cert_ref.authority as usize] - .contains(cert_ref) + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) && !missing.contains(cert_ref) { missing.push(*cert_ref); @@ -2481,6 +2543,14 @@ impl DagStateInner { } } } + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(children) = self.pending_vertex_certificate_children.get_mut(cert_ref) { + children.remove(&block_ref); + if children.is_empty() { + self.pending_vertex_certificate_children.remove(cert_ref); + } + } + } activated.push(block_ref); self.pending_persisted_vertex_certificates[auth].insert(block_ref); @@ -2531,7 +2601,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); - self.check_compress_pre_clean(&block, committee, activated); + self.check_bluestreak_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2564,78 +2634,94 @@ impl DagStateInner { } } - /// MysticetiCompress pre-clean check: a block is pre-clean if it has no + /// Bluestreak pre-clean check: a block is pre-clean if it has no /// `unprovable_certificate`, or its certificate is verified via the dirty /// DAG. Pre-clean blocks feed into the vertex certification pipeline. - fn check_compress_pre_clean( + fn check_bluestreak_pre_clean( &mut self, block: &VerifiedBlock, committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::MysticetiCompress { + if self.consensus_protocol != ConsensusProtocol::Bluestreak { return; } let block_ref = *block.reference(); let is_pre_clean = match block.unprovable_certificate() { None => true, Some(leader_ref) => { - self.verify_unprovable_certificate_local( - block_ref.round, - leader_ref, - committee, - ) + self.verify_unprovable_certificate(block_ref.round, leader_ref, committee) } }; if is_pre_clean { self.note_vertex_certified(block_ref, activated); } + self.reevaluate_bluestreak_pending(committee, activated); } /// Verify an unprovable certificate against the local dirty DAG. /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) /// or f+1 blocks at r propagate the same certificate. - fn verify_unprovable_certificate_local( + fn verify_unprovable_certificate( &self, block_round: RoundNumber, leader_ref: &BlockReference, committee: &Committee, ) -> bool { - // Direct: 2f+1 at r-1 reference the leader. - let vote_round = block_round.saturating_sub(1); - let mut stake: Stake = 0; - for auth_dag in &self.index { - if let Some(round_blocks) = auth_dag.get(&vote_round) { - for block in round_blocks.values() { - if block - .block_references() - .iter() - .any(|r| r == leader_ref) - { - stake += committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } + if leader_ref.round + 2 != block_round { + return false; + } + + let vote_round = match block_round.checked_sub(1) { + Some(round) => round, + None => return false, + }; + let mut stake = StakeAggregator::::new(); + for block in self.get_blocks_by_round(vote_round) { + if block.block_references().iter().any(|r| r == leader_ref) { + stake.add(block.authority(), committee); } } - if committee.is_quorum(stake) { + if stake.is_quorum(committee) { return true; } - // Propagation: f+1 at round r carry the same certificate. - let mut prop_stake: Stake = 0; - for auth_dag in &self.index { - if let Some(round_blocks) = auth_dag.get(&block_round) { + + let mut propagation = StakeAggregator::::new(); + for block in self.get_blocks_by_round(block_round) { + if block.unprovable_certificate() == Some(leader_ref) { + propagation.add(block.authority(), committee); + } + } + propagation.get_stake() >= committee.validity_threshold() + } + + fn reevaluate_bluestreak_pending( + &mut self, + committee: &Committee, + activated: &mut Vec, + ) { + let mut ready = Vec::new(); + for auth_map in &self.index { + for round_blocks in auth_map.values() { for block in round_blocks.values() { - if block.unprovable_certificate() == Some(leader_ref) { - prop_stake += committee - .get_stake(block.authority()) - .unwrap_or(0); + let block_ref = *block.reference(); + if self.vertex_certificates[block_ref.authority as usize].contains(&block_ref) { + continue; + } + let Some(leader_ref) = block.unprovable_certificate() else { + continue; + }; + if self.verify_unprovable_certificate(block.round(), leader_ref, committee) { + ready.push(block_ref); } } } } - prop_stake >= committee.validity_threshold() + ready.sort(); + ready.dedup(); + for block_ref in ready { + self.note_vertex_certified(block_ref, activated); + } } fn infer_vertex_certificate_closure( @@ -2808,6 +2894,7 @@ impl DagStateInner { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak ) && block.transactions().is_none() && block.merkle_root() == TransactionsCommitment::new_from_transactions(&Vec::new()); if block.has_empty_payload() || is_empty_full_block { @@ -3402,7 +3489,7 @@ mod tests { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3437,7 +3524,7 @@ mod tests { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3991,6 +4078,91 @@ mod tests { assert!(!dag_state.mark_dac_certified(rejected, cert)); } + #[test] + fn bluestreak_reachability_follows_unprovable_certificate() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut leader = VerifiedBlock::new( + 1, + 1, + vec![BlockReference::new_test(1, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + leader.preserialize(); + let leader = Data::new(leader); + let leader_ref = *leader.reference(); + + let mut own_round_one = VerifiedBlock::new( + 0, + 1, + vec![BlockReference::new_test(0, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_round_one.preserialize(); + let own_round_one = Data::new(own_round_one); + + let mut own_prev = VerifiedBlock::new( + 0, + 2, + vec![*own_round_one.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_prev.preserialize(); + let own_prev = Data::new(own_prev); + + let mut compress = VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*own_prev.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + compress.preserialize(); + let compress = Data::new(compress); + + dag_state.insert_general_block(leader.clone(), DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_round_one, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_prev, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(compress.clone(), DataSource::BlockBundleStreaming); + + assert!(dag_state.linked(&compress, &leader)); + assert!( + dag_state + .reachable_at_round(&compress, 1) + .contains(&leader_ref) + ); + } + #[test] fn consensus_protocol_resolves_dissemination_defaults() { assert_eq!( diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index e1bd378..929732b 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -505,7 +505,7 @@ impl ConnectionHandler { + | ConsensusProtocol::Bluestreak => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish @@ -1606,6 +1606,7 @@ impl NetworkSyncer loop { let notified = inner.threshold_clock_notify.notified(); let round = if inner.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + || inner.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { inner.dag_state.proposal_round().saturating_sub(1) } else { diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index a940967..1f465a6 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,7 +283,7 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, - /// MysticetiCompress unprovable certificate: optional reference to a + /// Bluestreak unprovable certificate: optional reference to a /// leader at round r-2 certified by 2f+1 votes at r-1. pub(crate) unprovable_certificate: Option, @@ -701,6 +701,36 @@ impl VerifiedBlock { strong_vote: Option, bls: Option, sailfish: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions, + merkle_root, + strong_vote, + bls, + sailfish, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signature: SignatureBytes, + transactions: Vec, + merkle_root: TransactionsCommitment, + strong_vote: Option, + bls: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let (acknowledgment_intersection, acknowledgment_references) = compress_acknowledgments(&block_references, &acknowledgment_references); @@ -713,7 +743,7 @@ impl VerifiedBlock { reference: BlockReference { authority, round, - digest: BlockDigest::new_without_transactions( + digest: BlockDigest::new_without_transactions_with_unprovable( authority, round, &block_references, @@ -722,6 +752,7 @@ impl VerifiedBlock { &signature, merkle_root, strong_vote, + unprovable_certificate.as_ref(), ), }, block_references, @@ -735,7 +766,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), - unprovable_certificate: None, + unprovable_certificate, serialized: None, }; @@ -812,6 +843,53 @@ impl VerifiedBlock { precomputed_round_sig: Option, precomputed_leader_sig: Option, sailfish: Option, + ) -> Self { + Self::new_with_signer_and_unprovable( + authority, + round, + block_references, + voted_leader_ref, + acknowledgment_references, + meta_creation_time_ns, + signer, + bls_signer, + committee_opt, + aggregate_dac_sigs, + transactions, + encoded_transactions, + consensus_protocol, + strong_vote, + aggregate_round_sig, + certified_leader, + precomputed_round_sig, + precomputed_leader_sig, + sailfish, + None, + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_with_signer_and_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + voted_leader_ref: Option, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signer: &Signer, + bls_signer: Option<&BlsSigner>, + committee_opt: Option<&Committee>, + aggregate_dac_sigs: Vec, + transactions: Vec, + encoded_transactions: Option>, + consensus_protocol: ConsensusProtocol, + strong_vote: Option, + aggregate_round_sig: Option, + certified_leader: Option<(BlockReference, BlsAggregateCertificate)>, + precomputed_round_sig: Option, + precomputed_leader_sig: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let transactions_commitment = match consensus_protocol { ConsensusProtocol::Starfish @@ -830,7 +908,7 @@ impl VerifiedBlock { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -846,7 +924,7 @@ impl VerifiedBlock { &acknowledgment_references, aggregate_dac_sigs, ); - let signature = signer.sign_block( + let signature = signer.sign_block_with_unprovable( authority, round, &block_references, @@ -854,6 +932,7 @@ impl VerifiedBlock { meta_creation_time_ns, transactions_commitment, strong_vote, + unprovable_certificate.as_ref(), ); // Build BLS fields when the StarfishBls path is active. Partial round @@ -882,7 +961,7 @@ impl VerifiedBlock { } }); - Self::new( + Self::new_with_unprovable( authority, round, block_references, @@ -894,6 +973,7 @@ impl VerifiedBlock { strong_vote, bls, sailfish, + unprovable_certificate, ) } @@ -1126,7 +1206,7 @@ impl VerifiedBlock { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1163,7 +1243,7 @@ impl VerifiedBlock { ); } let acknowledgments = self.header.acknowledgments(); - let digest = BlockDigest::new( + let digest = BlockDigest::new_with_unprovable( self.authority(), round, &self.header.block_references, @@ -1172,6 +1252,7 @@ impl VerifiedBlock { &self.header.signature, self.header.transactions_commitment, self.header.strong_vote, + self.header.unprovable_certificate(), ); ensure!( digest == self.digest(), @@ -1204,6 +1285,10 @@ impl VerifiedBlock { } match consensus_protocol { ConsensusProtocol::StarfishBls => { + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); let bls = self .header .bls() @@ -1311,10 +1396,12 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); } - ConsensusProtocol::Mysticeti - | ConsensusProtocol::CordialMiners - | ConsensusProtocol::MysticetiCompress => { + ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { ensure!( acknowledgments.is_empty(), "{consensus_protocol:?} blocks must not carry acknowledgments" @@ -1327,6 +1414,56 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); + } + ConsensusProtocol::Bluestreak => { + ensure!( + acknowledgments.is_empty(), + "Bluestreak blocks must not carry acknowledgments" + ); + ensure!( + self.header.bls().is_none(), + "Only StarfishBls blocks may carry BLS fields" + ); + let is_leader = self.authority() == committee.elect_leader(round); + if !is_leader { + ensure!( + self.header.block_references.len() <= 2, + "Bluestreak non-leader may reference at most own_prev + leader" + ); + let _self_ref = *self + .header + .block_references + .iter() + .find(|r| r.authority == self.authority()) + .ok_or_else(|| { + eyre::eyre!( + "Bluestreak non-leader block must reference its own previous block" + ) + })?; + if let Some(cert_ref) = self.header.unprovable_certificate() { + ensure!( + round >= 3 && cert_ref.round + 2 == round, + "unprovable_certificate must reference leader at round r-2" + ); + ensure!( + cert_ref.authority == committee.elect_leader(cert_ref.round), + "unprovable_certificate must reference the elected leader" + ); + } + } else { + ensure!( + self.header.unprovable_certificate().is_none(), + "Bluestreak leaders must not carry unprovable_certificate" + ); + ensure!( + threshold_clock_valid_block_header(&self.header, committee), + "Bluestreak leader must reference 2f+1 blocks from previous round" + ); + } } ConsensusProtocol::SailfishPlusPlus => { ensure!( @@ -1341,6 +1478,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); if round > 1 { let prev_round = round - 1; let prev_leader = committee.elect_leader(prev_round); @@ -2200,6 +2341,102 @@ mod tests { ); } + #[test] + fn verifies_bluestreak_non_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let authority = committee + .authorities() + .find(|authority| *authority != committee.elect_leader(3)) + .unwrap(); + let round_2_leader = committee.elect_leader(2); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + authority, + 3, + vec![ + BlockReference::new_test(authority, 2), + BlockReference::new_test(round_2_leader, 2), + ], + None, + vec![], + 0, + &signers[authority as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap(); + } + + #[test] + fn rejects_bluestreak_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let leader = committee.elect_leader(3); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + leader, + 3, + (0..committee.quorum_threshold() as AuthorityIndex) + .map(|authority| BlockReference::new_test(authority, 2)) + .collect(), + None, + vec![], + 0, + &signers[leader as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + let err = block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains("Bluestreak leaders must not carry unprovable_certificate") + ); + } + #[test] fn verifies_empty_mysticeti_block_without_transaction_data() { let committee = Committee::new_for_benchmarks(4); diff --git a/crates/starfish-core/src/validator.rs b/crates/starfish-core/src/validator.rs index 9e2d76d..90c24ef 100644 --- a/crates/starfish-core/src/validator.rs +++ b/crates/starfish-core/src/validator.rs @@ -310,11 +310,17 @@ mod smoke_tests { #[test_case("starfish-speed", 80)] #[test_case("starfish-bls", 100)] #[test_case("sailfish++", 120)] + #[test_case("bluestreak", 140)] #[tokio::test] async fn validator_commit(consensus: &str, port_offset: u16) { run_commit_test(consensus, port_offset).await; } + #[tokio::test] + async fn validator_commit_bluestreak_basic() { + run_commit_test("bluestreak", 150).await; + } + async fn run_sync_test(consensus: &str, port_offset: u16) { let committee_size = 4; let committee = Committee::new_for_benchmarks(committee_size); @@ -402,6 +408,7 @@ mod smoke_tests { #[test_case("starfish-speed", 180)] #[test_case("starfish-bls", 200)] #[test_case("sailfish++", 220)] + #[test_case("bluestreak", 260)] #[tokio::test] async fn validator_sync(consensus: &str, port_offset: u16) { run_sync_test(consensus, port_offset).await; @@ -465,6 +472,7 @@ mod smoke_tests { #[test_case("starfish-speed", 280)] #[test_case("starfish-bls", 300)] #[test_case("sailfish++", 320)] + #[test_case("bluestreak", 380)] #[tokio::test] async fn validator_crash_faults(consensus: &str, port_offset: u16) { run_crash_faults_test(consensus, port_offset).await; diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index aaf5115..287eb51 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -7,7 +7,7 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, -# cordial-miners, mysticeti +# cordial-miners, mysticeti, bluestreak CONSENSUS=${CONSENSUS:-starfish-speed} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, From 16164074743679e2e0400d2a4c0e3ff3315f88b8 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 20:41:52 +0100 Subject: [PATCH 09/18] fix(core): default Bluestreak dissemination to pull --- crates/starfish-core/src/dag_state.rs | 7 ++++--- scripts/dryrun.sh | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6e0db05..93fdfe2 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -145,11 +145,12 @@ impl ConsensusProtocol { pub fn default_dissemination_mode(self) -> DisseminationMode { match self { - ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { + ConsensusProtocol::Mysticeti + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { DisseminationMode::Pull } - ConsensusProtocol::Bluestreak - | ConsensusProtocol::CordialMiners + ConsensusProtocol::CordialMiners | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index 287eb51..6c48cb4 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -8,7 +8,7 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, # cordial-miners, mysticeti, bluestreak -CONSENSUS=${CONSENSUS:-starfish-speed} +CONSENSUS=${CONSENSUS:-bluestreak} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, # equivocating-chains, equivocating-two-chains, @@ -24,7 +24,7 @@ STORAGE_BACKEND=${STORAGE_BACKEND:-rocksdb} TRANSACTION_MODE=${TRANSACTION_MODE:-random} # Dissemination mode: protocol-default (default) | pull | # push-causal | push-useful -DISSEMINATION_MODE=${DISSEMINATION_MODE:-push-causal} +DISSEMINATION_MODE=${DISSEMINATION_MODE:-protocol-default} # Enable lz4 network compression. # Auto-enabled for random transaction mode. # Set COMPRESS_NETWORK=1 or =0 to override. From 40e9a3c0998daa3ea35debb4e3f1a730f843195d Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 09:59:13 +0100 Subject: [PATCH 10/18] perf(core): make Bluestreak clean transitions incremental --- crates/starfish-core/src/core.rs | 35 ++--- crates/starfish-core/src/dag_state.rs | 195 +++++++++++++++++--------- 2 files changed, 137 insertions(+), 93 deletions(-) diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 22af544..70679ca 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -13,7 +13,7 @@ use crate::{ block_handler::BlockHandler, block_manager::BlockManager, bls_certificate_aggregator::{BlsCertificateAggregator, apply_certificate_events}, - committee::{Committee, QuorumThreshold, StakeAggregator, ValidityThreshold}, + committee::Committee, config::NodePrivateConfig, consensus::{ CommitMetastate, @@ -1050,35 +1050,20 @@ impl Core { ) -> Option { let leader_round = clock_round.checked_sub(2)?; let leader = self.committee.elect_leader(leader_round); - let leader_block = self + let leader_blocks = self .dag_state .get_blocks_at_authority_round(leader, leader_round) - .into_iter() - .next()?; - let leader_ref = *leader_block.reference(); - - let vote_round = clock_round.checked_sub(1)?; - let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); - let mut vote_stake = StakeAggregator::::new(); - for block in vote_blocks.iter() { - if block.block_references().iter().any(|r| *r == leader_ref) { - vote_stake.add(block.authority(), &self.committee); - } - } - if vote_stake.is_quorum(&self.committee) { - return Some(leader_ref); - } + .into_iter(); - let current_round_blocks = self.dag_state.get_blocks_by_round_cached(clock_round); - let mut propagation_stake = StakeAggregator::::new(); - for block in current_round_blocks.iter() { - if block.unprovable_certificate() == Some(&leader_ref) { - propagation_stake.add(block.authority(), &self.committee); + for leader_block in leader_blocks { + let leader_ref = *leader_block.reference(); + if self + .dag_state + .has_bluestreak_certificate_evidence(clock_round, &leader_ref) + { + return Some(leader_ref); } } - if propagation_stake.get_stake() >= self.committee.validity_threshold() { - return Some(leader_ref); - } None } diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 93fdfe2..45aeca6 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -43,6 +43,16 @@ type AuthorityBitmask = AuthoritySet; pub type PendingSubDag = (CommittedSubDag, Vec>); +#[derive(Default)] +struct BluestreakCertificateState { + /// f+1 propagation support from round r for the same unprovable + /// certificate reference. + propagation_support: StakeAggregator, + /// Blocks waiting for this certificate to become provable in the dirty + /// DAG before they can join the clean DAG. + waiting_blocks: BTreeSet, +} + /// Tracks where block headers and transaction data originate. /// Carried on the wire inside `BlockBatch` so the receiver can distinguish /// proactive dissemination from sync responses. @@ -147,9 +157,7 @@ impl ConsensusProtocol { match self { ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::Bluestreak => { - DisseminationMode::Pull - } + | ConsensusProtocol::Bluestreak => DisseminationMode::Pull, ConsensusProtocol::CordialMiners | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed @@ -343,6 +351,13 @@ struct DagStateInner { /// Once this reaches f+1 distinct authorities, we can infer that some /// honest node had the referenced block in its active-certified view. inferred_vertex_support: Vec>>, + /// Direct next-round support for elected leader references. Kept generic + /// so other protocols can reuse leader vote aggregation without rescanning + /// dirty DAG rounds. + leader_vote_support: BTreeMap>, + /// Bluestreak-specific propagation support and blocks waiting for a given + /// unprovable certificate reference to become provable. + bluestreak_certificate_state: BTreeMap, /// Sailfish++ timeout certificates, indexed by round. sailfish_timeout_certs: BTreeMap, /// Sailfish++ no-vote certificates, indexed by (round, leader). @@ -419,6 +434,8 @@ impl DagState { pending_vertex_certificate_children: BTreeMap::new(), pending_persisted_vertex_certificates: (0..n).map(|_| BTreeSet::new()).collect(), inferred_vertex_support: (0..n).map(|_| BTreeMap::new()).collect(), + leader_vote_support: BTreeMap::new(), + bluestreak_certificate_state: BTreeMap::new(), sailfish_timeout_certs: BTreeMap::new(), sailfish_novote_certs: BTreeMap::new(), }; @@ -1144,6 +1161,18 @@ impl DagState { .contains(block_ref) } + /// Check whether the local dirty DAG has enough evidence to prove a + /// Bluestreak unprovable certificate for `leader_ref` at `block_round`. + pub fn has_bluestreak_certificate_evidence( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + ) -> bool { + self.dag_state_inner + .read() + .has_bluestreak_certificate_evidence(block_round, leader_ref, &self.committee) + } + /// Check whether 2f+1 stake is certified at the given round /// (SailfishPlusPlus certified parent quorum gate). pub fn certified_parent_quorum(&self, round: RoundNumber) -> bool { @@ -2457,6 +2486,18 @@ impl DagStateInner { .retain(|child| child.round >= self.evicted_rounds[child.authority as usize]); !children.is_empty() }); + let split_ref = BlockReference { + authority: 0, + round: min_evicted, + digest: BlockDigest::default(), + }; + self.leader_vote_support = self.leader_vote_support.split_off(&split_ref); + self.bluestreak_certificate_state = self.bluestreak_certificate_state.split_off(&split_ref); + for state in self.bluestreak_certificate_state.values_mut() { + state.waiting_blocks.retain(|block_ref| { + block_ref.round >= self.evicted_rounds[block_ref.authority as usize] + }); + } // Prune Sailfish++ timeout and no-vote certificates. self.sailfish_timeout_certs = self.sailfish_timeout_certs.split_off(&min_evicted); self.sailfish_novote_certs = self.sailfish_novote_certs.split_off(&(min_evicted, 0)); @@ -2545,6 +2586,9 @@ impl DagStateInner { } } if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(state) = self.bluestreak_certificate_state.get_mut(cert_ref) { + state.waiting_blocks.remove(&block_ref); + } if let Some(children) = self.pending_vertex_certificate_children.get_mut(cert_ref) { children.remove(&block_ref); if children.is_empty() { @@ -2635,35 +2679,7 @@ impl DagStateInner { } } - /// Bluestreak pre-clean check: a block is pre-clean if it has no - /// `unprovable_certificate`, or its certificate is verified via the dirty - /// DAG. Pre-clean blocks feed into the vertex certification pipeline. - fn check_bluestreak_pre_clean( - &mut self, - block: &VerifiedBlock, - committee: &Committee, - activated: &mut Vec, - ) { - if self.consensus_protocol != ConsensusProtocol::Bluestreak { - return; - } - let block_ref = *block.reference(); - let is_pre_clean = match block.unprovable_certificate() { - None => true, - Some(leader_ref) => { - self.verify_unprovable_certificate(block_ref.round, leader_ref, committee) - } - }; - if is_pre_clean { - self.note_vertex_certified(block_ref, activated); - } - self.reevaluate_bluestreak_pending(committee, activated); - } - - /// Verify an unprovable certificate against the local dirty DAG. - /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) - /// or f+1 blocks at r propagate the same certificate. - fn verify_unprovable_certificate( + fn has_bluestreak_certificate_evidence( &self, block_round: RoundNumber, leader_ref: &BlockReference, @@ -2672,56 +2688,99 @@ impl DagStateInner { if leader_ref.round + 2 != block_round { return false; } - - let vote_round = match block_round.checked_sub(1) { - Some(round) => round, - None => return false, - }; - let mut stake = StakeAggregator::::new(); - for block in self.get_blocks_by_round(vote_round) { - if block.block_references().iter().any(|r| r == leader_ref) { - stake.add(block.authority(), committee); - } - } - if stake.is_quorum(committee) { + if self + .leader_vote_support + .get(leader_ref) + .is_some_and(|support| support.is_quorum(committee)) + { return true; } + self.bluestreak_certificate_state + .get(leader_ref) + .is_some_and(|state| state.propagation_support.is_quorum(committee)) + } - let mut propagation = StakeAggregator::::new(); - for block in self.get_blocks_by_round(block_round) { - if block.unprovable_certificate() == Some(leader_ref) { - propagation.add(block.authority(), committee); + fn record_leader_vote_support( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + ready_certificates: &mut BTreeSet, + ) { + for parent in block.block_references() { + if parent.round == 0 || parent.round + 1 != block.round() { + continue; + } + if parent.authority != committee.elect_leader(parent.round) { + continue; + } + if self + .leader_vote_support + .entry(*parent) + .or_default() + .add(block.authority(), committee) + { + ready_certificates.insert(*parent); } } - propagation.get_stake() >= committee.validity_threshold() } - fn reevaluate_bluestreak_pending( + fn activate_bluestreak_waiters( &mut self, + leader_ref: BlockReference, + activated: &mut Vec, + ) { + let waiting_blocks = self + .bluestreak_certificate_state + .get_mut(&leader_ref) + .map(|state| std::mem::take(&mut state.waiting_blocks)) + .unwrap_or_default(); + for block_ref in waiting_blocks { + self.note_vertex_certified(block_ref, activated); + } + } + + /// Bluestreak pre-clean check: a block is pre-clean if it has no + /// `unprovable_certificate`, or its certificate is verified via the dirty + /// DAG. Pre-clean blocks feed into the vertex certification pipeline. + fn check_bluestreak_pre_clean( + &mut self, + block: &VerifiedBlock, committee: &Committee, activated: &mut Vec, ) { - let mut ready = Vec::new(); - for auth_map in &self.index { - for round_blocks in auth_map.values() { - for block in round_blocks.values() { - let block_ref = *block.reference(); - if self.vertex_certificates[block_ref.authority as usize].contains(&block_ref) { - continue; - } - let Some(leader_ref) = block.unprovable_certificate() else { - continue; - }; - if self.verify_unprovable_certificate(block.round(), leader_ref, committee) { - ready.push(block_ref); - } + if self.consensus_protocol != ConsensusProtocol::Bluestreak { + return; + } + let mut ready_certificates = BTreeSet::new(); + self.record_leader_vote_support(block, committee, &mut ready_certificates); + + let block_ref = *block.reference(); + match block.unprovable_certificate() { + None => { + self.note_vertex_certified(block_ref, activated); + } + Some(leader_ref) + if self.has_bluestreak_certificate_evidence( + block_ref.round, + leader_ref, + committee, + ) => + { + self.note_vertex_certified(block_ref, activated); + } + Some(leader_ref) => { + let state = self + .bluestreak_certificate_state + .entry(*leader_ref) + .or_default(); + state.waiting_blocks.insert(block_ref); + if state.propagation_support.add(block.authority(), committee) { + ready_certificates.insert(*leader_ref); } } } - ready.sort(); - ready.dedup(); - for block_ref in ready { - self.note_vertex_certified(block_ref, activated); + for leader_ref in ready_certificates { + self.activate_bluestreak_waiters(leader_ref, activated); } } From 27e052c96863fb536e203cd58b0527b20b7dec2c Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 10:19:31 +0100 Subject: [PATCH 11/18] fix(core): audit fixes for Bluestreak protocol - Add unprovable_certificate edge traversal to infer_vertex_certificate_closure, matching the linearizer pattern. Without this, inferred certification could activate a block whose unprovable_certificate target was not yet certified. - Remove dead duplicate Bluestreak branch in compress_pending_block_references (unreachable after the early-return block added at the top of the function). - Remove timed!() wrappers from try_new_block internals (sub-function timers removed from measurements fixture accordingly). - Update grafana dashboard stacking mode for utilization panel. --- crates/orchestrator/src/measurements.rs | 4 - crates/starfish-core/src/core.rs | 95 +++++------------------ crates/starfish-core/src/dag_state.rs | 9 +++ monitoring/grafana/grafana-dashboard.json | 4 +- 4 files changed, 30 insertions(+), 82 deletions(-) diff --git a/crates/orchestrator/src/measurements.rs b/crates/orchestrator/src/measurements.rs index 918b699..cc4fcb3 100644 --- a/crates/orchestrator/src/measurements.rs +++ b/crates/orchestrator/src/measurements.rs @@ -926,10 +926,6 @@ utilization_timer{proc="Committer::indirect_decide"} 693165 utilization_timer{proc="Core::add_blocks"} 5694911 utilization_timer{proc="Core::run_block_handler"} 198119 utilization_timer{proc="Core::try_new_block"} 1285400 -utilization_timer{proc="Core::try_new_block::build block"} 674913 -utilization_timer{proc="Core::try_new_block::encoding"} 31602 -utilization_timer{proc="Core::try_new_block::serialize block"} 11766 -utilization_timer{proc="Core::try_new_block::writing to disk"} 188706 utilization_timer{proc="Core::try_new_commit"} 6288004 utilization_timer{proc="Network: verify blocks"} 41800099 utilization_timer{proc="Syncer::try_new_commit"} 8128094 diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 70679ca..1405288 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -466,7 +466,7 @@ impl Core { let _block_timer = self .metrics .utilization_timer - .utilization_timer("Core::new_block::try_new_block"); + .utilization_timer("Core::try_new_block"); // Check if we're ready for a new block let clock_round = self.dag_state.proposal_round(); @@ -508,16 +508,9 @@ impl Core { None }; - let pending_transactions = timed!( - self.metrics, - "Core::new_block::get_pending_transactions", - self.get_pending_transactions(clock_round) - ); - let (mut transactions, block_references, raw_refs) = timed!( - self.metrics, - "Core::new_block::collect_transactions_and_references", - self.collect_transactions_and_references(pending_transactions, clock_round) - ); + let pending_transactions = self.get_pending_transactions(clock_round); + let (mut transactions, block_references, raw_refs) = + self.collect_transactions_and_references(pending_transactions, clock_round); // SailfishPlusPlus: if the certified-parent filter reduced the parent // set below threshold-clock quorum, we cannot build a valid block yet. @@ -556,23 +549,11 @@ impl Core { if starfish_speed_excluded_authors.contains(self.authority) { self.requeue_transactions(std::mem::take(&mut transactions)); } - timed!( - self.metrics, - "Core::new_block::prepare_last_blocks", - self.prepare_last_blocks() - ); - let mut encoded_transactions = timed!( - self.metrics, - "Core::new_block::prepare_encoded_transactions", - self.prepare_encoded_transactions(&transactions) - ); + self.prepare_last_blocks(); + let mut encoded_transactions = self.prepare_encoded_transactions(&transactions); let acknowledgment_references = if self.dag_state.consensus_protocol.supports_acknowledgments() { - timed!( - self.metrics, - "Core::new_block::get_pending_acknowledgment", - self.dag_state.get_pending_acknowledgment(clock_round) - ) + self.dag_state.get_pending_acknowledgment(clock_round) } else { Vec::new() }; @@ -581,11 +562,7 @@ impl Core { acknowledgment_references, ); let number_of_blocks_to_create = self.last_own_block.len(); - let authority_bounds = timed!( - self.metrics, - "Core::new_block::calculate_authority_bounds", - self.calculate_authority_bounds(number_of_blocks_to_create) - ); + let authority_bounds = self.calculate_authority_bounds(number_of_blocks_to_create); let certified_leader = if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { @@ -617,30 +594,22 @@ impl Core { transactions = vec![]; encoded_transactions = self.prepare_encoded_transactions(&transactions); } - let block_data = timed!( - self.metrics, - "Core::new_block::build_block", - self.build_block( - &block_references, - voted_leader_ref, - &transactions, - &encoded_transactions, - &acknowledgment_references, - clock_round, - block_id, - aggregate_round_sig, - certified_leader, - ) + let block_data = self.build_block( + &block_references, + voted_leader_ref, + &transactions, + &encoded_transactions, + &acknowledgment_references, + clock_round, + block_id, + aggregate_round_sig, + certified_leader, ); tracing::debug!("Created block {:?}", block_data); if first_block.is_none() { first_block = Some(block_data.clone()); } - timed!( - self.metrics, - "Core::new_block::store_block", - self.store_block(block_data, &authority_bounds, block_id) - ); + self.store_block(block_data, &authority_bounds, block_id); } self.metrics @@ -1141,32 +1110,6 @@ impl Core { }) .collect(); } - // Bluestreak: non-leaders include only the leader at r-1 - // (own_prev is always prepended by build_block). Leaders keep the - // full clean-DAG frontier for the quorum requirement. - if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { - let is_leader = self.committee.elect_leader(block_round) == self.authority; - if !is_leader { - let prev_round = block_round.saturating_sub(1); - let leader = self.committee.elect_leader(prev_round); - return pending_refs - .iter() - .copied() - .filter(|r| r.authority == leader && r.round == prev_round) - .take(1) - .collect(); - } - // Leaders: keep all previous-round refs (same as SailfishPP). - let prev_round = block_round.saturating_sub(1); - let mut seen = AHashSet::new(); - return pending_refs - .iter() - .copied() - .filter(|r| { - r.authority != self.authority && seen.insert(*r) && r.round >= prev_round - }) - .collect(); - } if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { if self.committee.elect_leader(block_round) != self.authority { return Vec::new(); diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 45aeca6..8f54173 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -2817,6 +2817,15 @@ impl DagStateInner { stack.push(*parent); } } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) + { + stack.push(*cert_ref); + } + } + } } for block_ref in to_activate { diff --git a/monitoring/grafana/grafana-dashboard.json b/monitoring/grafana/grafana-dashboard.json index 85215c7..429aaef 100644 --- a/monitoring/grafana/grafana-dashboard.json +++ b/monitoring/grafana/grafana-dashboard.json @@ -1500,7 +1500,7 @@ "barAlignment": 0, "barWidthFactor": 0.6, "drawStyle": "line", - "fillOpacity": 0, + "fillOpacity": 20, "gradientMode": "none", "hideFrom": { "legend": false, @@ -1518,7 +1518,7 @@ "spanNulls": false, "stacking": { "group": "A", - "mode": "none" + "mode": "normal" }, "thresholdsStyle": { "mode": "off" From e7f6a5244c26e1769acbe273b01e1b541fb1bfda Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 12:01:06 +0100 Subject: [PATCH 12/18] perf(core): make utilization timers non-overlapping Remove outer wrapping timers (Core::add_blocks, Core::add_headers, Syncer::try_new_commit, Core::try_new_commit, Committer::direct_decide, Committer::indirect_decide) that double-counted time with their inner leaf timers. Add new leaf timers for Core::try_commit, Core::cleanup, and Core::handle_committed_subdag. The remaining timer set partitions core thread time so their sum approximates core_lock_util. Update Grafana dashboard query to match only active timer prefixes. --- .../src/consensus/universal_committer.rs | 12 +---------- crates/starfish-core/src/core.rs | 20 +++++++++++-------- crates/starfish-core/src/syncer.rs | 11 +--------- monitoring/grafana/grafana-dashboard.json | 2 +- 4 files changed, 15 insertions(+), 30 deletions(-) diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index 636365a..db8c1ea 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -11,7 +11,7 @@ use crate::{ committee::{Committee, QuorumThreshold, StakeAggregator}, consensus::base_committer::BaseCommitterOptions, dag_state::{ConsensusProtocol, DagState}, - metrics::{Metrics, UtilizationTimerVecExt}, + metrics::Metrics, types::{AuthorityIndex, BlockReference, RoundNumber, Stake, format_authority_round}, }; @@ -125,20 +125,11 @@ impl UniversalCommitter { let voter_info = &self.voters_cache[&(leader, round)].1; // Try to directly decide the leader. - let timer_direct_decide = self - .metrics - .utilization_timer - .utilization_timer("Committer::direct_decide"); let mut status = committer.try_direct_decide(leader, round, voter_info); - drop(timer_direct_decide); tracing::debug!("Outcome of direct rule: {status}"); // If the leader is not final (undecided, or Commit(Pending) for StarfishSpeed), // try to resolve via indirect rule. - let timer_indirect_decide = self - .metrics - .utilization_timer - .utilization_timer("Committer::indirect_decide"); if !status.is_final() { status = committer.try_indirect_decide(leader, round, leaders.iter(), voter_info); @@ -147,7 +138,6 @@ impl UniversalCommitter { } tracing::debug!("Outcome of indirect rule: {status}"); } - drop(timer_indirect_decide); if status.is_final() { self.decided.insert((leader, round), status.clone()); diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 1405288..ffc0bc6 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -232,10 +232,6 @@ impl Core { Vec, Vec>, ) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Core::add_blocks"); let mut block_shards = Vec::new(); let blocks: Vec<_> = blocks .into_iter() @@ -322,10 +318,6 @@ impl Core { Vec, Vec>, ) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Core::add_headers"); let (processed, _, missing_references) = timed!( self.metrics, "BlockManager::add_headers", @@ -1231,6 +1223,10 @@ impl Core { #[allow(clippy::type_complexity)] pub fn try_commit(&mut self) -> (Vec<(Data, Option)>, bool) { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::try_commit"); let leaders = self.committer.try_commit(self.last_commit_leader); let any_decided = !leaders.is_empty(); let sequence: Vec<_> = leaders @@ -1246,6 +1242,10 @@ impl Core { } pub fn cleanup(&mut self) -> RoundNumber { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::cleanup"); self.dag_state.cleanup(); let threshold = self.dag_state.gc_round(); self.block_manager @@ -1302,6 +1302,10 @@ impl Core { } pub fn handle_committed_subdag(&mut self, committed: Vec, _any_decided: bool) { + let _timer = self + .metrics + .utilization_timer + .utilization_timer("Core::handle_committed_subdag"); let mut commit_data = vec![]; for commit in &committed { let committed_rounds = self.dag_state.update_commit_state(commit); diff --git a/crates/starfish-core/src/syncer.rs b/crates/starfish-core/src/syncer.rs index 578cb67..e2030b0 100644 --- a/crates/starfish-core/src/syncer.rs +++ b/crates/starfish-core/src/syncer.rs @@ -16,7 +16,7 @@ use crate::{ core::Core, dag_state::{ConsensusProtocol, DagState, DataSource}, data::Data, - metrics::{Metrics, UtilizationTimerVecExt}, + metrics::Metrics, runtime::timestamp_utc, sailfish_service::SailfishServiceMessage, types::{ @@ -388,16 +388,7 @@ impl Syncer { } pub fn try_new_commit(&mut self) { - let _timer = self - .metrics - .utilization_timer - .utilization_timer("Syncer::try_new_commit"); - let timer_core_commit = self - .metrics - .utilization_timer - .utilization_timer("Core::try_new_commit"); let (newly_committed, any_decided) = self.core.try_commit(); - drop(timer_core_commit); let utc_now = timestamp_utc(); if !newly_committed.is_empty() { let committed_refs: Vec<_> = newly_committed diff --git a/monitoring/grafana/grafana-dashboard.json b/monitoring/grafana/grafana-dashboard.json index 429aaef..4dd6a3c 100644 --- a/monitoring/grafana/grafana-dashboard.json +++ b/monitoring/grafana/grafana-dashboard.json @@ -1570,7 +1570,7 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*|Committer::.*|Syncer::.*\"}[1m]) / 1000", + "expr": "rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*\"}[1m]) / 1000", "fullMetaSearch": false, "includeNullMetadata": true, "legendFormat": "{{node}} - {{proc}}", From 9c900829c67b0c7083df3f8356f8e6637448e293 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 12:14:41 +0100 Subject: [PATCH 13/18] perf(core): use BTreeSet::range() for certified round lookups Replace linear .iter().any() scans in proposal_round() and certified_parent_quorum() with O(log n) BTreeSet::range() queries, exploiting BlockReference's (round, authority, digest) ordering. --- crates/starfish-core/src/dag_state.rs | 33 ++++++++++++++++++++------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 8f54173..c7566cf 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -364,6 +364,26 @@ struct DagStateInner { sailfish_novote_certs: BTreeMap<(RoundNumber, AuthorityIndex), SailfishNoVoteCert>, } +/// O(log n) check whether `certs` contains any entry at the given round. +/// Exploits `BlockReference` ordering `(round, authority, digest)` to +/// binary-search instead of scanning the entire set. +fn has_certificate_at_round( + certs: &BTreeSet, + round: RoundNumber, +) -> bool { + let lo = BlockReference { + round, + authority: 0, + digest: BlockDigest::default(), + }; + let hi = BlockReference { + round: round + 1, + authority: 0, + digest: BlockDigest::default(), + }; + certs.range(lo..hi).next().is_some() +} + impl DagState { pub fn open( authority: AuthorityIndex, @@ -706,10 +726,10 @@ impl DagState { let prev_round = round - 1; let mut stake: Stake = 0; for auth in 0..inner.committee_size { - if inner.vertex_certificates[auth] - .iter() - .any(|reference| reference.round == prev_round) - { + if has_certificate_at_round( + &inner.vertex_certificates[auth], + prev_round, + ) { stake += self .committee .get_stake(auth as AuthorityIndex) @@ -1180,10 +1200,7 @@ impl DagState { let mut stake: Stake = 0; for auth in 0..inner.committee_size { // Check if this authority has any certified block at the round. - if inner.vertex_certificates[auth] - .iter() - .any(|r| r.round == round) - { + if has_certificate_at_round(&inner.vertex_certificates[auth], round) { stake += self .committee .get_stake(auth as AuthorityIndex) From bb3341390fa0b412ca5751eac427081632d4cb53 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 12:14:48 +0100 Subject: [PATCH 14/18] perf(core): add decided-cache fast path in Bluestreak commit Read back from self.decided at the top of each round iteration, matching the Mysticeti path. Skip and Commit are monotonically final, so cache hits avoid re-evaluating skip evidence and re-walking the commit chain. --- .../src/consensus/universal_committer.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index db8c1ea..bb46d39 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -183,8 +183,19 @@ impl UniversalCommitter { for round in last_decided_round + 1..=highest_anchor { let leader = self.committee.elect_leader(round); + // Fast path: reuse finalized decisions from previous calls. + let key = (leader, round); + if let Some(cached) = self.decided.get(&key) { + if cached.is_final() { + committed.push(cached.clone()); + if matches!(cached, LeaderStatus::Commit(..)) { + newly_committed.insert(key); + } + continue; + } + } + if self.check_direct_skip_bluestreak(leader, round) { - let key = (leader, round); if !self.decided.contains_key(&key) { let status = LeaderStatus::Skip(leader, round); self.decided.insert(key, status.clone()); From f3c21953d01821006e8faf2d6bedaf26089e3c4b Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 14:11:24 +0100 Subject: [PATCH 15/18] fix(core): remove round filter from Bluestreak leader block references Bluestreak leaders are the sole aggregation points in the DAG, but compress_pending_block_references was filtering out refs from rounds r-2 or earlier. This caused late-arriving blocks to be silently dropped and become permanently unreachable. Remove the r.round >= prev_round filter so leaders include all non-self pending refs, consistent with StarfishBls leader behavior. --- crates/starfish-core/src/core.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index ffc0bc6..3fe07be 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -1077,14 +1077,11 @@ impl Core { .collect(); } - let prev_round = block_round.saturating_sub(1); let mut seen = AHashSet::new(); return pending_refs .iter() .copied() - .filter(|r| { - r.authority != self.authority && seen.insert(*r) && r.round >= prev_round - }) + .filter(|r| r.authority != self.authority && seen.insert(*r)) .collect(); } From 3cd265799e52a55b1a48c1d2deabe79940aaf48c Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 14:11:54 +0100 Subject: [PATCH 16/18] fix(core): only re-emit Commits from decided cache in Bluestreak Cached Skips should short-circuit evaluation but not be pushed to the committed list, matching the original guard behavior. Restructure the decided-cache fast path to only push LeaderStatus::Commit entries. --- crates/starfish-core/src/consensus/universal_committer.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index bb46d39..66e56a9 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -184,11 +184,14 @@ impl UniversalCommitter { let leader = self.committee.elect_leader(round); // Fast path: reuse finalized decisions from previous calls. + // Only re-emit Commits; cached Skips still short-circuit evaluation + // but are not pushed to `committed` (matching the original guard + // behavior that never re-emitted Skips). let key = (leader, round); if let Some(cached) = self.decided.get(&key) { if cached.is_final() { - committed.push(cached.clone()); - if matches!(cached, LeaderStatus::Commit(..)) { + if let LeaderStatus::Commit(..) = cached { + committed.push(cached.clone()); newly_committed.insert(key); } continue; From b5c1746b064007cd63184c445cdb373449f14a85 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 14:19:05 +0100 Subject: [PATCH 17/18] style(core): fix rustfmt formatting in dag_state.rs --- crates/starfish-core/src/dag_state.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index c7566cf..91f06ae 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -367,10 +367,7 @@ struct DagStateInner { /// O(log n) check whether `certs` contains any entry at the given round. /// Exploits `BlockReference` ordering `(round, authority, digest)` to /// binary-search instead of scanning the entire set. -fn has_certificate_at_round( - certs: &BTreeSet, - round: RoundNumber, -) -> bool { +fn has_certificate_at_round(certs: &BTreeSet, round: RoundNumber) -> bool { let lo = BlockReference { round, authority: 0, @@ -726,10 +723,7 @@ impl DagState { let prev_round = round - 1; let mut stake: Stake = 0; for auth in 0..inner.committee_size { - if has_certificate_at_round( - &inner.vertex_certificates[auth], - prev_round, - ) { + if has_certificate_at_round(&inner.vertex_certificates[auth], prev_round) { stake += self .committee .get_stake(auth as AuthorityIndex) From 8f0665c91edf2e393a119ccd4284f5e4000f5549 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Fri, 20 Mar 2026 14:19:11 +0100 Subject: [PATCH 18/18] feat(monitoring): average core timers across validators and reorder config panel - Core functions time/s panel now shows avg by proc across all validators with stacked area chart - Configuration panel target order matches title: protocol, dissemination, tx type, storage --- monitoring/grafana/grafana-dashboard.json | 24 +++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/monitoring/grafana/grafana-dashboard.json b/monitoring/grafana/grafana-dashboard.json index 4dd6a3c..c783ab3 100644 --- a/monitoring/grafana/grafana-dashboard.json +++ b/monitoring/grafana/grafana-dashboard.json @@ -222,8 +222,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 0", - "legendFormat": "AllZero", + "expr": "max by (mode) (dissemination_mode_info{node=~\"$node\"})", + "legendFormat": "{{mode}}", "refId": "B", "instant": true }, @@ -232,8 +232,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 1", - "legendFormat": "Random", + "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 0", + "legendFormat": "AllZero", "refId": "C", "instant": true }, @@ -242,8 +242,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 0", - "legendFormat": "RocksDB", + "expr": "quantile(0.5, transaction_mode_info{node=~\"$node\"}) == 1", + "legendFormat": "Random", "refId": "D", "instant": true }, @@ -252,8 +252,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 1", - "legendFormat": "TideHunter", + "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 0", + "legendFormat": "RocksDB", "refId": "E", "instant": true }, @@ -262,8 +262,8 @@ "type": "prometheus", "uid": "Fixed-UID-testbed" }, - "expr": "max by (mode) (dissemination_mode_info{node=~\"$node\"})", - "legendFormat": "{{mode}}", + "expr": "quantile(0.5, storage_backend_info{node=~\"$node\"}) == 1", + "legendFormat": "TideHunter", "refId": "F", "instant": true } @@ -1570,10 +1570,10 @@ }, "disableTextWrap": false, "editorMode": "code", - "expr": "rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*\"}[1m]) / 1000", + "expr": "avg by (proc) (rate(utilization_timer{node=~\"$node\", proc=~\"Core::.*|BlockManager::.*\"}[1m])) / 1000", "fullMetaSearch": false, "includeNullMetadata": true, - "legendFormat": "{{node}} - {{proc}}", + "legendFormat": "{{proc}}", "range": true, "refId": "A", "useBackend": false