Skip to content

Commit

Permalink
Add protocol-related metrics (#822)
Browse files Browse the repository at this point in the history
  • Loading branch information
LukasGasior1 authored Jan 25, 2024
2 parents 6fda690 + 37c42be commit 09baf36
Show file tree
Hide file tree
Showing 6 changed files with 273 additions and 25 deletions.
22 changes: 22 additions & 0 deletions core-rust/node-common/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ pub trait TakesMetricLabels {
label2: impl MetricLabel,
label3: impl MetricLabel,
) -> Self::Metric;
/// Returns the metric selected by four label values.
///
/// The labels are applied in argument order (`label1` first, `label4` last).
fn with_four_labels(
&self,
label1: impl MetricLabel,
label2: impl MetricLabel,
label3: impl MetricLabel,
label4: impl MetricLabel,
) -> Self::Metric;
}

impl<T: MetricVecBuilder> TakesMetricLabels for MetricVec<T> {
Expand Down Expand Up @@ -194,6 +201,21 @@ impl<T: MetricVecBuilder> TakesMetricLabels for MetricVec<T> {
label3.prometheus_label_name().as_ref(),
])
}

/// Looks up the metric for a combination of four labels, applied in argument order.
fn with_four_labels(
    &self,
    label1: impl MetricLabel,
    label2: impl MetricLabel,
    label3: impl MetricLabel,
    label4: impl MetricLabel,
) -> Self::Metric {
    // Hold the rendered label names in locals so the `&str` views borrowed
    // from them stay valid for the duration of the lookup.
    let first = label1.prometheus_label_name();
    let second = label2.prometheus_label_name();
    let third = label3.prometheus_label_name();
    let fourth = label4.prometheus_label_name();

    let label_values: [&str; 4] = [
        first.as_ref(),
        second.as_ref(),
        third.as_ref(),
        fourth.as_ref(),
    ];
    self.with_label_values(&label_values)
}
}

/// Typically applied to enums or Errors where we wish to derive a label name.
Expand Down
221 changes: 220 additions & 1 deletion core-rust/state-manager/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,13 @@ use node_common::config::limits::*;
use node_common::locks::{LockFactory, Mutex};
use node_common::metrics::*;
use prometheus::{
Gauge, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry,
Gauge, GaugeVec, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry,
};
use radix_engine::blueprints::consensus_manager::EpochChangeEvent;

use crate::protocol::{
PendingProtocolUpdateState, ProtocolState, ProtocolUpdateEnactmentCondition,
};
use crate::store::traits::measurement::CategoryDbVolumeStatistic;
use radix_engine::transaction::TransactionFeeSummary;
use radix_engine_common::prelude::*;
Expand All @@ -98,6 +102,16 @@ pub struct CommittedTransactionsMetrics {
pub finalization_cost_units_consumed: Histogram,
}

/// Prometheus gauges describing protocol updates: validator readiness, the
/// thresholds and epoch bounds of pending updates, and enacted updates.
pub struct ProtocolMetrics {
/// Ratio of supporting stake to total stake in the current validator set.
/// Label: `readiness_signal_name`.
pub protocol_update_readiness_ratio: GaugeVec,
/// Required readiness ratio for a given protocol update threshold.
/// Labels: `protocol_version_name`, `readiness_signal_name`, `threshold_index`,
/// `threshold_consecutive_epochs`.
pub pending_update_threshold_required_ratio: GaugeVec,
/// Required number of consecutive epochs of support for a given threshold.
/// Same four labels as `pending_update_threshold_required_ratio`.
pub pending_update_threshold_required_consecutive_epochs: IntGaugeVec,
/// Current number of consecutive epochs of support for a given threshold.
/// Same four labels as `pending_update_threshold_required_ratio`.
pub pending_update_threshold_current_consecutive_epochs: IntGaugeVec,
/// Earliest epoch (inclusive) at which a pending protocol update can be enacted.
/// Labels: `protocol_version_name`, `readiness_signal_name`.
pub pending_update_lower_bound_epoch: IntGaugeVec,
/// Upper bound epoch (exclusive) of a pending protocol update.
/// Labels: `protocol_version_name`, `readiness_signal_name`.
pub pending_update_upper_bound_epoch: IntGaugeVec,
/// State version at which a protocol update was enacted.
/// Label: `protocol_version_name`.
pub enacted_protocol_update_state_version: IntGaugeVec,
}

pub struct VertexPrepareMetrics {
pub proposal_transactions_size: Histogram,
pub wasted_proposal_bandwidth: Histogram,
Expand Down Expand Up @@ -330,6 +344,211 @@ impl CommittedTransactionsMetrics {
}
}

