Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VID] Vid Vote Logic in Consensus Task #2107

Merged
merged 23 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3a8000a
increase view_sync_timeout
dailinsubjam Nov 17, 2023
9c9dd71
Merge branch 'develop' into sishan/fix_vote_sending_timed_out
dailinsubjam Nov 17, 2023
2c065d5
Increase the duration to get the expected number of successful views.
dailinsubjam Nov 17, 2023
3dfaa5b
try more completion time
dailinsubjam Nov 17, 2023
e7447ff
Merge branch 'develop' into sishan/fix_vote_sending_timed_out
dailinsubjam Nov 21, 2023
0de380c
recheck VidDisperseSend, VidDisperseRecv, VidVoteSend, VidVoteRecv, V…
dailinsubjam Nov 22, 2023
a15256d
mark places need to be updated
dailinsubjam Nov 28, 2023
0f2af3c
Merge branch 'develop' into sishan/vid_vote_logic
dailinsubjam Nov 28, 2023
389d4e2
Merge branch 'develop' into sishan/vid_vote_logic
dailinsubjam Nov 28, 2023
97aba4f
deal with VidDisperseRecv in consensus task, add the condition that o…
dailinsubjam Nov 30, 2023
68ef74d
update test_vid_task
dailinsubjam Nov 30, 2023
61db45b
tune some CI parameters
dailinsubjam Nov 30, 2023
a19b439
remove useless vid-related structures(vidvote, vidcert, vidvotecollec…
dailinsubjam Nov 30, 2023
3984f32
lint
dailinsubjam Nov 30, 2023
8d0d242
merge & lint
dailinsubjam Nov 30, 2023
05b8b84
resolve a part of comments: log level, cancel poll after val, quorum …
dailinsubjam Nov 30, 2023
3ee8c79
log level & typo
dailinsubjam Nov 30, 2023
f64a468
remove redundant clone
dailinsubjam Nov 30, 2023
1b72a62
Add issue link for changing VidDisperse structure
dailinsubjam Nov 30, 2023
cfadefc
add comments for vote_if_able()
dailinsubjam Dec 1, 2023
86b3e45
merge develop again
dailinsubjam Dec 1, 2023
3ae73fa
add the issue 1732
dailinsubjam Dec 1, 2023
2e972dd
change testing parameters for view timeout
dailinsubjam Dec 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ pub async fn add_consensus_task<TYPES: NodeType, I: NodeImplementation<TYPES>>(
event_stream: event_stream.clone(),
output_event_stream: output_stream,
da_certs: HashMap::new(),
vid_certs: HashMap::new(),
vid_shares: HashMap::new(),
current_proposal: None,
id: handle.hotshot.inner.id,
public_key: c_api.public_key().clone(),
Expand Down
124 changes: 0 additions & 124 deletions crates/hotshot/src/traits/networking/web_server_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ struct Inner<TYPES: NodeType> {
/// Task map for quorum votes.
vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for vid votes
vid_vote_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID certs
vid_cert_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
/// Task map for VID disperse data
vid_disperse_task_map:
Arc<RwLock<HashMap<u64, UnboundedSender<ConsensusIntentEvent<TYPES::SignatureKey>>>>>,
Expand Down Expand Up @@ -177,8 +171,6 @@ impl<TYPES: NodeType> Inner<TYPES> {
}
MessagePurpose::DAC => config::get_da_certificate_route(view_number),
MessagePurpose::VidDisperse => config::get_vid_disperse_route(view_number), // like `Proposal`
MessagePurpose::VidVote => config::get_vid_vote_route(view_number, vote_index), // like `Vote`
MessagePurpose::VidCert => config::get_vid_certificate_route(view_number), // like `DAC`
};

if message_purpose == MessagePurpose::Data {
Expand Down Expand Up @@ -251,14 +243,6 @@ impl<TYPES: NodeType> Inner<TYPES> {
direct_poll_queue.push(vote.clone());
}
}
MessagePurpose::VidVote => {
// TODO copy-pasted from `MessagePurpose::Vote` https://github.com/EspressoSystems/HotShot/issues/1690
let mut direct_poll_queue = self.direct_poll_queue.write().await;
for vote in &deserialized_messages {
vote_index += 1;
direct_poll_queue.push(vote.clone());
}
}
MessagePurpose::DAC => {
debug!(
"Received DAC from web server for view {} {}",
Expand All @@ -274,22 +258,6 @@ impl<TYPES: NodeType> Inner<TYPES> {
// In future we should check to make sure DAC is valid
return Ok(());
}
MessagePurpose::VidCert => {
// TODO copy-pasted from `MessagePurpose::DAC` https://github.com/EspressoSystems/HotShot/issues/1690
debug!(
"Received VID cert from web server for view {} {}",
view_number, self.is_da
);
// Only pushing the first proposal since we will soon only be allowing 1 proposal per view
self.broadcast_poll_queue
.write()
.await
.push(deserialized_messages[0].clone());

// return if we found a VID cert, since there will only be 1 per view
// In future we should check to make sure VID cert is valid
return Ok(());
}
MessagePurpose::VidDisperse => {
// TODO copy-pasted from `MessagePurpose::Proposal` https://github.com/EspressoSystems/HotShot/issues/1690

Expand Down Expand Up @@ -360,8 +328,6 @@ impl<TYPES: NodeType> Inner<TYPES> {
// TODO ED Should add extra error checking here to make sure we are intending to cancel a task
ConsensusIntentEvent::CancelPollForVotes(event_view)
| ConsensusIntentEvent::CancelPollForProposal(event_view)
| ConsensusIntentEvent::CancelPollForVIDVotes(event_view)
| ConsensusIntentEvent::CancelPollForVIDCertificate(event_view)
| ConsensusIntentEvent::CancelPollForDAC(event_view)
| ConsensusIntentEvent::CancelPollForVIDDisperse(event_view)
| ConsensusIntentEvent::CancelPollForViewSyncVotes(event_view) => {
Expand Down Expand Up @@ -535,8 +501,6 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
tx_index: Arc::default(),
proposal_task_map: Arc::default(),
vote_task_map: Arc::default(),
vid_vote_task_map: Arc::default(),
vid_cert_task_map: Arc::default(),
vid_disperse_task_map: Arc::default(),
dac_task_map: Arc::default(),
view_sync_cert_task_map: Arc::default(),
Expand Down Expand Up @@ -572,9 +536,7 @@ impl<TYPES: NodeType + 'static> WebServerNetwork<TYPES> {
}
MessagePurpose::ViewSyncVote => config::post_view_sync_vote_route(*view_number),
MessagePurpose::DAC => config::post_da_certificate_route(*view_number),
MessagePurpose::VidVote => config::post_vid_vote_route(*view_number),
MessagePurpose::VidDisperse => config::post_vid_disperse_route(*view_number),
MessagePurpose::VidCert => config::post_vid_certificate_route(*view_number),
};

let network_msg: SendMsg<Message<TYPES>> = SendMsg {
Expand Down Expand Up @@ -951,43 +913,6 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
.await;
}
}
ConsensusIntentEvent::PollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidVote, view_number)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
// TODO ED This won't work for vote collection, last task is more than 2 view ago depending on size of network, will need to rely on cancel task from consensus
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(
view_number.wrapping_sub(2),
))
.await;
}
}

