From 9723f394fbe53a65b15fa891c44d7d2176841fe6 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 17:45:27 +0100 Subject: [PATCH 1/9] feat(core): add MysticetiCompress protocol variant with basic wiring Add MysticetiCompress to ConsensusProtocol enum with from_str parsing, helper methods (is_mysticeti_compress, uses_dual_dag), and wire into all match arms: PushCausal dissemination, no erasure coding, Blake3 transaction commitment, ancestor-based subdag collection, full-block format, wave_length=3 with pipeline=true (3 pipelined committers). --- crates/starfish-core/src/broadcaster.rs | 9 ++++-- .../starfish-core/src/consensus/linearizer.rs | 3 +- .../src/consensus/universal_committer.rs | 7 +++++ crates/starfish-core/src/core.rs | 3 +- crates/starfish-core/src/dag_state.rs | 28 ++++++++++++++++--- crates/starfish-core/src/net_sync.rs | 3 +- crates/starfish-core/src/types.rs | 10 +++++-- 7 files changed, 50 insertions(+), 13 deletions(-) diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index d772410..f4a519a 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -85,7 +85,8 @@ impl BroadcasterParameters { ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls - | ConsensusProtocol::CordialMiners => Self { + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::MysticetiCompress => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -417,7 +418,8 @@ where } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -821,7 +823,8 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc | ConsensusProtocol::StarfishBls => PushOtherBlocksFormat::HeadersAndShards, ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti - | ConsensusProtocol::SailfishPlusPlus => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 1c7c40f..0e44eb7 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -259,7 +259,8 @@ impl Linearizer { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index bf4568f..ca0536e 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -432,6 +432,13 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, + ConsensusProtocol::MysticetiCompress => Self { + committee, + dag_state, + metrics, + wave_length: WAVE_LENGTH, + pipeline: true, + }, ConsensusProtocol::CordialMiners => Self { committee, dag_state, diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8834c4d..7e9fb52 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -746,7 +746,8 @@ impl Core { )), ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => None, + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => None, } } diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9ed13c..1bcc678 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -101,6 +101,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, + MysticetiCompress, } impl ConsensusProtocol { @@ -112,6 +113,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, + "mysticeti-compress" => ConsensusProtocol::MysticetiCompress, _ => ConsensusProtocol::Starfish, } } @@ -129,13 +131,26 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } + pub fn is_mysticeti_compress(self) -> bool { + matches!(self, ConsensusProtocol::MysticetiCompress) + } + + /// Protocols that use a dual dirty/clean DAG with vertex certification. + pub fn uses_dual_dag(self) -> bool { + matches!( + self, + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::MysticetiCompress + ) + } + pub fn default_dissemination_mode(self) -> DisseminationMode { match self { ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { DisseminationMode::Pull } - ConsensusProtocol::CordialMiners => DisseminationMode::PushCausal, - ConsensusProtocol::Starfish + ConsensusProtocol::MysticetiCompress + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, } @@ -611,6 +626,9 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } + ConsensusProtocol::MysticetiCompress => { + tracing::info!("Starting MysticetiCompress protocol") + } } let dag_state = Self { store: store.clone(), @@ -3295,7 +3313,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3329,7 +3348,8 @@ mod tests { let merkle_root = match consensus_protocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index 73c2c2a..e1bd378 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -504,7 +504,8 @@ impl ConnectionHandler { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index f3e2331..05a3f64 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -820,7 +820,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -1111,7 +1112,8 @@ impl VerifiedBlock { } ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners - | ConsensusProtocol::SailfishPlusPlus => { + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::MysticetiCompress => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1297,7 +1299,9 @@ impl VerifiedBlock { "Only StarfishBls blocks may carry BLS fields" ); } - ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { + ConsensusProtocol::Mysticeti + | ConsensusProtocol::CordialMiners + | ConsensusProtocol::MysticetiCompress => { ensure!( acknowledgments.is_empty(), "{consensus_protocol:?} blocks must not carry acknowledgments" From b0bd2a3debe39e1ae258097069ef113607d5fe34 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:08:01 +0100 Subject: [PATCH 2/9] feat(core): add unprovable_certificate field to BlockHeader Add Optional field for MysticetiCompress unprovable certificate mechanism. Includes accessors on BlockHeader and VerifiedBlock. All existing callers pass None. Digest integration deferred to block building step. --- crates/starfish-core/src/threshold_clock.rs | 1 + crates/starfish-core/src/types.rs | 13 +++++++++++++ 2 files changed, 14 insertions(+) diff --git a/crates/starfish-core/src/threshold_clock.rs b/crates/starfish-core/src/threshold_clock.rs index 7490458..288b7b1 100644 --- a/crates/starfish-core/src/threshold_clock.rs +++ b/crates/starfish-core/src/threshold_clock.rs @@ -116,6 +116,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, } } diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index 05a3f64..5ee6f94 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,6 +283,9 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, + /// MysticetiCompress unprovable certificate: optional reference to a + /// leader at round r-2 certified by 2f+1 votes at r-1. + pub(crate) unprovable_certificate: Option, // -- Cache (not serialized) ----------------------------------------------- /// Cached bincode-serialized bytes. Populated by `preserialize()` off the @@ -415,6 +418,10 @@ impl BlockHeader { .unwrap_or(&[]) } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.unprovable_certificate.as_ref() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.sailfish.as_deref() } @@ -728,6 +735,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), + unprovable_certificate: None, serialized: None, }; @@ -966,6 +974,10 @@ impl VerifiedBlock { self.header.is_strong_blame() } + pub fn unprovable_certificate(&self) -> Option<&BlockReference> { + self.header.unprovable_certificate() + } + pub fn sailfish(&self) -> Option<&SailfishFields> { self.header.sailfish() } @@ -1975,6 +1987,7 @@ mod tests { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; From 874fde2e015869a6ee4e9239cfacff17dcc7801b Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:11:11 +0100 Subject: [PATCH 3/9] feat(core): add unprovable_certificate field to BlockHeader for MysticetiCompress Add optional unprovable_certificate field to BlockHeader to support the MysticetiCompress protocol. This field references a leader block at round r-2 that is claimed to have been seen by 2f+1 validators at round r-1, and is part of the signed block hash for compression. Changes: - Add unprovable_certificate: Option field to BlockHeader - Add accessor methods unprovable_certificate() to BlockHeader and VerifiedBlock - Initialize field to None in all BlockHeader constructors - Update BlockDigest hashing to include unprovable_certificate in digest - Update all Block Digest::new and new_without_transactions calls to pass parameter - Update Signer::sign_block and PublicKey::verify_signature_in_block to handle field Co-Authored-By: Claude --- crates/starfish-core/src/types.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index 5ee6f94..a940967 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -779,6 +779,7 @@ impl VerifiedBlock { strong_vote: None, bls: None, sailfish: None, + unprovable_certificate: None, serialized: None, }; let mut block = Self { From 3b1bc0c66194c9900d6e8b814b7c449e36b7896a Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:12:58 +0100 Subject: [PATCH 4/9] feat(core): non-leader reference compression and unprovable_certificate MysticetiCompress non-leaders include at most own_prev + leader at r-1. Leaders keep the full clean-DAG frontier. compute_unprovable_certificate checks direct evidence (2f+1 votes at r-1) and propagation (f+1 at r). --- crates/starfish-core/src/core.rs | 95 ++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 7e9fb52..8838f28 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -936,6 +936,19 @@ impl Core { sailfish_fields, ); + // MysticetiCompress: compute and set unprovable_certificate for + // non-leader blocks. + if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + let cert = self.compute_unprovable_certificate( + clock_round, + block.block_references(), + ); + block.header.unprovable_certificate = cert; + } + self.metrics .proposed_block_refs .observe(block_ref_count as f64); @@ -947,6 +960,60 @@ impl Core { Data::new(block) } + /// Compute the unprovable certificate for a MysticetiCompress non-leader + /// block at `clock_round`. Returns a reference to the leader at r-2 if + /// 2f+1 blocks at r-1 reference it (direct evidence) or f+1 blocks at r + /// already propagate the same certificate. + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round - 2; + let leader = self.committee.elect_leader(leader_round); + let leader_blocks = + self.dag_state + .get_blocks_at_authority_round(leader, leader_round); + let leader_ref = *leader_blocks.first()?.reference(); + + // Direct evidence: 2f+1 blocks at r-1 reference this leader. + let vote_round = clock_round - 1; + let vote_blocks = self.dag_state.get_blocks_by_round(vote_round); + let mut vote_stake: u64 = 0; + for block in &vote_blocks { + if block + .block_references() + .iter() + .any(|r| *r == leader_ref) + { + vote_stake += self + .committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + if self.committee.is_quorum(vote_stake) { + return Some(leader_ref); + } + + // Propagation: f+1 blocks at round r carry the same certificate. + let current_blocks = self.dag_state.get_blocks_by_round(clock_round); + let mut prop_stake: u64 = 0; + for block in ¤t_blocks { + if block.unprovable_certificate() == Some(&leader_ref) { + prop_stake += self + .committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + if prop_stake >= self.committee.validity_threshold() { + return Some(leader_ref); + } + + None + } + /// Compute SailfishPlusPlus control fields for a new block at /// `clock_round`. /// @@ -1074,6 +1141,34 @@ impl Core { }) .collect(); } + // MysticetiCompress: non-leaders include only the leader at r-1 + // (own_prev is always prepended by build_block). Leaders keep the + // full clean-DAG frontier for the quorum requirement. + if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + // Leaders: keep all previous-round refs (same as SailfishPP). + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority + && seen.insert(*r) + && r.round >= prev_round + }) + .collect(); + } if self.dag_state.consensus_protocol == ConsensusProtocol::StarfishBls { if self.committee.elect_leader(block_round) != self.authority { return Vec::new(); From fe4d4bb2705972baeecd1be72aaf2b6db6007ee5 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:14:07 +0100 Subject: [PATCH 5/9] refactor(core): generalize dual DAG logic for MysticetiCompress Replace SailfishPlusPlus-specific checks with uses_dual_dag() helper: - Certified ref recovery now applies to both SailfishPlusPlus and MysticetiCompress - proposal_round logic for vertex certification applies to both dual DAG protocols This enables MysticetiCompress to benefit from the same vertex certification and proposal timing mechanisms as SailfishPlusPlus. Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 1bcc678..6241021 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -554,7 +554,7 @@ impl DagState { // Recover persisted active Sailfish++ certifications. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. - if consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if consensus_protocol.uses_dual_dag() { let certified_from_round = if use_windowed { inner.evicted_rounds.iter().copied().min().unwrap_or(0) } else { @@ -661,7 +661,7 @@ impl DagState { /// clock progress to `r` and quorum stake of RBC-certified vertices in /// round `r - 1`. Other protocols use the raw threshold clock directly. pub fn proposal_round(&self) -> RoundNumber { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.consensus_protocol.uses_dual_dag() { return self.threshold_clock_round(); } @@ -1497,7 +1497,7 @@ impl DagState { } } - if self.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.consensus_protocol.uses_dual_dag() && quorum_round > 1 && !self.certified_parent_quorum(quorum_round - 1) { From d63c403c9cdceba11ab5fb3808c04be161e89e01 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:14:19 +0100 Subject: [PATCH 6/9] refactor(core): use uses_dual_dag() helper for protocol checks Replace explicit SailfishPlusPlus checks with uses_dual_dag() helper in dag_state.rs to support both SailfishPlusPlus and MysticetiCompress protocols for: - Vertex certification recovery from storage - Proposal round calculation (requires certified vertices) - Certified parent quorum checks This enables MysticetiCompress to benefit from the dual DAG infrastructure (clean/dirty DAG with vertex certification). Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6241021..6a8c0fd 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -2526,7 +2526,7 @@ impl DagStateInner { committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::SailfishPlusPlus || block.round() <= 1 { + if !self.consensus_protocol.uses_dual_dag() || block.round() <= 1 { return; } From e357c3d877c96859ca1c64e2aa1ed81fb34c095d Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 18:15:36 +0100 Subject: [PATCH 7/9] feat(core): add unprovable_certificate dependency tracking for MysticetiCompress Ensure unprovable_certificate targets are included in missing dependencies check. When a MysticetiCompress block references an unprovable certificate, that certificate block must be present in the vertex certification set before the block can be processed. This maintains ancestor-closure invariant for the clean DAG. Co-Authored-By: Claude --- crates/starfish-core/src/dag_state.rs | 88 +++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6a8c0fd..d9f4a7b 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -2440,6 +2440,19 @@ impl DagStateInner { missing.push(*parent); } } + // MysticetiCompress: the unprovable_certificate target is also a + // causal dependency that must be ancestor-closed. + if self.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if let Some(cert_ref) = block.unprovable_certificate() { + if cert_ref.round > 0 + && !self.vertex_certificates[cert_ref.authority as usize] + .contains(cert_ref) + && !missing.contains(cert_ref) + { + missing.push(*cert_ref); + } + } + } missing } @@ -2518,6 +2531,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); + self.check_compress_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2550,6 +2564,80 @@ impl DagStateInner { } } + /// MysticetiCompress pre-clean check: a block is pre-clean if it has no + /// `unprovable_certificate`, or its certificate is verified via the dirty + /// DAG. Pre-clean blocks feed into the vertex certification pipeline. + fn check_compress_pre_clean( + &mut self, + block: &VerifiedBlock, + committee: &Committee, + activated: &mut Vec, + ) { + if self.consensus_protocol != ConsensusProtocol::MysticetiCompress { + return; + } + let block_ref = *block.reference(); + let is_pre_clean = match block.unprovable_certificate() { + None => true, + Some(leader_ref) => { + self.verify_unprovable_certificate_local( + block_ref.round, + leader_ref, + committee, + ) + } + }; + if is_pre_clean { + self.note_vertex_certified(block_ref, activated); + } + } + + /// Verify an unprovable certificate against the local dirty DAG. + /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) + /// or f+1 blocks at r propagate the same certificate. + fn verify_unprovable_certificate_local( + &self, + block_round: RoundNumber, + leader_ref: &BlockReference, + committee: &Committee, + ) -> bool { + // Direct: 2f+1 at r-1 reference the leader. + let vote_round = block_round.saturating_sub(1); + let mut stake: Stake = 0; + for auth_dag in &self.index { + if let Some(round_blocks) = auth_dag.get(&vote_round) { + for block in round_blocks.values() { + if block + .block_references() + .iter() + .any(|r| r == leader_ref) + { + stake += committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + } + } + if committee.is_quorum(stake) { + return true; + } + // Propagation: f+1 at round r carry the same certificate. + let mut prop_stake: Stake = 0; + for auth_dag in &self.index { + if let Some(round_blocks) = auth_dag.get(&block_round) { + for block in round_blocks.values() { + if block.unprovable_certificate() == Some(leader_ref) { + prop_stake += committee + .get_stake(block.authority()) + .unwrap_or(0); + } + } + } + } + prop_stake >= committee.validity_threshold() + } + fn infer_vertex_certificate_closure( &mut self, root: BlockReference, From 67ae40f6c942405f13274bf0e9aba51e64dca4ed Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 20:38:08 +0100 Subject: [PATCH 8/9] feat(core): add Bluestreak protocol --- crates/orchestrator/src/benchmark.rs | 3 +- crates/orchestrator/src/main.rs | 2 +- crates/starfish-core/src/broadcaster.rs | 6 +- .../starfish-core/src/consensus/linearizer.rs | 10 +- .../src/consensus/universal_committer.rs | 227 ++++++++++++- crates/starfish-core/src/core.rs | 173 +++++----- crates/starfish-core/src/crypto.rs | 88 +++++ crates/starfish-core/src/dag_state.rs | 316 ++++++++++++++---- crates/starfish-core/src/net_sync.rs | 3 +- crates/starfish-core/src/types.rs | 259 +++++++++++++- crates/starfish-core/src/validator.rs | 8 + scripts/dryrun.sh | 2 +- 12 files changed, 925 insertions(+), 172 deletions(-) diff --git a/crates/orchestrator/src/benchmark.rs b/crates/orchestrator/src/benchmark.rs index f8687f4..b9ec6b3 100644 --- a/crates/orchestrator/src/benchmark.rs +++ b/crates/orchestrator/src/benchmark.rs @@ -38,7 +38,8 @@ pub struct BenchmarkParametersGeneric { /// paying for data sent between the nodes. pub use_internal_ip_address: bool, // Consensus protocol to deploy - // (starfish | starfish-speed | starfish-bls | mysticeti | cordial-miners) + // (starfish | starfish-speed | starfish-bls | mysticeti | + // cordial-miners | bluestreak) pub consensus_protocol: String, /// number Byzantine nodes pub byzantine_nodes: usize, diff --git a/crates/orchestrator/src/main.rs b/crates/orchestrator/src/main.rs index 4a5ae98..8c5b581 100644 --- a/crates/orchestrator/src/main.rs +++ b/crates/orchestrator/src/main.rs @@ -115,7 +115,7 @@ pub enum Operation { /// Consensus to deploy. Available options: /// starfish | starfish-speed | starfish-bls | mysticeti | - /// cordial-miners + /// cordial-miners | bluestreak #[clap(long, value_name = "STRING", default_value = "starfish", global = true)] consensus: String, diff --git a/crates/starfish-core/src/broadcaster.rs b/crates/starfish-core/src/broadcaster.rs index f4a519a..b9cd166 100644 --- a/crates/starfish-core/src/broadcaster.rs +++ b/crates/starfish-core/src/broadcaster.rs @@ -86,7 +86,7 @@ impl BroadcasterParameters { | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls | ConsensusProtocol::CordialMiners - | ConsensusProtocol::MysticetiCompress => Self { + | ConsensusProtocol::Bluestreak => Self { batch_own_block_size: committee_size, batch_other_block_size: committee_size * committee_size, batch_shard_size: committee_size * committee_size, @@ -419,7 +419,7 @@ where ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references); let mut blocks = Vec::new(); @@ -824,7 +824,7 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc ConsensusProtocol::CordialMiners | ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => PushOtherBlocksFormat::FullBlocks, + | ConsensusProtocol::Bluestreak => PushOtherBlocksFormat::FullBlocks, } } diff --git a/crates/starfish-core/src/consensus/linearizer.rs b/crates/starfish-core/src/consensus/linearizer.rs index 0e44eb7..fddc58b 100644 --- a/crates/starfish-core/src/consensus/linearizer.rs +++ b/crates/starfish-core/src/consensus/linearizer.rs @@ -100,6 +100,7 @@ impl Linearizer { let mut to_commit = Vec::new(); let leader_block_ref = *leader_block.reference(); let min_round = leader_block_ref.round.saturating_sub(MAX_TRAVERSAL_DEPTH); + let follow_unprovable = dag_state.consensus_protocol == ConsensusProtocol::Bluestreak; assert!(self.committed.insert(leader_block_ref)); let mut current_level = vec![leader_block]; @@ -115,6 +116,13 @@ impl Linearizer { next_refs.push(*reference); } } + if follow_unprovable { + if let Some(cert_ref) = x.unprovable_certificate() { + if cert_ref.round >= min_round && self.committed.insert(*cert_ref) { + next_refs.push(*cert_ref); + } + } + } } if next_refs.is_empty() { break; @@ -260,7 +268,7 @@ impl Linearizer { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { self.collect_subdag_ancestors(dag_state, leader_block) } }; diff --git a/crates/starfish-core/src/consensus/universal_committer.rs b/crates/starfish-core/src/consensus/universal_committer.rs index ca0536e..636365a 100644 --- a/crates/starfish-core/src/consensus/universal_committer.rs +++ b/crates/starfish-core/src/consensus/universal_committer.rs @@ -43,6 +43,9 @@ impl UniversalCommitter { if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { return self.try_commit_sailfish(last_decided); } + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + return self.try_commit_bluestreak(last_decided); + } let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -180,6 +183,133 @@ impl UniversalCommitter { .collect() } + fn try_commit_bluestreak(&mut self, last_decided: BlockReference) -> Vec { + let highest_known_round = self.dag_state.highest_round(); + let last_decided_round = last_decided.round(); + let highest_anchor = highest_known_round.saturating_sub(2); + let mut committed = Vec::new(); + let mut newly_committed = AHashSet::new(); + + for round in last_decided_round + 1..=highest_anchor { + let leader = self.committee.elect_leader(round); + + if self.check_direct_skip_bluestreak(leader, round) { + let key = (leader, round); + if !self.decided.contains_key(&key) { + let status = LeaderStatus::Skip(leader, round); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, true); + } + committed.push(status); + } + continue; + } + + let Some(anchor) = self.try_direct_commit_bluestreak(leader, round) else { + continue; + }; + + let mut chain = vec![anchor.clone()]; + let mut current = anchor; + for prev_round in (last_decided_round + 1..current.round()).rev() { + let prev_leader = self.committee.elect_leader(prev_round); + let prev_key = (prev_leader, prev_round); + if newly_committed.contains(&prev_key) { + continue; + } + + let mut linked_leaders: Vec<_> = self + .dag_state + .get_blocks_at_authority_round(prev_leader, prev_round) + .into_iter() + .filter(|block| self.dag_state.has_vertex_certificate(block.reference())) + .filter(|block| self.dag_state.linked(¤t, block)) + .collect(); + + if linked_leaders.len() > 1 { + panic!( + "[Bluestreak] More than one linked leader for {}", + format_authority_round(prev_leader, prev_round) + ); + } + + if let Some(prev) = linked_leaders.pop() { + current = prev.clone(); + chain.push(prev); + } + } + + chain.reverse(); + for leader_block in chain { + let key = (leader_block.authority(), leader_block.round()); + if !newly_committed.insert(key) { + continue; + } + let direct_decide = key.1 == round; + let status = LeaderStatus::Commit(leader_block, None); + self.decided.insert(key, status.clone()); + if self.metrics_emitted.insert(key) { + tracing::debug!("Decided {status}"); + self.update_metrics(&status, direct_decide); + } + committed.push(status); + } + } + + committed.sort(); + committed + } + + fn try_direct_commit_bluestreak( + &self, + leader: AuthorityIndex, + leader_round: RoundNumber, + ) -> Option> { + let cert_round = leader_round + 2; + let leader_blocks = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round); + + for leader_block in leader_blocks { + if !self + .dag_state + .has_vertex_certificate(leader_block.reference()) + { + continue; + } + let leader_ref = *leader_block.reference(); + let cert_blocks = self.dag_state.get_blocks_by_round_cached(cert_round); + let mut supporters = StakeAggregator::::new(); + for block in cert_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) + && supporters.add(block.authority(), &self.committee) + { + return Some(leader_block); + } + } + } + + None + } + + fn check_direct_skip_bluestreak(&self, leader: AuthorityIndex, round: RoundNumber) -> bool { + let vote_round = round + 1; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut non_voters = StakeAggregator::::new(); + for block in vote_blocks.iter() { + let votes_for = block + .block_references() + .iter() + .any(|r| r.round == round && r.authority == leader); + if !votes_for && non_voters.add(block.authority(), &self.committee) { + return true; + } + } + false + } + fn try_commit_sailfish(&mut self, last_decided: BlockReference) -> Vec { let highest_known_round = self.dag_state.highest_round(); let last_decided_round = last_decided.round(); @@ -432,7 +562,7 @@ impl UniversalCommitterBuilder { wave_length: WAVE_LENGTH, pipeline: false, }, - ConsensusProtocol::MysticetiCompress => Self { + ConsensusProtocol::Bluestreak => Self { committee, dag_state, metrics, @@ -573,4 +703,99 @@ mod tests { "expected round-1 leader to commit with f+1 certified supporters in round 2" ); } + + #[test] + fn bluestreak_direct_commit_uses_unprovable_certificates() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let committee = Committee::new_for_benchmarks(4); + let registry = Registry::new(); + let (metrics, _reporter) = Metrics::new( + ®istry, + Some(committee.as_ref()), + Some("bluestreak"), + None, + ); + + let leader = make_full_block(1, 1, vec![BlockReference::new_test(1, 0)]); + let leader_ref = *leader.reference(); + let vote_a = make_full_block(0, 2, vec![leader_ref]); + let vote_b = make_full_block(2, 2, vec![leader_ref]); + let vote_c = make_full_block(3, 2, vec![leader_ref]); + + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut cert_a = crate::types::VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*vote_a.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_a.preserialize(); + let cert_a = Data::new(cert_a); + + let mut cert_b = crate::types::VerifiedBlock::new_with_unprovable( + 2, + 3, + vec![*vote_b.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_b.preserialize(); + let cert_b = Data::new(cert_b); + + let mut cert_c = crate::types::VerifiedBlock::new_with_unprovable( + 3, + 3, + vec![*vote_c.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + cert_c.preserialize(); + let cert_c = Data::new(cert_c); + + dag_state.insert_general_block(leader, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(vote_c, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_a, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_b, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(cert_c, DataSource::BlockBundleStreaming); + + let mut committer = UniversalCommitterBuilder::new(committee, dag_state, metrics).build(); + let decided = committer.try_commit(BlockReference::new_test(0, 0)); + + assert!( + decided.iter().any(|status| { + matches!( + status, + LeaderStatus::Commit(block, None) + if block.authority() == 1 && block.round() == 1 + ) + }), + "expected round-1 leader to commit from round-3 unprovable certificates" + ); + } } diff --git a/crates/starfish-core/src/core.rs b/crates/starfish-core/src/core.rs index 8838f28..22af544 100644 --- a/crates/starfish-core/src/core.rs +++ b/crates/starfish-core/src/core.rs @@ -13,7 +13,7 @@ use crate::{ block_handler::BlockHandler, block_manager::BlockManager, bls_certificate_aggregator::{BlsCertificateAggregator, apply_certificate_events}, - committee::Committee, + committee::{Committee, QuorumThreshold, StakeAggregator, ValidityThreshold}, config::NodePrivateConfig, consensus::{ CommitMetastate, @@ -480,7 +480,7 @@ impl Core { } // SailfishPlusPlus: require certified parent quorum before creating a block. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && !self.dag_state.certified_parent_quorum(clock_round - 1) { @@ -523,9 +523,13 @@ impl Core { // set below threshold-clock quorum, we cannot build a valid block yet. // Requeue both transactions and include refs so the next attempt sees // them again. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let allow_own_prev_only = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round.saturating_sub(1)) == self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && clock_round > 1 && block_references.is_empty() + && !allow_own_prev_only { for r in raw_refs { self.pending.push(MetaTransaction::Include(r)); @@ -688,15 +692,19 @@ impl Core { self.compress_pending_block_references(&pending_refs, block_round); // SailfishPlusPlus: filter parents to only include certified blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus { + if self.dag_state.consensus_protocol.uses_dual_dag() { block_references.retain(|r| r.round == 0 || self.dag_state.has_vertex_certificate(r)); } // SailfishPlusPlus: verify the filtered parent set, together with the // creator's own previous block (always included by build_block), still // has quorum stake at round-1. - if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + let compress_non_leader = self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(block_round) != self.authority; + if self.dag_state.consensus_protocol.uses_dual_dag() && block_round > 1 + && !compress_non_leader { let prev_round = block_round - 1; let mut prev_round_stake: u64 = 0; @@ -747,7 +755,7 @@ impl Core { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => None, + | ConsensusProtocol::Bluestreak => None, } } @@ -913,8 +921,17 @@ impl Core { } else { None }; + let unprovable_certificate = if self.dag_state.consensus_protocol + == ConsensusProtocol::Bluestreak + && self.committee.elect_leader(clock_round) != self.authority + && clock_round >= 3 + { + self.compute_unprovable_certificate(clock_round, &block_references) + } else { + None + }; - let mut block = VerifiedBlock::new_with_signer( + let mut block = VerifiedBlock::new_with_signer_and_unprovable( self.authority, clock_round, block_references, @@ -934,21 +951,9 @@ impl Core { precomputed_round_sig, precomputed_leader_sig, sailfish_fields, + unprovable_certificate, ); - // MysticetiCompress: compute and set unprovable_certificate for - // non-leader blocks. - if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress - && self.committee.elect_leader(clock_round) != self.authority - && clock_round >= 3 - { - let cert = self.compute_unprovable_certificate( - clock_round, - block.block_references(), - ); - block.header.unprovable_certificate = cert; - } - self.metrics .proposed_block_refs .observe(block_ref_count as f64); @@ -960,60 +965,6 @@ impl Core { Data::new(block) } - /// Compute the unprovable certificate for a MysticetiCompress non-leader - /// block at `clock_round`. Returns a reference to the leader at r-2 if - /// 2f+1 blocks at r-1 reference it (direct evidence) or f+1 blocks at r - /// already propagate the same certificate. - fn compute_unprovable_certificate( - &self, - clock_round: RoundNumber, - _block_references: &[BlockReference], - ) -> Option { - let leader_round = clock_round - 2; - let leader = self.committee.elect_leader(leader_round); - let leader_blocks = - self.dag_state - .get_blocks_at_authority_round(leader, leader_round); - let leader_ref = *leader_blocks.first()?.reference(); - - // Direct evidence: 2f+1 blocks at r-1 reference this leader. - let vote_round = clock_round - 1; - let vote_blocks = self.dag_state.get_blocks_by_round(vote_round); - let mut vote_stake: u64 = 0; - for block in &vote_blocks { - if block - .block_references() - .iter() - .any(|r| *r == leader_ref) - { - vote_stake += self - .committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } - if self.committee.is_quorum(vote_stake) { - return Some(leader_ref); - } - - // Propagation: f+1 blocks at round r carry the same certificate. - let current_blocks = self.dag_state.get_blocks_by_round(clock_round); - let mut prop_stake: u64 = 0; - for block in ¤t_blocks { - if block.unprovable_certificate() == Some(&leader_ref) { - prop_stake += self - .committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } - if prop_stake >= self.committee.validity_threshold() { - return Some(leader_ref); - } - - None - } - /// Compute SailfishPlusPlus control fields for a new block at /// `clock_round`. /// @@ -1092,6 +1043,46 @@ impl Core { true } + fn compute_unprovable_certificate( + &self, + clock_round: RoundNumber, + _block_references: &[BlockReference], + ) -> Option { + let leader_round = clock_round.checked_sub(2)?; + let leader = self.committee.elect_leader(leader_round); + let leader_block = self + .dag_state + .get_blocks_at_authority_round(leader, leader_round) + .into_iter() + .next()?; + let leader_ref = *leader_block.reference(); + + let vote_round = clock_round.checked_sub(1)?; + let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round); + let mut vote_stake = StakeAggregator::::new(); + for block in vote_blocks.iter() { + if block.block_references().iter().any(|r| *r == leader_ref) { + vote_stake.add(block.authority(), &self.committee); + } + } + if vote_stake.is_quorum(&self.committee) { + return Some(leader_ref); + } + + let current_round_blocks = self.dag_state.get_blocks_by_round_cached(clock_round); + let mut propagation_stake = StakeAggregator::::new(); + for block in current_round_blocks.iter() { + if block.unprovable_certificate() == Some(&leader_ref) { + propagation_stake.add(block.authority(), &self.committee); + } + } + if propagation_stake.get_stake() >= self.committee.validity_threshold() { + return Some(leader_ref); + } + + None + } + fn prepare_last_blocks(&mut self) { let target = match self.dag_state.byzantine_strategy { Some( @@ -1127,6 +1118,30 @@ impl Core { pending_refs: &[BlockReference], block_round: RoundNumber, ) -> Vec { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { + let is_leader = self.committee.elect_leader(block_round) == self.authority; + if !is_leader { + let prev_round = block_round.saturating_sub(1); + let leader = self.committee.elect_leader(prev_round); + return pending_refs + .iter() + .copied() + .filter(|r| r.authority == leader && r.round == prev_round) + .take(1) + .collect(); + } + + let prev_round = block_round.saturating_sub(1); + let mut seen = AHashSet::new(); + return pending_refs + .iter() + .copied() + .filter(|r| { + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round + }) + .collect(); + } + // SailfishPlusPlus: keep all previous-round references unconditionally // so that certified-parent filtering doesn't drop below quorum. // Only older-round references go through normal compression. @@ -1141,10 +1156,10 @@ impl Core { }) .collect(); } - // MysticetiCompress: non-leaders include only the leader at r-1 + // Bluestreak: non-leaders include only the leader at r-1 // (own_prev is always prepended by build_block). Leaders keep the // full clean-DAG frontier for the quorum requirement. - if self.dag_state.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { let is_leader = self.committee.elect_leader(block_round) == self.authority; if !is_leader { let prev_round = block_round.saturating_sub(1); @@ -1163,9 +1178,7 @@ impl Core { .iter() .copied() .filter(|r| { - r.authority != self.authority - && seen.insert(*r) - && r.round >= prev_round + r.authority != self.authority && seen.insert(*r) && r.round >= prev_round }) .collect(); } @@ -1396,7 +1409,7 @@ impl Core { } fn flush_pending_sailfish_certified_refs(&self) { - if self.dag_state.consensus_protocol != ConsensusProtocol::SailfishPlusPlus { + if !self.dag_state.consensus_protocol.uses_dual_dag() { return; } self.dag_state.flush_pending_sailfish_certified_refs(); diff --git a/crates/starfish-core/src/crypto.rs b/crates/starfish-core/src/crypto.rs index a1583c5..f5ca401 100644 --- a/crates/starfish-core/src/crypto.rs +++ b/crates/starfish-core/src/crypto.rs @@ -184,6 +184,30 @@ impl BlockDigest { signature: &SignatureBytes, merkle_root: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_without_transactions_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + merkle_root, + strong_vote, + None, + ) + } + + pub fn new_without_transactions_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + merkle_root: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -196,6 +220,7 @@ impl BlockDigest { merkle_root, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -209,6 +234,30 @@ impl BlockDigest { signature: &SignatureBytes, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + signature: &SignatureBytes, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> Self { let mut hasher = Blake3Hasher::new(); Self::digest_without_signature( @@ -221,6 +270,7 @@ impl BlockDigest { transactions_commitment, strong_vote, ); + Self::hash_unprovable_certificate(&mut hasher, unprovable_certificate); hasher.update(signature.as_bytes()); Self(hasher.finalize().into()) } @@ -250,6 +300,19 @@ impl BlockDigest { CryptoHash::crypto_hash(&mask, hasher); } } + + /// Extend a block digest hasher with the Bluestreak unprovable + /// certificate reference. Called after `digest_without_signature` and + /// before finalizing. No-op when `None`, preserving backward + /// compatibility. + pub(crate) fn hash_unprovable_certificate( + hasher: &mut Blake3Hasher, + unprovable_certificate: Option<&BlockReference>, + ) { + if let Some(cert_ref) = unprovable_certificate { + cert_ref.crypto_hash(hasher); + } + } } impl From<[u8; BLOCK_DIGEST_SIZE]> for BlockDigest { @@ -418,6 +481,7 @@ impl PublicKey { header.merkle_root(), header.strong_vote(), ); + BlockDigest::hash_unprovable_certificate(&mut hasher, header.unprovable_certificate()); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); self.0.verify(&signature, digest.as_ref()) } @@ -461,6 +525,29 @@ impl Signer { meta_creation_time_ns: TimestampNs, transactions_commitment: TransactionsCommitment, strong_vote: Option, + ) -> SignatureBytes { + self.sign_block_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + transactions_commitment, + strong_vote, + None, + ) + } + + pub fn sign_block_with_unprovable( + &self, + authority: AuthorityIndex, + round: RoundNumber, + block_references: &[BlockReference], + acknowledgment_references: &[BlockReference], + meta_creation_time_ns: TimestampNs, + transactions_commitment: TransactionsCommitment, + strong_vote: Option, + unprovable_certificate: Option<&BlockReference>, ) -> SignatureBytes { let mut hasher = Blake3Hasher::new(); BlockDigest::digest_without_signature( @@ -473,6 +560,7 @@ impl Signer { transactions_commitment, strong_vote, ); + BlockDigest::hash_unprovable_certificate(&mut hasher, unprovable_certificate); let digest: [u8; BLOCK_DIGEST_SIZE] = hasher.finalize().into(); let signature = self.0.sign(digest.as_ref()); SignatureBytes(signature.to_bytes()) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index d9f4a7b..6e0db05 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -101,7 +101,7 @@ pub enum ConsensusProtocol { StarfishSpeed, StarfishBls, SailfishPlusPlus, - MysticetiCompress, + Bluestreak, } impl ConsensusProtocol { @@ -113,7 +113,7 @@ impl ConsensusProtocol { "starfish-bls" | "starfish-l" => ConsensusProtocol::StarfishBls, "starfish-speed" | "starfish-s" => ConsensusProtocol::StarfishSpeed, "sailfish++" | "sailfish-pp" => ConsensusProtocol::SailfishPlusPlus, - "mysticeti-compress" => ConsensusProtocol::MysticetiCompress, + "bluestreak" => ConsensusProtocol::Bluestreak, _ => ConsensusProtocol::Starfish, } } @@ -131,15 +131,15 @@ impl ConsensusProtocol { matches!(self, ConsensusProtocol::SailfishPlusPlus) } - pub fn is_mysticeti_compress(self) -> bool { - matches!(self, ConsensusProtocol::MysticetiCompress) + pub fn is_bluestreak(self) -> bool { + matches!(self, ConsensusProtocol::Bluestreak) } /// Protocols that use a dual dirty/clean DAG with vertex certification. pub fn uses_dual_dag(self) -> bool { matches!( self, - ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::MysticetiCompress + ConsensusProtocol::SailfishPlusPlus | ConsensusProtocol::Bluestreak ) } @@ -148,7 +148,7 @@ impl ConsensusProtocol { ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { DisseminationMode::Pull } - ConsensusProtocol::MysticetiCompress + ConsensusProtocol::Bluestreak | ConsensusProtocol::CordialMiners | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed @@ -551,7 +551,7 @@ impl DagState { } } - // Recover persisted active Sailfish++ certifications. Only the active, + // Recover persisted active certified vertices. Only the active, // ancestor-closed frontier is persisted; preliminary waiting state is // intentionally not recovered. if consensus_protocol.uses_dual_dag() { @@ -577,6 +577,19 @@ impl DagState { } } } + if consensus_protocol == ConsensusProtocol::Bluestreak { + let mut recovered_blocks: Vec<_> = inner + .index + .iter() + .flat_map(|auth_map| auth_map.values()) + .flat_map(|blocks| blocks.values().cloned()) + .collect(); + recovered_blocks.sort_by_key(|block| *block.reference()); + let mut activated = Vec::new(); + for block in recovered_blocks { + inner.check_bluestreak_pre_clean(&block, &committee, &mut activated); + } + } metrics.dag_state_entries.inc_by(block_count); metrics.dag_highest_round.set(inner.highest_round as i64); @@ -626,8 +639,8 @@ impl DagState { ConsensusProtocol::SailfishPlusPlus => { tracing::info!("Starting Sailfish++ protocol") } - ConsensusProtocol::MysticetiCompress => { - tracing::info!("Starting MysticetiCompress protocol") + ConsensusProtocol::Bluestreak => { + tracing::info!("Starting Bluestreak protocol") } } let dag_state = Self { @@ -2279,19 +2292,43 @@ impl DagStateInner { later_block: &Data, earlier_block: &Data, ) -> bool { - let mut parents = AHashSet::from([later_block.clone()]); - for _round_number in (earlier_block.round()..later_block.round()).rev() { - parents = parents - .iter() - .flat_map(|block| block.block_references()) - .map(|block_reference| { - self.get_storage_block(*block_reference) - .expect("Block should be in DagState") - }) - .filter(|included_block| included_block.round() >= earlier_block.round()) - .collect(); + let target = *earlier_block.reference(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block_ref == target { + return true; + } + if block.round() <= earlier_block.round() { + continue; + } + + for parent_ref in block.block_references() { + let parent = self + .get_storage_block(*parent_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + let parent = self + .get_storage_block(*cert_ref) + .expect("Block should be in DagState"); + if parent.round() >= earlier_block.round() { + frontier.push(parent); + } + } + } } - parents.contains(earlier_block) + + false } /// Compute all block references reachable from `later_block` at @@ -2302,16 +2339,42 @@ impl DagStateInner { later_block: &Data, target_round: RoundNumber, ) -> AHashSet { - let mut frontier = AHashSet::from([later_block.clone()]); - for _ in (target_round..later_block.round()).rev() { - frontier = frontier - .iter() - .flat_map(|block| block.block_references()) - .filter_map(|r| self.get_storage_block(*r)) - .filter(|b| b.round() >= target_round) - .collect(); + let mut reachable = AHashSet::new(); + let mut frontier = vec![later_block.clone()]; + let mut visited = AHashSet::new(); + + while let Some(block) = frontier.pop() { + let block_ref = *block.reference(); + if !visited.insert(block_ref) { + continue; + } + if block.round() == target_round { + reachable.insert(block_ref); + continue; + } + if block.round() < target_round { + continue; + } + + for reference in block.block_references() { + if let Some(parent) = self.get_storage_block(*reference) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + if self.consensus_protocol == ConsensusProtocol::Bluestreak { + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(parent) = self.get_storage_block(*cert_ref) { + if parent.round() >= target_round { + frontier.push(parent); + } + } + } + } } - frontier.iter().map(|b| *b.reference()).collect() + + reachable } /// Per-authority eviction using BTreeMap::split_off. @@ -2440,13 +2503,12 @@ impl DagStateInner { missing.push(*parent); } } - // MysticetiCompress: the unprovable_certificate target is also a + // Bluestreak: the unprovable_certificate target is also a // causal dependency that must be ancestor-closed. - if self.consensus_protocol == ConsensusProtocol::MysticetiCompress { + if self.consensus_protocol == ConsensusProtocol::Bluestreak { if let Some(cert_ref) = block.unprovable_certificate() { if cert_ref.round > 0 - && !self.vertex_certificates[cert_ref.authority as usize] - .contains(cert_ref) + && !self.vertex_certificates[cert_ref.authority as usize].contains(cert_ref) && !missing.contains(cert_ref) { missing.push(*cert_ref); @@ -2481,6 +2543,14 @@ impl DagStateInner { } } } + if let Some(cert_ref) = block.unprovable_certificate() { + if let Some(children) = self.pending_vertex_certificate_children.get_mut(cert_ref) { + children.remove(&block_ref); + if children.is_empty() { + self.pending_vertex_certificate_children.remove(cert_ref); + } + } + } activated.push(block_ref); self.pending_persisted_vertex_certificates[auth].insert(block_ref); @@ -2531,7 +2601,7 @@ impl DagStateInner { self.update_data_availability(&block); self.update_starfish_speed_leader_hints(&block); self.update_inferred_sailfish_support(&block, committee, activated); - self.check_compress_pre_clean(&block, committee, activated); + self.check_bluestreak_pre_clean(&block, committee, activated); } fn update_inferred_sailfish_support( @@ -2564,78 +2634,94 @@ impl DagStateInner { } } - /// MysticetiCompress pre-clean check: a block is pre-clean if it has no + /// Bluestreak pre-clean check: a block is pre-clean if it has no /// `unprovable_certificate`, or its certificate is verified via the dirty /// DAG. Pre-clean blocks feed into the vertex certification pipeline. - fn check_compress_pre_clean( + fn check_bluestreak_pre_clean( &mut self, block: &VerifiedBlock, committee: &Committee, activated: &mut Vec, ) { - if self.consensus_protocol != ConsensusProtocol::MysticetiCompress { + if self.consensus_protocol != ConsensusProtocol::Bluestreak { return; } let block_ref = *block.reference(); let is_pre_clean = match block.unprovable_certificate() { None => true, Some(leader_ref) => { - self.verify_unprovable_certificate_local( - block_ref.round, - leader_ref, - committee, - ) + self.verify_unprovable_certificate(block_ref.round, leader_ref, committee) } }; if is_pre_clean { self.note_vertex_certified(block_ref, activated); } + self.reevaluate_bluestreak_pending(committee, activated); } /// Verify an unprovable certificate against the local dirty DAG. /// Returns true if 2f+1 blocks at r-1 reference the leader (direct) /// or f+1 blocks at r propagate the same certificate. - fn verify_unprovable_certificate_local( + fn verify_unprovable_certificate( &self, block_round: RoundNumber, leader_ref: &BlockReference, committee: &Committee, ) -> bool { - // Direct: 2f+1 at r-1 reference the leader. - let vote_round = block_round.saturating_sub(1); - let mut stake: Stake = 0; - for auth_dag in &self.index { - if let Some(round_blocks) = auth_dag.get(&vote_round) { - for block in round_blocks.values() { - if block - .block_references() - .iter() - .any(|r| r == leader_ref) - { - stake += committee - .get_stake(block.authority()) - .unwrap_or(0); - } - } + if leader_ref.round + 2 != block_round { + return false; + } + + let vote_round = match block_round.checked_sub(1) { + Some(round) => round, + None => return false, + }; + let mut stake = StakeAggregator::::new(); + for block in self.get_blocks_by_round(vote_round) { + if block.block_references().iter().any(|r| r == leader_ref) { + stake.add(block.authority(), committee); } } - if committee.is_quorum(stake) { + if stake.is_quorum(committee) { return true; } - // Propagation: f+1 at round r carry the same certificate. - let mut prop_stake: Stake = 0; - for auth_dag in &self.index { - if let Some(round_blocks) = auth_dag.get(&block_round) { + + let mut propagation = StakeAggregator::::new(); + for block in self.get_blocks_by_round(block_round) { + if block.unprovable_certificate() == Some(leader_ref) { + propagation.add(block.authority(), committee); + } + } + propagation.get_stake() >= committee.validity_threshold() + } + + fn reevaluate_bluestreak_pending( + &mut self, + committee: &Committee, + activated: &mut Vec, + ) { + let mut ready = Vec::new(); + for auth_map in &self.index { + for round_blocks in auth_map.values() { for block in round_blocks.values() { - if block.unprovable_certificate() == Some(leader_ref) { - prop_stake += committee - .get_stake(block.authority()) - .unwrap_or(0); + let block_ref = *block.reference(); + if self.vertex_certificates[block_ref.authority as usize].contains(&block_ref) { + continue; + } + let Some(leader_ref) = block.unprovable_certificate() else { + continue; + }; + if self.verify_unprovable_certificate(block.round(), leader_ref, committee) { + ready.push(block_ref); } } } } - prop_stake >= committee.validity_threshold() + ready.sort(); + ready.dedup(); + for block_ref in ready { + self.note_vertex_certified(block_ref, activated); + } } fn infer_vertex_certificate_closure( @@ -2808,6 +2894,7 @@ impl DagStateInner { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak ) && block.transactions().is_none() && block.merkle_root() == TransactionsCommitment::new_from_transactions(&Vec::new()); if block.has_empty_payload() || is_empty_full_block { @@ -3402,7 +3489,7 @@ mod tests { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3437,7 +3524,7 @@ mod tests { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&empty_transactions) } ConsensusProtocol::Starfish @@ -3991,6 +4078,91 @@ mod tests { assert!(!dag_state.mark_dac_certified(rejected, cert)); } + #[test] + fn bluestreak_reachability_follows_unprovable_certificate() { + let dag_state = open_test_dag_state_for("bluestreak", 0); + let empty_transactions = Vec::new(); + let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); + + let mut leader = VerifiedBlock::new( + 1, + 1, + vec![BlockReference::new_test(1, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + leader.preserialize(); + let leader = Data::new(leader); + let leader_ref = *leader.reference(); + + let mut own_round_one = VerifiedBlock::new( + 0, + 1, + vec![BlockReference::new_test(0, 0)], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_round_one.preserialize(); + let own_round_one = Data::new(own_round_one); + + let mut own_prev = VerifiedBlock::new( + 0, + 2, + vec![*own_round_one.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions.clone(), + commitment, + None, + None, + None, + ); + own_prev.preserialize(); + let own_prev = Data::new(own_prev); + + let mut compress = VerifiedBlock::new_with_unprovable( + 0, + 3, + vec![*own_prev.reference()], + Vec::new(), + 0, + SignatureBytes::default(), + empty_transactions, + commitment, + None, + None, + None, + Some(leader_ref), + ); + compress.preserialize(); + let compress = Data::new(compress); + + dag_state.insert_general_block(leader.clone(), DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_round_one, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(own_prev, DataSource::BlockBundleStreaming); + dag_state.insert_general_block(compress.clone(), DataSource::BlockBundleStreaming); + + assert!(dag_state.linked(&compress, &leader)); + assert!( + dag_state + .reachable_at_round(&compress, 1) + .contains(&leader_ref) + ); + } + #[test] fn consensus_protocol_resolves_dissemination_defaults() { assert_eq!( diff --git a/crates/starfish-core/src/net_sync.rs b/crates/starfish-core/src/net_sync.rs index e1bd378..929732b 100644 --- a/crates/starfish-core/src/net_sync.rs +++ b/crates/starfish-core/src/net_sync.rs @@ -505,7 +505,7 @@ impl ConnectionHandler { + | ConsensusProtocol::Bluestreak => { blocks_with_transactions.push(block); } ConsensusProtocol::Starfish @@ -1606,6 +1606,7 @@ impl NetworkSyncer loop { let notified = inner.threshold_clock_notify.notified(); let round = if inner.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus + || inner.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak { inner.dag_state.proposal_round().saturating_sub(1) } else { diff --git a/crates/starfish-core/src/types.rs b/crates/starfish-core/src/types.rs index a940967..1f465a6 100644 --- a/crates/starfish-core/src/types.rs +++ b/crates/starfish-core/src/types.rs @@ -283,7 +283,7 @@ pub struct BlockHeader { /// Sailfish++ control certificates (timeout/no-vote). None for all other /// protocols. pub(crate) sailfish: Option>, - /// MysticetiCompress unprovable certificate: optional reference to a + /// Bluestreak unprovable certificate: optional reference to a /// leader at round r-2 certified by 2f+1 votes at r-1. pub(crate) unprovable_certificate: Option, @@ -701,6 +701,36 @@ impl VerifiedBlock { strong_vote: Option, bls: Option, sailfish: Option, + ) -> Self { + Self::new_with_unprovable( + authority, + round, + block_references, + acknowledgment_references, + meta_creation_time_ns, + signature, + transactions, + merkle_root, + strong_vote, + bls, + sailfish, + None, + ) + } + + pub fn new_with_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signature: SignatureBytes, + transactions: Vec, + merkle_root: TransactionsCommitment, + strong_vote: Option, + bls: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let (acknowledgment_intersection, acknowledgment_references) = compress_acknowledgments(&block_references, &acknowledgment_references); @@ -713,7 +743,7 @@ impl VerifiedBlock { reference: BlockReference { authority, round, - digest: BlockDigest::new_without_transactions( + digest: BlockDigest::new_without_transactions_with_unprovable( authority, round, &block_references, @@ -722,6 +752,7 @@ impl VerifiedBlock { &signature, merkle_root, strong_vote, + unprovable_certificate.as_ref(), ), }, block_references, @@ -735,7 +766,7 @@ impl VerifiedBlock { strong_vote, bls: bls.map(Box::new), sailfish: sailfish.map(Box::new), - unprovable_certificate: None, + unprovable_certificate, serialized: None, }; @@ -812,6 +843,53 @@ impl VerifiedBlock { precomputed_round_sig: Option, precomputed_leader_sig: Option, sailfish: Option, + ) -> Self { + Self::new_with_signer_and_unprovable( + authority, + round, + block_references, + voted_leader_ref, + acknowledgment_references, + meta_creation_time_ns, + signer, + bls_signer, + committee_opt, + aggregate_dac_sigs, + transactions, + encoded_transactions, + consensus_protocol, + strong_vote, + aggregate_round_sig, + certified_leader, + precomputed_round_sig, + precomputed_leader_sig, + sailfish, + None, + ) + } + + #[allow(clippy::too_many_arguments)] + pub fn new_with_signer_and_unprovable( + authority: AuthorityIndex, + round: RoundNumber, + block_references: Vec, + voted_leader_ref: Option, + acknowledgment_references: Vec, + meta_creation_time_ns: TimestampNs, + signer: &Signer, + bls_signer: Option<&BlsSigner>, + committee_opt: Option<&Committee>, + aggregate_dac_sigs: Vec, + transactions: Vec, + encoded_transactions: Option>, + consensus_protocol: ConsensusProtocol, + strong_vote: Option, + aggregate_round_sig: Option, + certified_leader: Option<(BlockReference, BlsAggregateCertificate)>, + precomputed_round_sig: Option, + precomputed_leader_sig: Option, + sailfish: Option, + unprovable_certificate: Option, ) -> Self { let transactions_commitment = match consensus_protocol { ConsensusProtocol::Starfish @@ -830,7 +908,7 @@ impl VerifiedBlock { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { TransactionsCommitment::new_from_transactions(&transactions) } }; @@ -846,7 +924,7 @@ impl VerifiedBlock { &acknowledgment_references, aggregate_dac_sigs, ); - let signature = signer.sign_block( + let signature = signer.sign_block_with_unprovable( authority, round, &block_references, @@ -854,6 +932,7 @@ impl VerifiedBlock { meta_creation_time_ns, transactions_commitment, strong_vote, + unprovable_certificate.as_ref(), ); // Build BLS fields when the StarfishBls path is active. Partial round @@ -882,7 +961,7 @@ impl VerifiedBlock { } }); - Self::new( + Self::new_with_unprovable( authority, round, block_references, @@ -894,6 +973,7 @@ impl VerifiedBlock { strong_vote, bls, sailfish, + unprovable_certificate, ) } @@ -1126,7 +1206,7 @@ impl VerifiedBlock { ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners | ConsensusProtocol::SailfishPlusPlus - | ConsensusProtocol::MysticetiCompress => { + | ConsensusProtocol::Bluestreak => { let empty_transactions = Vec::new(); let empty_transactions_commitment = TransactionsCommitment::new_from_transactions(&empty_transactions); @@ -1163,7 +1243,7 @@ impl VerifiedBlock { ); } let acknowledgments = self.header.acknowledgments(); - let digest = BlockDigest::new( + let digest = BlockDigest::new_with_unprovable( self.authority(), round, &self.header.block_references, @@ -1172,6 +1252,7 @@ impl VerifiedBlock { &self.header.signature, self.header.transactions_commitment, self.header.strong_vote, + self.header.unprovable_certificate(), ); ensure!( digest == self.digest(), @@ -1204,6 +1285,10 @@ impl VerifiedBlock { } match consensus_protocol { ConsensusProtocol::StarfishBls => { + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); let bls = self .header .bls() @@ -1311,10 +1396,12 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); } - ConsensusProtocol::Mysticeti - | ConsensusProtocol::CordialMiners - | ConsensusProtocol::MysticetiCompress => { + ConsensusProtocol::Mysticeti | ConsensusProtocol::CordialMiners => { ensure!( acknowledgments.is_empty(), "{consensus_protocol:?} blocks must not carry acknowledgments" @@ -1327,6 +1414,56 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); + } + ConsensusProtocol::Bluestreak => { + ensure!( + acknowledgments.is_empty(), + "Bluestreak blocks must not carry acknowledgments" + ); + ensure!( + self.header.bls().is_none(), + "Only StarfishBls blocks may carry BLS fields" + ); + let is_leader = self.authority() == committee.elect_leader(round); + if !is_leader { + ensure!( + self.header.block_references.len() <= 2, + "Bluestreak non-leader may reference at most own_prev + leader" + ); + let _self_ref = *self + .header + .block_references + .iter() + .find(|r| r.authority == self.authority()) + .ok_or_else(|| { + eyre::eyre!( + "Bluestreak non-leader block must reference its own previous block" + ) + })?; + if let Some(cert_ref) = self.header.unprovable_certificate() { + ensure!( + round >= 3 && cert_ref.round + 2 == round, + "unprovable_certificate must reference leader at round r-2" + ); + ensure!( + cert_ref.authority == committee.elect_leader(cert_ref.round), + "unprovable_certificate must reference the elected leader" + ); + } + } else { + ensure!( + self.header.unprovable_certificate().is_none(), + "Bluestreak leaders must not carry unprovable_certificate" + ); + ensure!( + threshold_clock_valid_block_header(&self.header, committee), + "Bluestreak leader must reference 2f+1 blocks from previous round" + ); + } } ConsensusProtocol::SailfishPlusPlus => { ensure!( @@ -1341,6 +1478,10 @@ impl VerifiedBlock { self.header.bls().is_none(), "Only StarfishBls blocks may carry BLS fields" ); + ensure!( + self.header.unprovable_certificate().is_none(), + "Only Bluestreak blocks may carry unprovable_certificate" + ); if round > 1 { let prev_round = round - 1; let prev_leader = committee.elect_leader(prev_round); @@ -2200,6 +2341,102 @@ mod tests { ); } + #[test] + fn verifies_bluestreak_non_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let authority = committee + .authorities() + .find(|authority| *authority != committee.elect_leader(3)) + .unwrap(); + let round_2_leader = committee.elect_leader(2); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + authority, + 3, + vec![ + BlockReference::new_test(authority, 2), + BlockReference::new_test(round_2_leader, 2), + ], + None, + vec![], + 0, + &signers[authority as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap(); + } + + #[test] + fn rejects_bluestreak_leader_with_unprovable_certificate() { + let committee = Committee::new_for_benchmarks(4); + let signers = Signer::new_for_test(committee.len()); + let leader = committee.elect_leader(3); + let round_1_leader = committee.elect_leader(1); + let mut block = VerifiedBlock::new_with_signer_and_unprovable( + leader, + 3, + (0..committee.quorum_threshold() as AuthorityIndex) + .map(|authority| BlockReference::new_test(authority, 2)) + .collect(), + None, + vec![], + 0, + &signers[leader as usize], + None, + None, + vec![], + vec![], + None, + ConsensusProtocol::Bluestreak, + None, + None, + None, + None, + None, + None, + Some(BlockReference::new_test(round_1_leader, 1)), + ); + + let mut encoder = Encoder::new(2, 4, 2).unwrap(); + let err = block + .verify( + committee.as_ref(), + 0, + 1, + &mut encoder, + ConsensusProtocol::Bluestreak, + ) + .unwrap_err(); + + assert!( + err.to_string() + .contains("Bluestreak leaders must not carry unprovable_certificate") + ); + } + #[test] fn verifies_empty_mysticeti_block_without_transaction_data() { let committee = Committee::new_for_benchmarks(4); diff --git a/crates/starfish-core/src/validator.rs b/crates/starfish-core/src/validator.rs index 9e2d76d..90c24ef 100644 --- a/crates/starfish-core/src/validator.rs +++ b/crates/starfish-core/src/validator.rs @@ -310,11 +310,17 @@ mod smoke_tests { #[test_case("starfish-speed", 80)] #[test_case("starfish-bls", 100)] #[test_case("sailfish++", 120)] + #[test_case("bluestreak", 140)] #[tokio::test] async fn validator_commit(consensus: &str, port_offset: u16) { run_commit_test(consensus, port_offset).await; } + #[tokio::test] + async fn validator_commit_bluestreak_basic() { + run_commit_test("bluestreak", 150).await; + } + async fn run_sync_test(consensus: &str, port_offset: u16) { let committee_size = 4; let committee = Committee::new_for_benchmarks(committee_size); @@ -402,6 +408,7 @@ mod smoke_tests { #[test_case("starfish-speed", 180)] #[test_case("starfish-bls", 200)] #[test_case("sailfish++", 220)] + #[test_case("bluestreak", 260)] #[tokio::test] async fn validator_sync(consensus: &str, port_offset: u16) { run_sync_test(consensus, port_offset).await; @@ -465,6 +472,7 @@ mod smoke_tests { #[test_case("starfish-speed", 280)] #[test_case("starfish-bls", 300)] #[test_case("sailfish++", 320)] + #[test_case("bluestreak", 380)] #[tokio::test] async fn validator_crash_faults(consensus: &str, port_offset: u16) { run_crash_faults_test(consensus, port_offset).await; diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index aaf5115..287eb51 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -7,7 +7,7 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, -# cordial-miners, mysticeti +# cordial-miners, mysticeti, bluestreak CONSENSUS=${CONSENSUS:-starfish-speed} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, From 16164074743679e2e0400d2a4c0e3ff3315f88b8 Mon Sep 17 00:00:00 2001 From: Nikita Polianskii Date: Thu, 19 Mar 2026 20:41:52 +0100 Subject: [PATCH 9/9] fix(core): default Bluestreak dissemination to pull --- crates/starfish-core/src/dag_state.rs | 7 ++++--- scripts/dryrun.sh | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/starfish-core/src/dag_state.rs b/crates/starfish-core/src/dag_state.rs index 6e0db05..93fdfe2 100644 --- a/crates/starfish-core/src/dag_state.rs +++ b/crates/starfish-core/src/dag_state.rs @@ -145,11 +145,12 @@ impl ConsensusProtocol { pub fn default_dissemination_mode(self) -> DisseminationMode { match self { - ConsensusProtocol::Mysticeti | ConsensusProtocol::SailfishPlusPlus => { + ConsensusProtocol::Mysticeti + | ConsensusProtocol::SailfishPlusPlus + | ConsensusProtocol::Bluestreak => { DisseminationMode::Pull } - ConsensusProtocol::Bluestreak - | ConsensusProtocol::CordialMiners + ConsensusProtocol::CordialMiners | ConsensusProtocol::Starfish | ConsensusProtocol::StarfishSpeed | ConsensusProtocol::StarfishBls => DisseminationMode::PushCausal, diff --git a/scripts/dryrun.sh b/scripts/dryrun.sh index 287eb51..6c48cb4 100755 --- a/scripts/dryrun.sh +++ b/scripts/dryrun.sh @@ -8,7 +8,7 @@ NUM_NODES=${NUM_NODES:-10} DESIRED_TPS=${DESIRED_TPS:-1000} # Options: starfish, starfish-speed, starfish-bls, # cordial-miners, mysticeti, bluestreak -CONSENSUS=${CONSENSUS:-starfish-speed} +CONSENSUS=${CONSENSUS:-bluestreak} NUM_BYZANTINE_NODES=${NUM_BYZANTINE_NODES:-0} # Options: timeout-leader, leader-withholding, # equivocating-chains, equivocating-two-chains, @@ -24,7 +24,7 @@ STORAGE_BACKEND=${STORAGE_BACKEND:-rocksdb} TRANSACTION_MODE=${TRANSACTION_MODE:-random} # Dissemination mode: protocol-default (default) | pull | # push-causal | push-useful -DISSEMINATION_MODE=${DISSEMINATION_MODE:-push-causal} +DISSEMINATION_MODE=${DISSEMINATION_MODE:-protocol-default} # Enable lz4 network compression. # Auto-enabled for random transaction mode. # Set COMPRESS_NETWORK=1 or =0 to override.