impl ProtocolMetrics {
    /// Creates all protocol-update gauges, registers each of them at `registry`, and
    /// seeds the state-based gauges from `initial_protocol_state`.
    pub fn new(registry: &Registry, initial_protocol_state: &ProtocolState) -> Self {
        // Label sets shared by several gauges; declared once so they cannot drift apart.
        const THRESHOLD_LABELS: [&str; 4] = [
            "protocol_version_name",
            "readiness_signal_name",
            "threshold_index",
            "threshold_consecutive_epochs",
        ];
        const EPOCH_BOUND_LABELS: [&str; 2] = ["protocol_version_name", "readiness_signal_name"];

        let instance = Self {
            protocol_update_readiness_ratio: GaugeVec::new(
                opts(
                    "protocol_update_readiness_ratio",
                    "A ratio of supporting stake to total stake in the current validator set.",
                ),
                &["readiness_signal_name"],
            )
            .registered_at(registry),
            pending_update_threshold_required_ratio: GaugeVec::new(
                opts(
                    "protocol_update_pending_threshold_required_ratio",
                    "Required readiness ratio for the given protocol update threshold.",
                ),
                &THRESHOLD_LABELS,
            )
            .registered_at(registry),
            pending_update_threshold_required_consecutive_epochs: IntGaugeVec::new(
                opts(
                    "protocol_update_pending_threshold_required_consecutive_epochs",
                    "Required number of consecutive epochs of support for the given protocol update threshold.",
                ),
                &THRESHOLD_LABELS,
            )
            .registered_at(registry),
            pending_update_threshold_current_consecutive_epochs: IntGaugeVec::new(
                opts(
                    "protocol_update_pending_threshold_current_consecutive_epochs",
                    "Current number of consecutive epochs of support for the given protocol update threshold.",
                ),
                &THRESHOLD_LABELS,
            )
            .registered_at(registry),
            pending_update_lower_bound_epoch: IntGaugeVec::new(
                opts(
                    "protocol_update_pending_lower_bound_epoch",
                    "Earliest epoch when the given protocol update can be enacted (inclusive)",
                ),
                &EPOCH_BOUND_LABELS,
            )
            .registered_at(registry),
            pending_update_upper_bound_epoch: IntGaugeVec::new(
                opts(
                    "protocol_update_pending_upper_bound_epoch",
                    "Upper bound epoch for the given protocol update (exclusive)",
                ),
                &EPOCH_BOUND_LABELS,
            )
            .registered_at(registry),
            enacted_protocol_update_state_version: IntGaugeVec::new(
                opts(
                    "protocol_update_enacted_state_version",
                    "State version at which the protocol update was enacted (init proof)",
                ),
                &["protocol_version_name"],
            )
            .registered_at(registry),
        };

        instance.update_state_based_metrics(initial_protocol_state);

        instance
    }

    /// Refreshes every gauge: first the ones derived from `protocol_state`, then the
    /// ones derived from `epoch_change`.
    ///
    /// # Panics
    ///
    /// Panics if the total active stake of the epoch change's validator set cannot be
    /// calculated (see `update_epoch_change_based_metrics`).
    pub fn update(&self, protocol_state: &ProtocolState, epoch_change: &EpochChangeEvent) {
        self.update_state_based_metrics(protocol_state);
        self.update_epoch_change_based_metrics(epoch_change);
    }

