Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions crates/ethcore/src/engines/hbbft/hbbft_early_epoch_end_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
};
use std::{
collections::BTreeMap,
time::{Duration, Instant},
time::{Duration, Instant, UNIX_EPOCH},
};

use super::{
Expand Down Expand Up @@ -261,8 +261,8 @@ impl HbbftEarlyEpochEndManager {
}
}

/// decides on the memorium data if we should update to contract data.
/// end executes them.
/// decides on the memorium data if we should update to contract data,
/// and sends out transactions to do so.
pub fn decide(
&mut self,
memorium: &HbbftMessageMemorium,
Expand All @@ -286,25 +286,51 @@ impl HbbftEarlyEpochEndManager {
debug!(target: "engine", "early-epoch-end: detected attempt to break because of is_major_syncing() instead of is_synincg()no decision: syncing");
}

let block_num = if let Some(block) = full_client.block(BlockId::Latest) {
block.number()
let (block_num, block_time) = if let Some(block) = full_client.block(BlockId::Latest) {
(block.number(), block.timestamp())
} else {
error!(target:"engine", "early-epoch-end: could not retrieve latest block.");
return;
};

let treshold: u64 = 2;
// start of implementation for:
// https://github.com/DMDcoin/diamond-node/issues/243
// connectivity reports should not trigger if there is no block production
let now = UNIX_EPOCH.elapsed().expect("Time not available").as_secs();
// the current time should normally be at or after the latest block's timestamp.
if now >= block_time {
let elapsed_since_last_block = now - block_time;
// todo: this is max blocktime (heartbeat) x 2, better read the maximum blocktime.
// on phoenix protocol triggers, this would also skip the sending of disconnectivity reports.
if elapsed_since_last_block > 10 * 60 {
info!(target:"engine", "skipping early-epoch-end: now {now} ; block_time {block_time}: Block WAS created in the future ?!?! :-x. not sending early epoch end reports.");
return;
}
} else {
// if the newest block is from the future, something very problematic happened.
// the system clock could be wrong.
// or the blockchain really produces blocks from the future.
// we are just not sending reports in this case.

error!(target:"engine", "early-epoch-end: now {now} ; block_time {block_time}: Block WAS created in the future ?!?! :-x. not sending early epoch end reports.");
return;
}
// end of implementation for:
// https://github.com/DMDcoin/diamond-node/issues/243

let threshold: u64 = 2;

// todo: read this out from contracts: ConnectivityTrackerHbbft -> reportDisallowPeriod
// requires us to update the Contracts ABIs:
// https://github.com/DMDcoin/diamond-node/issues/115
let treshold_time = Duration::from_secs(12 * 60); // 12 Minutes = 1 times the heartbeat + 2 minutes as grace period.
let threshold_time = Duration::from_secs(12 * 60); // 12 Minutes = 1 times the heartbeat + 2 minutes as grace period.

if self.start_time.elapsed() < treshold_time {
debug!(target: "engine", "early-epoch-end: no decision: Treshold time not reached.");
if self.start_time.elapsed() < threshold_time {
debug!(target: "engine", "early-epoch-end: no decision: Threshold time not reached.");
return;
}

if block_num < self.start_block + treshold {
if block_num < self.start_block + threshold {
// not enough blocks have passed this epoch,
// to judge other nodes.
debug!(target: "engine", "early-epoch-end: no decision: not enough blocks.");
Expand All @@ -328,7 +354,7 @@ impl HbbftEarlyEpochEndManager {
if let Some(node_history) = epoch_history.get_history_for_node(validator) {
let last_message_time = node_history.get_last_good_message_time();
let last_message_time_lateness = last_message_time.elapsed();
if last_message_time_lateness > treshold_time {
if last_message_time_lateness > threshold_time {
// we do not have to send notification, if we already did so.
if !self.is_reported(client, validator_address) {
// this function will also add the validator to the list of flagged validators.
Expand Down
216 changes: 133 additions & 83 deletions crates/ethcore/src/engines/hbbft/hbbft_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ use super::{
sealing::{self, RlpSig, Sealing},
};
use crate::engines::hbbft::hbbft_message_memorium::HbbftMessageDispatcher;
use std::{ops::Deref, sync::atomic::Ordering};
use std::sync::atomic::Ordering;

// Internal representation for storing deferred outgoing consensus messages.
struct StoredOutgoingMessage {
Expand Down Expand Up @@ -194,6 +194,8 @@ const ENGINE_VALIDATOR_CANDIDATE_ACTIONS: TimerToken = 4;
// Check for current Phoenix Protocol phase
const ENGINE_PHOENIX_CHECK: TimerToken = 5;

const HBBFT_CONNECTIVITY_TOKEN: TimerToken = 6;

impl TransitionHandler {
fn handle_shutdown_on_missing_block_import(
&self,
Expand Down Expand Up @@ -383,6 +385,12 @@ impl IoHandler<()> for TransitionHandler {
.unwrap_or_else(
|e| warn!(target: "consensus", "ENGINE_PHOENIX_CHECK Timer failed: {}.", e),
);

// early epoch end connectivity timer interval should be the same length as the max blocktime.
io.register_timer(HBBFT_CONNECTIVITY_TOKEN, Duration::from_secs(300))
.unwrap_or_else(
|e| warn!(target: "consensus", "ENGINE_PHOENIX_CHECK Timer failed: {}.", e),
);
}

fn timeout(&self, io: &IoContext<()>, timer: TimerToken) {
Expand Down Expand Up @@ -472,6 +480,10 @@ impl IoHandler<()> for TransitionHandler {
}
} else if timer == ENGINE_PHOENIX_CHECK {
self.engine.handle_phoenix_recovery_protocol();
} else if timer == HBBFT_CONNECTIVITY_TOKEN {
if let Err(err) = self.engine.do_validator_engine_early_epoch_end_actions() {
error!(target: "consensus", "do_validator_engine_early_epoch_end_actions failed: {:?}", err);
}
}
}
}
Expand Down Expand Up @@ -884,7 +896,7 @@ impl HoneyBadgerBFT {
self.hbbft_message_dispatcher.report_seal_bad(
&sender_id,
block_num,
BadSealReason::ErrorTresholdSignStep,
BadSealReason::ErrorThresholdSignStep,
);
}
}
Expand Down Expand Up @@ -1224,102 +1236,135 @@ impl HoneyBadgerBFT {
return Ok(());
}

// If we have no signer there is nothing for us to send.
let mining_address = match self.signer.read().as_ref() {
Some(signer) => signer.address(),
None => {
// we do not have a signer on Full and RPC nodes.
// here is a possible performance improvement:
// this won't change during the lifetime of the application ?!
return Ok(());
}
};
self.hbbft_peers_service
.channel()
.send(HbbftConnectToPeersMessage::AnnounceAvailability)?;

let engine_client = client_arc.as_ref();
if let Err(err) = self
.hbbft_engine_cache
.lock()
.refresh_cache(mining_address, engine_client)
{
trace!(target: "engine", "do_validator_engine_actions: data could not get updated, follow up tasks might fail: {:?}", err);
}
self.hbbft_peers_service
.send_message(HbbftConnectToPeersMessage::AnnounceOwnInternetAddress)?;

let engine_client = client_arc.deref();
if self.should_connect_to_validator_set() {
// we just keep those variables here, because we need them in the early_epoch_end_manager.
// this is just an optimization, so we do not acquire the lock for that much time.
let mut validator_set: Vec<NodeId> = Vec::new();

let block_chain_client = match engine_client.as_full_client() {
Some(block_chain_client) => block_chain_client,
None => {
return Err("Unable to retrieve client.as_full_client()".into());
}
};
{
let hbbft_state_option =
self.hbbft_state.try_read_for(Duration::from_millis(250));
match hbbft_state_option {
Some(hbbft_state) => {
//hbbft_state.is_validator();

let should_connect_to_validator_set = self.should_connect_to_validator_set();
let mut should_handle_early_epoch_end = false;

// we just keep those variables here, because we need them in the early_epoch_end_manager.
// this is just an optimization, so we do not acquire the lock for that much time.
let mut validator_set: Vec<NodeId> = Vec::new();
let mut epoch_start_block: u64 = 0;
let mut epoch_num: u64 = 0;

{
let hbbft_state_option =
self.hbbft_state.try_read_for(Duration::from_millis(250));
match hbbft_state_option {
Some(hbbft_state) => {
should_handle_early_epoch_end = hbbft_state.is_validator();

// if we are a pending validator, we will also do the reserved peers management.
if should_handle_early_epoch_end {
// we already remember here stuff the early epoch manager needs,
// so we do not have to acquire the lock for that long.
epoch_num = hbbft_state.get_current_posdao_epoch();
epoch_start_block =
hbbft_state.get_current_posdao_epoch_start_block();
validator_set = hbbft_state.get_validator_set();
}
}
None => {
// maybe improve here, to return with a result, that triggers a retry soon.
debug!(target: "engine", "Unable to do_validator_engine_actions: Could not acquire read lock for hbbft state. Unable to decide about early epoch end. retrying soon.");
}
};
} // drop lock for hbbft_state
None => {
// maybe improve here, to return with a result, that triggers a retry soon.
debug!(target: "engine", "Unable to do_validator_engine_actions: Could not acquire read lock for hbbft state. Unable to decide about early epoch end. retrying soon.");
}
};
} // drop lock for hbbft_state

// if we do not have to do anything, we can return early.
if !(should_connect_to_validator_set || should_handle_early_epoch_end) {
return Ok(());
if !validator_set.is_empty() {
self.hbbft_peers_service.send_message(
HbbftConnectToPeersMessage::ConnectToCurrentPeers(validator_set),
)?;
}
}

self.hbbft_peers_service
.channel()
.send(HbbftConnectToPeersMessage::AnnounceAvailability)?;
self.do_keygen();

self.hbbft_peers_service
.send_message(HbbftConnectToPeersMessage::AnnounceOwnInternetAddress)?;
return Ok(());
}

if should_connect_to_validator_set {
self.hbbft_peers_service.send_message(
HbbftConnectToPeersMessage::ConnectToCurrentPeers(validator_set.clone()),
)?;
}
None => {
// client arc not ready yet,
// can happen during initialization and shutdown.
return Ok(());
}
}
}

if should_handle_early_epoch_end {
self.handle_early_epoch_end(
block_chain_client,
engine_client,
&mining_address,
epoch_start_block,
epoch_num,
&validator_set,
);
/// refreshes engine cache and returns the mining address.
fn refresh_engine_cache(&self, engine_client: &dyn EngineClient) -> Option<Address> {
let mining_address = match self.signer.read().as_ref() {
Some(signer) => signer.address(),
None => {
// we do not have a signer on Full and RPC nodes.
// here is a possible performance improvement:
// this won't change during the lifetime of the application ?!
return None;
}
};

if let Err(err) = self
.hbbft_engine_cache
.lock()
.refresh_cache(mining_address, engine_client)
{
warn!(target: "engine", "do_validator_engine_actions: data could not get updated, follow up tasks might fail: {:?}", err);
}

return Some(mining_address);
}

/// hbbft early epoch end actions are executed on a different timing than the regular validator engine steps
fn do_validator_engine_early_epoch_end_actions(&self) -> Result<(), Error> {
// here we need to differentiate the different engine functions,
// that requires different levels of access to the client.
trace!(target: "engine", "do_validator_engine_actions.");
match self.client_arc() {
Some(client_arc) => {
if self.is_syncing(&client_arc) {
// we are syncing - do not do anything.
trace!(target: "engine", "do_validator_engine_actions: skipping because we are syncing.");
return Ok(());
}

self.do_keygen();
let engine_client = client_arc.as_ref();
let mining_address = match self.refresh_engine_cache(engine_client) {
Some(h) => h,
None => return Ok(()),
};

let block_chain_client = match engine_client.as_full_client() {
Some(block_chain_client) => block_chain_client,
None => {
return Err("Unable to retrieve client.as_full_client()".into());
}
};

let hbbft_state_option = self.hbbft_state.try_read_for(Duration::from_millis(250));
match hbbft_state_option {
Some(hbbft_state) => {
if !hbbft_state.is_validator() {
// we are not known as validator, we dont have to further process early epoch end actions.
return Ok(());
}

// if we are a pending validator, we will also do the reserved peers management.
// we already remember here stuff the early epoch manager needs,
// so we do not have to acquire the lock for that long.
let epoch_num = hbbft_state.get_current_posdao_epoch();
let epoch_start_block = hbbft_state.get_current_posdao_epoch_start_block();
let validator_set = hbbft_state.get_validator_set();

self.handle_early_epoch_end(
block_chain_client,
engine_client,
&mining_address,
epoch_start_block,
epoch_num,
&validator_set,
);
}
None => {
// maybe improve here, to return with a result, that triggers a retry soon.
debug!(target: "engine", "Unable to do_validator_engine_early_epoch_end_actions: Could not acquire read lock for hbbft state. Unable to decide about early epoch end. retrying soon.");
}
};

return Ok(());
}

None => {
// client arc not ready yet,
// can happen during initialization and shutdown.
Expand Down Expand Up @@ -1417,16 +1462,21 @@ impl HoneyBadgerBFT {
}
}

/** returns if the signer of hbbft is tracked as available in the hbbft contracts..*/
/// returns if the signer of hbbft is tracked as available in the hbbft contracts.
pub fn is_available(&self) -> bool {
self.hbbft_engine_cache.lock().is_available()
}

/** returns if the signer of hbbft is stacked. */
/// returns if the signer of hbbft is stacked.
pub fn is_staked(&self) -> bool {
self.hbbft_engine_cache.lock().is_staked()
}

/// returns if the signer of hbbft is a current validator.
pub fn is_validator(&self) -> bool {
self.hbbft_state.read().is_validator()
}

fn start_hbbft_epoch_if_ready(&self) {
if let Some(client) = self.client_arc() {
if self.transaction_queue_and_time_thresholds_reached(&client) {
Expand Down
Loading
Loading