ConsensusIntentEvent::PollForDAC(view_number) => {
let mut task_map = self.inner.dac_task_map.write().await;
Expand Down Expand Up @@ -1026,42 +951,6 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
}

ConsensusIntentEvent::PollForVIDCertificate(view_number) => {
let mut task_map = self.inner.vid_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
// create new task
let (sender, receiver) = unbounded();
e.insert(sender);
async_spawn({
let inner_clone = self.inner.clone();
async move {
if let Err(e) = inner_clone
.poll_web_server(receiver, MessagePurpose::VidCert, view_number)
.await
{
warn!(
"Background receive proposal polling encountered an error: {:?}",
e
);
}
}
});
} else {
error!("Somehow task already existed!");
}

// GC proposal collection if we are two views in the future
if let Some((_, sender)) = task_map.remove_entry(&(view_number.wrapping_sub(2))) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDCertificate(
view_number.wrapping_sub(2),
))
.await;
}
}
ConsensusIntentEvent::CancelPollForVotes(view_number) => {
let mut task_map = self.inner.vote_task_map.write().await;

Expand All @@ -1075,19 +964,6 @@ impl<TYPES: NodeType + 'static> ConnectedNetwork<Message<TYPES>, TYPES::Signatur
}
}

ConsensusIntentEvent::CancelPollForVIDVotes(view_number) => {
let mut task_map = self.inner.vid_vote_task_map.write().await;

if let Some((_, sender)) = task_map.remove_entry(&(view_number)) {
// Send task cancel message to old task

// If task already exited we expect an error
let _res = sender
.send(ConsensusIntentEvent::CancelPollForVIDVotes(view_number))
.await;
}
}