    /// Updates the metrics that are based on ProtocolState (pending, enacted updates)
    fn update_state_based_metrics(&self, protocol_state: &ProtocolState) {
        // Reset the metrics (to clear leftover pending updates as they transition to enacted)
        self.pending_update_threshold_required_ratio.reset();
        self.pending_update_threshold_required_consecutive_epochs
            .reset();
        self.pending_update_threshold_current_consecutive_epochs
            .reset();
        self.pending_update_lower_bound_epoch.reset();
        self.pending_update_upper_bound_epoch.reset();
        self.enacted_protocol_update_state_version.reset();

        for pending_protocol_update in protocol_state.pending_protocol_updates.iter() {
            let protocol_update = &pending_protocol_update.protocol_update;
            // Every gauge touched below carries the protocol version as its first label.
            let version_label = protocol_update.next_protocol_version.to_string();

            match &protocol_update.enactment_condition {
                ProtocolUpdateEnactmentCondition::EnactAtStartOfEpochIfValidatorsReady {
                    lower_bound_inclusive,
                    upper_bound_exclusive,
                    ..
                } => {
                    let signal_label = protocol_update.readiness_signal_name().to_string();

                    self.pending_update_lower_bound_epoch
                        .with_two_labels(version_label.clone(), signal_label.clone())
                        .set(Self::saturating_i64(lower_bound_inclusive.number()));
                    self.pending_update_upper_bound_epoch
                        .with_two_labels(version_label.clone(), signal_label.clone())
                        .set(Self::saturating_i64(upper_bound_exclusive.number()));

                    match &pending_protocol_update.state {
                        PendingProtocolUpdateState::ForSignalledReadinessSupportCondition {
                            thresholds_state,
                        } => {
                            for (index, (threshold, threshold_state)) in
                                thresholds_state.iter().enumerate()
                            {
                                let required_epochs =
                                    threshold.required_consecutive_completed_epochs_of_support;
                                // The three threshold gauges share the same four label values.
                                let index_label = index.to_string();
                                let required_epochs_label = required_epochs.to_string();

                                self.pending_update_threshold_required_ratio
                                    .with_four_labels(
                                        version_label.clone(),
                                        signal_label.clone(),
                                        index_label.clone(),
                                        required_epochs_label.clone(),
                                    )
                                    .set(dec_to_f64_for_metrics(
                                        &threshold.required_ratio_of_stake_supported,
                                    ));
                                self.pending_update_threshold_required_consecutive_epochs
                                    .with_four_labels(
                                        version_label.clone(),
                                        signal_label.clone(),
                                        index_label.clone(),
                                        required_epochs_label.clone(),
                                    )
                                    .set(Self::saturating_i64(required_epochs));
                                self.pending_update_threshold_current_consecutive_epochs
                                    .with_four_labels(
                                        version_label.clone(),
                                        signal_label.clone(),
                                        index_label,
                                        required_epochs_label,
                                    )
                                    .set(Self::saturating_i64(
                                        threshold_state.consecutive_started_epochs_of_support,
                                    ));
                            }
                        }
                        PendingProtocolUpdateState::Empty => { /* no-op, shouldn't happen */ }
                    }
                }
                ProtocolUpdateEnactmentCondition::EnactAtStartOfEpochUnconditionally(epoch) => {
                    // Unconditional updates have no readiness signal, so that label is left
                    // empty; the bounds are reported as the single-epoch range
                    // [epoch, epoch + 1).
                    let enactment_epoch = Self::saturating_i64(epoch.number());
                    self.pending_update_lower_bound_epoch
                        .with_two_labels(version_label.clone(), "".to_string())
                        .set(enactment_epoch);
                    self.pending_update_upper_bound_epoch
                        .with_two_labels(version_label, "".to_string())
                        .set(enactment_epoch.saturating_add(1));
                }
            }
        }

        for (state_version, protocol_version_name) in protocol_state.enacted_protocol_updates.iter()
        {
            self.enacted_protocol_update_state_version
                .with_label(protocol_version_name.to_string())
                .set(Self::saturating_i64(state_version.number()));
        }
    }

    /// Updates the metrics that are based on epoch change event
    ///
    /// # Panics
    ///
    /// Panics if the total active stake of the event's validator set cannot be calculated.
    fn update_epoch_change_based_metrics(&self, epoch_change: &EpochChangeEvent) {
        self.protocol_update_readiness_ratio.reset();

        let total_stake = epoch_change
            .validator_set
            .total_active_stake_xrd()
            .expect("Failed to calculate the total stake");
        for (readiness_signal_name, stake_readiness) in
            epoch_change.significant_protocol_update_readiness.iter()
        {
            // A failed division is reported as a readiness of zero rather than an error.
            let readiness_ratio = stake_readiness
                .checked_div(total_stake)
                .unwrap_or(Decimal::ZERO);
            self.protocol_update_readiness_ratio
                .with_label(readiness_signal_name)
                .set(dec_to_f64_for_metrics(&readiness_ratio));
        }
    }

    /// Converts an unsigned quantity (epoch number, state version, epoch count) into the
    /// `i64` stored by an integer gauge, clamping to `i64::MAX` when it does not fit.
    ///
    /// A plain `as i64` cast would instead wrap an out-of-range value around to a
    /// negative number and silently report nonsense.
    fn saturating_i64<T>(value: T) -> i64
    where
        i64: std::convert::TryFrom<T>,
    {
        <i64 as std::convert::TryFrom<T>>::try_from(value).unwrap_or(i64::MAX)
    }
}

/// Unsafe, metrics-only conversion
fn dec_to_f64_for_metrics(input: &Decimal) -> f64 {
f64::from_str(&input.to_string()).unwrap_or(0f64)
}

