Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ This milestone focuses on choosing the head of the chain based on gossiped attes

This milestone focuses on performing the duties of a validator.

- Produce and broadcast attestations on each slot 🏗️
- Compute current proposer for each slot
- Build and broadcast new blocks when proposing
- Produce and broadcast attestations on each slot
- Compute current proposer for each slot
- Build and broadcast new blocks when proposing
109 changes: 101 additions & 8 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ use std::time::{Duration, SystemTime};

use ethlambda_state_transition::is_proposer;
use ethlambda_types::{
attestation::SignedAttestation, block::SignedBlockWithAttestation, primitives::TreeHash,
signature::ValidatorSecretKey, state::State,
attestation::{Attestation, AttestationData, SignedAttestation},
block::{BlockSignatures, BlockWithAttestation, SignedBlockWithAttestation},
primitives::TreeHash,
signature::ValidatorSecretKey,
state::{Checkpoint, State},
};
use spawned_concurrency::tasks::{
CallResponse, CastResponse, GenServer, GenServerHandle, send_after,
Expand All @@ -22,6 +25,8 @@ pub mod store;
pub enum OutboundGossip {
/// Publish an attestation to the gossip network.
PublishAttestation(SignedAttestation),
/// Publish a block to the gossip network.
PublishBlock(SignedBlockWithAttestation),
}

pub struct BlockChain {
Expand Down Expand Up @@ -94,20 +99,42 @@ impl BlockChainServer {
// Update current slot metric
metrics::update_current_slot(slot);

// Produce attestations at interval 1
// At interval 0, check if we will propose (but don't build the block yet).
// Tick forkchoice first to accept attestations, then build the block
// using the freshly-accepted attestations.
let proposer_validator_id = (interval == 0)
.then(|| self.get_our_proposer(slot))
.flatten();

// Tick the store first - this accepts attestations at interval 0 if we have a proposal
self.store
.on_tick(timestamp, proposer_validator_id.is_some());

// Now build and publish the block (after attestations have been accepted)
if let Some(validator_id) = proposer_validator_id {
self.propose_block(slot, validator_id);
}

// Produce attestations at interval 1 (proposer already attested in block)
if interval == 1 {
self.produce_attestations(slot);
}

// TODO: check if we are proposing
let has_proposal = false;

self.store.on_tick(timestamp, has_proposal);

// Update safe target slot metric (updated by store.on_tick at interval 2)
metrics::update_safe_target_slot(self.store.safe_target_slot());
}

/// Returns the validator ID if any of our validators is the proposer for this slot.
fn get_our_proposer(&self, slot: u64) -> Option<u64> {
let head_state = self.store.head_state();
let num_validators = head_state.validators.len() as u64;

self.key_manager
.validator_ids()
.into_iter()
.find(|&vid| is_proposer(vid, slot, num_validators))
}

fn produce_attestations(&mut self, slot: u64) {
// Get the head state to determine number of validators
let head_state = self.store.head_state();
Expand Down Expand Up @@ -163,6 +190,72 @@ impl BlockChainServer {
}
}

/// Build and publish a block for the given slot and validator.
fn propose_block(&mut self, slot: u64, validator_id: u64) {
info!(%slot, %validator_id, "We are the proposer for this slot");

// Build the block with attestation signatures
let Ok((block, attestation_signatures)) = self
.store
.produce_block_with_signatures(slot, validator_id)
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to build block"))
else {
return;
};

// Create proposer's attestation (attests to the new block)
let proposer_attestation = Attestation {
validator_id,
data: AttestationData {
slot,
head: Checkpoint {
root: block.tree_hash_root(),
slot: block.slot,
},
target: self.store.get_attestation_target(),
source: *self.store.latest_justified(),
},
};

// Sign the proposer's attestation
let message_hash = proposer_attestation.data.tree_hash_root();
let epoch = slot as u32;
let Ok(proposer_signature) = self
.key_manager
.sign_attestation(validator_id, epoch, &message_hash)
.inspect_err(
|err| error!(%slot, %validator_id, %err, "Failed to sign proposer attestation"),
)
else {
return;
};

// Assemble SignedBlockWithAttestation
let signed_block = SignedBlockWithAttestation {
message: BlockWithAttestation {
block,
proposer_attestation,
},
signature: BlockSignatures {
proposer_signature,
attestation_signatures: attestation_signatures
.try_into()
.expect("attestation signatures within limit"),
},
};

// Publish to gossip network
let Ok(()) = self
.p2p_tx
.send(OutboundGossip::PublishBlock(signed_block))
.inspect_err(|err| error!(%slot, %validator_id, %err, "Failed to publish block"))
else {
return;
};

info!(%slot, %validator_id, "Published block");
}

fn on_block(&mut self, signed_block: SignedBlockWithAttestation) {
let slot = signed_block.message.block.slot;
if let Err(err) = self.store.on_block(signed_block) {
Expand Down
29 changes: 26 additions & 3 deletions crates/net/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,17 @@ pub async fn start_p2p(
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
}

// Create topic for outbound attestations
// Create topics for outbound messages
let attestation_topic = libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{network}/{ATTESTATION_TOPIC_KIND}/ssz_snappy"
));
let block_topic = libp2p::gossipsub::IdentTopic::new(format!(
"/leanconsensus/{network}/{BLOCK_TOPIC_KIND}/ssz_snappy"
));

info!("P2P node started on {listening_socket}");

event_loop(swarm, blockchain, p2p_rx, attestation_topic).await;
event_loop(swarm, blockchain, p2p_rx, attestation_topic, block_topic).await;
}

/// [libp2p Behaviour](libp2p::swarm::NetworkBehaviour) combining Gossipsub and Request-Response Behaviours
Expand All @@ -139,6 +142,7 @@ async fn event_loop(
mut blockchain: BlockChain,
mut p2p_rx: mpsc::UnboundedReceiver<OutboundGossip>,
attestation_topic: libp2p::gossipsub::IdentTopic,
block_topic: libp2p::gossipsub::IdentTopic,
) {
loop {
tokio::select! {
Expand All @@ -148,7 +152,7 @@ async fn event_loop(
let Some(message) = message else {
break;
};
handle_outgoing_gossip(&mut swarm, message, &attestation_topic).await;
handle_outgoing_gossip(&mut swarm, message, &attestation_topic, &block_topic).await;
}
event = swarm.next() => {
let Some(event) = event else {
Expand Down Expand Up @@ -178,6 +182,7 @@ async fn handle_outgoing_gossip(
swarm: &mut libp2p::Swarm<Behaviour>,
message: OutboundGossip,
attestation_topic: &libp2p::gossipsub::IdentTopic,
block_topic: &libp2p::gossipsub::IdentTopic,
) {
match message {
OutboundGossip::PublishAttestation(attestation) => {
Expand All @@ -198,6 +203,24 @@ async fn handle_outgoing_gossip(
.inspect(|_| trace!(%slot, %validator, "Published attestation to gossipsub"))
.inspect_err(|err| tracing::warn!(%slot, %validator, %err, "Failed to publish attestation to gossipsub"));
}
OutboundGossip::PublishBlock(signed_block) => {
let slot = signed_block.message.block.slot;
let proposer = signed_block.message.block.proposer_index;

// Encode to SSZ
let ssz_bytes = signed_block.as_ssz_bytes();

// Compress with raw snappy
let compressed = gossipsub::compress_message(&ssz_bytes);

// Publish to gossipsub
let _ = swarm
.behaviour_mut()
.gossipsub
.publish(block_topic.clone(), compressed)
.inspect(|_| info!(%slot, %proposer, "Published block to gossipsub"))
.inspect_err(|err| tracing::warn!(%slot, %proposer, %err, "Failed to publish block to gossipsub"));
}
}
}

Expand Down