Skip to content
Closed
3 changes: 2 additions & 1 deletion crates/orchestrator/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ pub struct BenchmarkParametersGeneric<N, C> {
/// paying for data sent between the nodes.
pub use_internal_ip_address: bool,
// Consensus protocol to deploy
// (starfish | starfish-speed | starfish-bls | mysticeti | cordial-miners)
// (starfish | starfish-speed | starfish-bls | mysticeti |
// cordial-miners | bluestreak)
pub consensus_protocol: String,
/// number Byzantine nodes
pub byzantine_nodes: usize,
Expand Down
2 changes: 1 addition & 1 deletion crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub enum Operation {

/// Consensus to deploy. Available options:
/// starfish | starfish-speed | starfish-bls | mysticeti |
/// cordial-miners
/// cordial-miners | bluestreak
#[clap(long, value_name = "STRING", default_value = "starfish", global = true)]
consensus: String,

Expand Down
9 changes: 6 additions & 3 deletions crates/starfish-core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ impl BroadcasterParameters {
ConsensusProtocol::Starfish
| ConsensusProtocol::StarfishSpeed
| ConsensusProtocol::StarfishBls
| ConsensusProtocol::CordialMiners => Self {
| ConsensusProtocol::CordialMiners
| ConsensusProtocol::Bluestreak => Self {
batch_own_block_size: committee_size,
batch_other_block_size: committee_size * committee_size,
batch_shard_size: committee_size * committee_size,
Expand Down Expand Up @@ -417,7 +418,8 @@ where
}
ConsensusProtocol::Mysticeti
| ConsensusProtocol::CordialMiners
| ConsensusProtocol::SailfishPlusPlus => {
| ConsensusProtocol::SailfishPlusPlus
| ConsensusProtocol::Bluestreak => {
let all_blocks = self.inner.dag_state.get_storage_blocks(&block_references);

let mut blocks = Vec::new();
Expand Down Expand Up @@ -821,7 +823,8 @@ fn push_transport_format(consensus_protocol: ConsensusProtocol) -> PushOtherBloc
| ConsensusProtocol::StarfishBls => PushOtherBlocksFormat::HeadersAndShards,
ConsensusProtocol::CordialMiners
| ConsensusProtocol::Mysticeti
| ConsensusProtocol::SailfishPlusPlus => PushOtherBlocksFormat::FullBlocks,
| ConsensusProtocol::SailfishPlusPlus
| ConsensusProtocol::Bluestreak => PushOtherBlocksFormat::FullBlocks,
}
}

Expand Down
11 changes: 10 additions & 1 deletion crates/starfish-core/src/consensus/linearizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl Linearizer {
let mut to_commit = Vec::new();
let leader_block_ref = *leader_block.reference();
let min_round = leader_block_ref.round.saturating_sub(MAX_TRAVERSAL_DEPTH);
let follow_unprovable = dag_state.consensus_protocol == ConsensusProtocol::Bluestreak;

assert!(self.committed.insert(leader_block_ref));
let mut current_level = vec![leader_block];
Expand All @@ -115,6 +116,13 @@ impl Linearizer {
next_refs.push(*reference);
}
}
if follow_unprovable {
if let Some(cert_ref) = x.unprovable_certificate() {
if cert_ref.round >= min_round && self.committed.insert(*cert_ref) {
next_refs.push(*cert_ref);
}
}
}
}
if next_refs.is_empty() {
break;
Expand Down Expand Up @@ -259,7 +267,8 @@ impl Linearizer {
}
ConsensusProtocol::Mysticeti
| ConsensusProtocol::CordialMiners
| ConsensusProtocol::SailfishPlusPlus => {
| ConsensusProtocol::SailfishPlusPlus
| ConsensusProtocol::Bluestreak => {
self.collect_subdag_ancestors(dag_state, leader_block)
}
};
Expand Down
232 changes: 232 additions & 0 deletions crates/starfish-core/src/consensus/universal_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ impl UniversalCommitter {
if self.dag_state.consensus_protocol == ConsensusProtocol::SailfishPlusPlus {
return self.try_commit_sailfish(last_decided);
}
if self.dag_state.consensus_protocol == ConsensusProtocol::Bluestreak {
return self.try_commit_bluestreak(last_decided);
}

let highest_known_round = self.dag_state.highest_round();
let last_decided_round = last_decided.round();
Expand Down Expand Up @@ -180,6 +183,133 @@ impl UniversalCommitter {
.collect()
}

fn try_commit_bluestreak(&mut self, last_decided: BlockReference) -> Vec<LeaderStatus> {
let highest_known_round = self.dag_state.highest_round();
let last_decided_round = last_decided.round();
let highest_anchor = highest_known_round.saturating_sub(2);
let mut committed = Vec::new();
let mut newly_committed = AHashSet::new();

for round in last_decided_round + 1..=highest_anchor {
let leader = self.committee.elect_leader(round);

if self.check_direct_skip_bluestreak(leader, round) {
let key = (leader, round);
if !self.decided.contains_key(&key) {
let status = LeaderStatus::Skip(leader, round);
self.decided.insert(key, status.clone());
if self.metrics_emitted.insert(key) {
tracing::debug!("Decided {status}");
self.update_metrics(&status, true);
}
committed.push(status);
}
continue;
}

let Some(anchor) = self.try_direct_commit_bluestreak(leader, round) else {
continue;
};

let mut chain = vec![anchor.clone()];
let mut current = anchor;
for prev_round in (last_decided_round + 1..current.round()).rev() {
let prev_leader = self.committee.elect_leader(prev_round);
let prev_key = (prev_leader, prev_round);
if newly_committed.contains(&prev_key) {
continue;
}

let mut linked_leaders: Vec<_> = self
.dag_state
.get_blocks_at_authority_round(prev_leader, prev_round)
.into_iter()
.filter(|block| self.dag_state.has_vertex_certificate(block.reference()))
.filter(|block| self.dag_state.linked(&current, block))
.collect();

if linked_leaders.len() > 1 {
panic!(
"[Bluestreak] More than one linked leader for {}",
format_authority_round(prev_leader, prev_round)
);
}

if let Some(prev) = linked_leaders.pop() {
current = prev.clone();
chain.push(prev);
}
}

chain.reverse();
for leader_block in chain {
let key = (leader_block.authority(), leader_block.round());
if !newly_committed.insert(key) {
continue;
}
let direct_decide = key.1 == round;
let status = LeaderStatus::Commit(leader_block, None);
self.decided.insert(key, status.clone());
if self.metrics_emitted.insert(key) {
tracing::debug!("Decided {status}");
self.update_metrics(&status, direct_decide);
}
committed.push(status);
}
}

committed.sort();
committed
}

fn try_direct_commit_bluestreak(
&self,
leader: AuthorityIndex,
leader_round: RoundNumber,
) -> Option<crate::data::Data<crate::types::VerifiedBlock>> {
let cert_round = leader_round + 2;
let leader_blocks = self
.dag_state
.get_blocks_at_authority_round(leader, leader_round);

for leader_block in leader_blocks {
if !self
.dag_state
.has_vertex_certificate(leader_block.reference())
{
continue;
}
let leader_ref = *leader_block.reference();
let cert_blocks = self.dag_state.get_blocks_by_round_cached(cert_round);
let mut supporters = StakeAggregator::<QuorumThreshold>::new();
for block in cert_blocks.iter() {
if block.unprovable_certificate() == Some(&leader_ref)
&& supporters.add(block.authority(), &self.committee)
{
return Some(leader_block);
}
}
}

None
}

fn check_direct_skip_bluestreak(&self, leader: AuthorityIndex, round: RoundNumber) -> bool {
let vote_round = round + 1;
let vote_blocks = self.dag_state.get_blocks_by_round_cached(vote_round);
let mut non_voters = StakeAggregator::<QuorumThreshold>::new();
for block in vote_blocks.iter() {
let votes_for = block
.block_references()
.iter()
.any(|r| r.round == round && r.authority == leader);
if !votes_for && non_voters.add(block.authority(), &self.committee) {
return true;
}
}
false
}

fn try_commit_sailfish(&mut self, last_decided: BlockReference) -> Vec<LeaderStatus> {
let highest_known_round = self.dag_state.highest_round();
let last_decided_round = last_decided.round();
Expand Down Expand Up @@ -432,6 +562,13 @@ impl UniversalCommitterBuilder {
wave_length: WAVE_LENGTH,
pipeline: false,
},
ConsensusProtocol::Bluestreak => Self {
committee,
dag_state,
metrics,
wave_length: WAVE_LENGTH,
pipeline: true,
},
ConsensusProtocol::CordialMiners => Self {
committee,
dag_state,
Expand Down Expand Up @@ -566,4 +703,99 @@ mod tests {
"expected round-1 leader to commit with f+1 certified supporters in round 2"
);
}

#[test]
fn bluestreak_direct_commit_uses_unprovable_certificates() {
let dag_state = open_test_dag_state_for("bluestreak", 0);
let committee = Committee::new_for_benchmarks(4);
let registry = Registry::new();
let (metrics, _reporter) = Metrics::new(
&registry,
Some(committee.as_ref()),
Some("bluestreak"),
None,
);

let leader = make_full_block(1, 1, vec![BlockReference::new_test(1, 0)]);
let leader_ref = *leader.reference();
let vote_a = make_full_block(0, 2, vec![leader_ref]);
let vote_b = make_full_block(2, 2, vec![leader_ref]);
let vote_c = make_full_block(3, 2, vec![leader_ref]);

let empty_transactions = Vec::new();
let commitment = TransactionsCommitment::new_from_transactions(&empty_transactions);

let mut cert_a = crate::types::VerifiedBlock::new_with_unprovable(
0,
3,
vec![*vote_a.reference()],
Vec::new(),
0,
SignatureBytes::default(),
empty_transactions.clone(),
commitment,
None,
None,
None,
Some(leader_ref),
);
cert_a.preserialize();
let cert_a = Data::new(cert_a);

let mut cert_b = crate::types::VerifiedBlock::new_with_unprovable(
2,
3,
vec![*vote_b.reference()],
Vec::new(),
0,
SignatureBytes::default(),
empty_transactions.clone(),
commitment,
None,
None,
None,
Some(leader_ref),
);
cert_b.preserialize();
let cert_b = Data::new(cert_b);

let mut cert_c = crate::types::VerifiedBlock::new_with_unprovable(
3,
3,
vec![*vote_c.reference()],
Vec::new(),
0,
SignatureBytes::default(),
empty_transactions,
commitment,
None,
None,
None,
Some(leader_ref),
);
cert_c.preserialize();
let cert_c = Data::new(cert_c);

dag_state.insert_general_block(leader, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(vote_a, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(vote_b, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(vote_c, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(cert_a, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(cert_b, DataSource::BlockBundleStreaming);
dag_state.insert_general_block(cert_c, DataSource::BlockBundleStreaming);

let mut committer = UniversalCommitterBuilder::new(committee, dag_state, metrics).build();
let decided = committer.try_commit(BlockReference::new_test(0, 0));

assert!(
decided.iter().any(|status| {
matches!(
status,
LeaderStatus::Commit(block, None)
if block.authority() == 1 && block.round() == 1
)
}),
"expected round-1 leader to commit from round-3 unprovable certificates"
);
}
}
Loading
Loading