impl VertexPrepareMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ impl ProtocolUpdateTransactionCommitter {
consensus_parent_round_timestamp_ms: latest_header
.consensus_parent_round_timestamp_ms,
proposer_timestamp_ms: latest_header.proposer_timestamp_ms,
next_epoch: series_executor.next_epoch().cloned(),
next_epoch: series_executor.epoch_change().map(|ev| ev.into()),
next_protocol_version: None,
},
origin: LedgerProofOrigin::ProtocolUpdate {
Expand Down
8 changes: 2 additions & 6 deletions core-rust/state-manager/src/staging/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,8 @@ impl ProcessedCommitResult {
self
}

pub fn next_epoch(&self) -> Option<NextEpoch> {
self.local_receipt
.local_execution
.next_epoch
.as_ref()
.map(|next_epoch_result| NextEpoch::from(next_epoch_result.clone()))
/// Returns an owned copy of the epoch change event held by the local execution
/// part of the receipt, or `None` when it holds none.
pub fn epoch_change(&self) -> Option<EpochChangeEvent> {
    let local_execution = &self.local_receipt.local_execution;
    local_execution.next_epoch.clone()
}

// TODO(after RCnet-v3): Extract the `pub fn`s below (re-used by preview) to an isolated helper.
Expand Down
30 changes: 20 additions & 10 deletions core-rust/state-manager/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct StateComputer<S> {
ledger_transaction_validator: RwLock<LedgerTransactionValidator>,
ledger_metrics: LedgerMetrics,
committed_transactions_metrics: CommittedTransactionsMetrics,
protocol_metrics: ProtocolMetrics,
vertex_prepare_metrics: VertexPrepareMetrics,
vertex_limits_config: VertexLimitsConfig,
protocol_state: RwLock<ProtocolState>,
Expand Down Expand Up @@ -147,6 +148,8 @@ impl<
let committed_transactions_metrics =
CommittedTransactionsMetrics::new(metrics_registry, &execution_configurator.read());

let protocol_metrics = ProtocolMetrics::new(metrics_registry, &initial_protocol_state);

StateComputer {
network: network.clone(),
store,
Expand All @@ -169,6 +172,7 @@ impl<
current_ledger_proposer_timestamp_ms,
),
committed_transactions_metrics,
protocol_metrics,
protocol_state: lock_factory
.named("protocol_state")
.new_rwlock(initial_protocol_state),
Expand Down Expand Up @@ -304,7 +308,7 @@ where
raw,
validated,
ledger_hashes: commit.hash_structures_diff.ledger_hashes,
next_epoch: commit.next_epoch(),
next_epoch: commit.epoch_change().map(|ev| ev.into()),
}
}

Expand Down Expand Up @@ -484,8 +488,8 @@ where
break;
}

// Don't process any additional transactions if next epoch has occurred
if series_executor.next_epoch().is_some() {
// Don't process any additional transactions if epoch change has occurred
if series_executor.epoch_change().is_some() {
stop_reason = VertexPrepareStopReason::EpochChange;
break;
}
Expand Down Expand Up @@ -697,7 +701,7 @@ where
PrepareResult {
committed: committable_transactions,
rejected: rejected_transactions,
next_epoch: series_executor.next_epoch().cloned(),
next_epoch: series_executor.epoch_change().map(|ev| ev.into()),
next_protocol_version: series_executor.next_protocol_version(),
ledger_hashes: *series_executor.latest_ledger_hashes(),
}
Expand Down Expand Up @@ -1173,13 +1177,21 @@ where
});
}

let new_protocol_state = series_executor.protocol_state();

// Update the protocol metrics
let epoch_change = series_executor.epoch_change();
if let Some(epoch_change) = &epoch_change {
self.protocol_metrics
.update(&new_protocol_state, epoch_change)
}

// Step 4.: Check final invariants, perform the DB commit
if series_executor.next_epoch() != commit_ledger_header.next_epoch.as_ref() {
let next_epoch: Option<NextEpoch> = epoch_change.map(|ev| ev.into());
if next_epoch != commit_ledger_header.next_epoch {
panic!(
"resultant next epoch at version {} differs from the proof ({:?} != {:?})",
commit_state_version,
series_executor.next_epoch(),
commit_ledger_header.next_epoch
commit_state_version, next_epoch, commit_ledger_header.next_epoch
);
}

Expand Down Expand Up @@ -1211,8 +1223,6 @@ where
.as_ref()
.map(|next_epoch| next_epoch.epoch);

let new_protocol_state = series_executor.protocol_state();

write_store.commit(CommitBundle {
transactions: committed_transaction_bundles,
proof: commit_request.proof,
Expand Down
Loading

0 comments on commit 09baf36

Please sign in to comment.