ConsensusIntentEvent::PollForViewSyncCertificate(view_number) => {
let mut task_map = self.inner.view_sync_cert_task_map.write().await;
if let Entry::Vacant(e) = task_map.entry(view_number) {
Expand Down
92 changes: 76 additions & 16 deletions crates/task-impls/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use hotshot_task::{
};
use hotshot_types::{
consensus::{Consensus, View},
data::{Leaf, QuorumProposal, VidCommitment},
data::{Leaf, QuorumProposal, VidCommitment, VidDisperse},
event::{Event, EventType},
message::{GeneralConsensusMessage, Proposal},
simple_certificate::{DACertificate, QuorumCertificate, TimeoutCertificate, VIDCertificate},
simple_certificate::{DACertificate, QuorumCertificate, TimeoutCertificate},
simple_vote::{QuorumData, QuorumVote, TimeoutData, TimeoutVote},
traits::{
block_contents::BlockHeader,
Expand Down Expand Up @@ -116,8 +116,12 @@ pub struct ConsensusTaskState<
/// All the DA certs we've received for current and future views.
pub da_certs: HashMap<TYPES::Time, DACertificate<TYPES>>,

/// All the VID certs we've received for current and future views.
pub vid_certs: HashMap<TYPES::Time, VIDCertificate<TYPES>>,
/// All the VID shares we've received for current and future views.
/// In the future we will need a different struct similar to VidDisperse except
/// it stores only one share.
/// TODO https://github.com/EspressoSystems/HotShot/issues/2146
/// TODO https://github.com/EspressoSystems/HotShot/issues/1732
pub vid_shares: HashMap<TYPES::Time, Proposal<TYPES, VidDisperse<TYPES>>>,

/// The most recent proposal we have, will correspond to the current view if Some()
/// Will be none if the view advanced through timeout/view_sync
Expand Down Expand Up @@ -155,7 +159,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
}

#[instrument(skip_all, fields(id = self.id, view = *self.cur_view), name = "Consensus vote if able", level = "error")]

// Check if we are able to vote, like whether the proposal is valid,
// whether we have DAC and VID share, and if so, vote.
async fn vote_if_able(&self) -> bool {
if !self.quorum_membership.has_stake(&self.public_key) {
debug!(
Expand All @@ -166,8 +171,9 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
}
if let Some(proposal) = &self.current_proposal {
// ED Need to account for the genesis DA cert
// No need to check vid share nor da cert for genesis
if proposal.justify_qc.is_genesis && proposal.view_number == TYPES::Time::new(1) {
// warn!("Proposal is genesis!");
info!("Proposal is genesis!");

let view = TYPES::Time::new(*proposal.view_number);
let justify_qc = proposal.justify_qc.clone();
Expand Down Expand Up @@ -225,6 +231,16 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
}
}

// Only vote if you has seen the VID share for this view
if let Some(_vid_share) = self.vid_shares.get(&proposal.view_number) {
} else {
debug!(
"We have not seen the VID share for this view {:?} yet, so we cannot vote.",
proposal.view_number
);
return false;
}

// Only vote if you have the DA cert
// ED Need to update the view number this is stored under?
if let Some(cert) = self.da_certs.get(&(proposal.get_view_number())) {
Expand Down Expand Up @@ -302,13 +318,13 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
return true;
}
}
info!(
debug!(
"Couldn't find DAC cert in certs, meaning we haven't received it yet for view {:?}",
*proposal.get_view_number(),
);
return false;
}
info!(
debug!(
"Could not vote because we don't have a proposal yet for view {}",
*self.cur_view
);
Expand Down Expand Up @@ -417,7 +433,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +

let view_leader_key = self.quorum_membership.get_leader(view);
if view_leader_key != sender {
error!("Leader key does not match key in proposal");
warn!("Leader key does not match key in proposal");
return;
}

Expand Down Expand Up @@ -872,20 +888,63 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, A: ConsensusApi<TYPES, I> +
self.current_proposal = None;
}
}
HotShotEvent::VidCertRecv(cert) => {
debug!("VID cert received for view ! {}", *cert.view_number);
HotShotEvent::VidDisperseRecv(disperse, sender) => {
let view = disperse.data.get_view_number();

let view = cert.get_view_number();
self.vid_certs.insert(view, cert);
debug!(
"VID disperse received for view: {:?} in consensus task",
view
);

// RM TODO: VOTING
}
// Allow VID disperse date that is one view older, in case we have updated the
// view.
// Adding `+ 1` on the LHS rather than `- 1` on the RHS, to avoid the overflow
// error due to subtracting the genesis view number.
if view + 1 < self.cur_view {
warn!("Throwing away VID disperse data that is more than one view older");
return;
}

info!("VID disperse data is not more than one view older.");
let payload_commitment = disperse.data.payload_commitment;

// Check whether the sender is the right leader for this view
let view_leader_key = self.quorum_membership.get_leader(view);
if view_leader_key != sender {
warn!(
"VID dispersal/share is not from expected leader key for view {} \n",
*view
);
return;
}

if !view_leader_key.validate(&disperse.signature, payload_commitment.as_ref()) {
warn!("Could not verify VID dispersal/share sig.");
return;
}

// stop polling for the received disperse after verifying it's valid
self.quorum_network
.inject_consensus_info(ConsensusIntentEvent::CancelPollForVIDDisperse(
*disperse.data.view_number,
))
.await;

// Add to the storage that we have received the VID disperse for a specific view
self.vid_shares.insert(view, disperse);
}
HotShotEvent::ViewChange(new_view) => {
debug!("View Change event for view {}", *new_view);
debug!("View Change event for view {} in consensus task", *new_view);

let old_view_number = self.cur_view;

// Start polling for VID disperse for the new view
self.quorum_network
.inject_consensus_info(ConsensusIntentEvent::PollForVIDDisperse(
*old_view_number + 1,
))
.await;

// update the view in state to the one in the message
// Publish a view change event to the application
if !self.update_view(new_view).await {
Expand Down Expand Up @@ -1103,6 +1162,7 @@ pub fn consensus_event_filter<TYPES: NodeType>(event: &HotShotEvent<TYPES>) -> b
| HotShotEvent::SendPayloadCommitmentAndMetadata(_, _)
| HotShotEvent::Timeout(_)
| HotShotEvent::TimeoutVoteRecv(_)
| HotShotEvent::VidDisperseRecv(_, _)
| HotShotEvent::Shutdown,
)
}
Loading