Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions crates/iota-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
static_assertions.workspace = true
sysinfo = "0.33"
tap.workspace = true
tch = { version = "0.13", features = ["download-libtorch"], optional = true }
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full", "tracing", "test-util"] }
tokio-stream.workspace = true
tracing.workspace = true
twox-hash = "1.6"
ordered-float = { version = "5.0", features = ["serde"] }
tch = { version = "0.13", features = ["download-libtorch"], optional = true }
sysinfo = "0.33"

# internal dependencies
consensus-config.workspace = true
Expand Down
48 changes: 31 additions & 17 deletions crates/iota-core/src/congestion_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ use std::{
fs::OpenOptions,
io::Write,
path::PathBuf,
sync::Arc,
};

use iota_metrics::monitored_scope;
use iota_types::{
base_types::ObjectID,
digests::TransactionDigest,
Expand All @@ -18,20 +20,18 @@ use iota_types::{
transaction::{TransactionData, TransactionDataAPI},
};
use moka::{ops::compute::Op, sync::Cache};
use prometheus::Registry;
use serde::Deserialize;
use tracing::info;

#[cfg(feature = "gas-nn")]
use crate::model_updater::{
build_cp_update_batch, build_train_tx_batch, InNodeModelUpdater, ObjectCheckpointStats,
ObjectSnapshot, RawTxItem, ModelUpdater, ModelReader,
InNodeModelUpdater, ModelReader, ModelUpdater, ObjectCheckpointStats, ObjectSnapshot,
RawTxItem, build_cp_update_batch, build_train_tx_batch,
};
use tracing::info;
use iota_metrics::monitored_scope;
use prometheus::Registry;
use std::sync::Arc;
use crate::gas_metrics::{GasMetrics, init_gas_metrics};

use crate::{
execution_cache::TransactionCacheRead
execution_cache::TransactionCacheRead,
gas_metrics::{GasMetrics, init_gas_metrics},
};

/// Capacity of the congestion tracker's cache.
Expand Down Expand Up @@ -162,7 +162,8 @@ pub struct CongestionTracker {
}

impl CongestionTracker {
/// Compose and send model update and training batches for the given checkpoint.
/// Compose and send model update and training batches for the given
/// checkpoint.
#[cfg(feature = "gas-nn")]
fn inform_model(
&self,
Expand Down Expand Up @@ -266,7 +267,8 @@ impl CongestionTracker {
self.metrics.record_hw_sample("congestion.inform_model");
}

/// Fallback when gas-nn is disabled: still record metrics, but skip NN work.
/// Fallback when gas-nn is disabled: still record metrics, but skip NN
/// work.
#[cfg(not(feature = "gas-nn"))]
fn inform_model(
&self,
Expand Down Expand Up @@ -315,7 +317,9 @@ impl CongestionTracker {
effects: &[TransactionEffects],
) {
let _scope = monitored_scope("CongestionTracker::process_checkpoint_effects");
let h = self.metrics.latency_component("congestion.process_checkpoint_effects");
let h = self
.metrics
.latency_component("congestion.process_checkpoint_effects");
let timer = h.start_timer();
// Containers for checkpoint's congestion and clearing transactions data.
let mut congestion_txs_data: Vec<TxData> = Vec::with_capacity(effects.len());
Expand Down Expand Up @@ -353,7 +357,7 @@ impl CongestionTracker {
.predict_for_tx(tx_data, this.reference_gas_price),
)
}
.unwrap_or(self.reference_gas_price);
.unwrap_or(self.reference_gas_price);

// Skip system transactions
if gas_price == 1 {
Expand Down Expand Up @@ -413,8 +417,12 @@ impl CongestionTracker {
);
// Record touched objects histogram
let mut touched: std::collections::HashSet<ObjectID> = std::collections::HashSet::new();
for tx in &congestion_txs_data { touched.extend(tx.objects.iter().cloned()); }
for tx in &clearing_txs_data { touched.extend(tx.objects.iter().cloned()); }
for tx in &congestion_txs_data {
touched.extend(tx.objects.iter().cloned());
}
for tx in &clearing_txs_data {
touched.extend(tx.objects.iter().cloned());
}
self.metrics.touched_hist().observe(touched.len() as u64);

drop(timer);
Expand Down Expand Up @@ -522,7 +530,10 @@ impl CongestionTracker {
/// Non-blocking: reads from lock-free snapshot of per-object tips.
#[cfg(feature = "gas-nn")]
pub fn get_suggested_gas_price_with_nn(&self, transaction: &TransactionData) -> Option<u64> {
Some(self.model_reader.predict_for_tx(transaction, self.reference_gas_price))
Some(
self.model_reader
.predict_for_tx(transaction, self.reference_gas_price),
)
}

/// Returns a map of all objects and their hotness values.
Expand Down Expand Up @@ -568,7 +579,10 @@ impl CongestionTracker {

// Write header if the file is new
if !file_exists {
writeln!(file, "checkpoint,digest,gasprice,feedback,sui,ogd,nn,cleared")?;
writeln!(
file,
"checkpoint,digest,gasprice,feedback,sui,ogd,nn,cleared"
)?;
}

// Build row
Expand Down
40 changes: 29 additions & 11 deletions crates/iota-core/src/gas_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use std::sync::{Arc, Mutex};

use iota_metrics::histogram::{Histogram, HistogramVec};
use once_cell::sync::OnceCell;
use prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, register_int_gauge_with_registry};
use prometheus::{
IntCounterVec, IntGauge, IntGaugeVec, Registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, register_int_gauge_with_registry,
};
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System, UpdateKind, get_current_pid};

/// Percentiles to export for latency histograms (in 1/1000ths).
Expand All @@ -21,12 +27,14 @@ pub struct GasMetrics {
/// Pending training items gauge.
pub pending_train_items: IntGauge,

/// Batches accounting. Labels: type ("update"|"train"), status ("received"|"dropped").
/// Batches accounting. Labels: type ("update"|"train"), status
/// ("received"|"dropped").
pub batches_total: IntCounterVec,
/// Inference accounting. Label: status ("success"|"fallback"|"none").
pub infer_total: IntCounterVec,

/// Per-operation process CPU usage (percent) as histogram; label: component.
/// Per-operation process CPU usage (percent) as histogram; label:
/// component.
pub proc_cpu_pct: HistogramVec,
/// Per-operation process RSS memory (bytes) as histogram; label: component.
pub proc_mem_bytes: HistogramVec,
Expand Down Expand Up @@ -63,7 +71,8 @@ impl GasMetrics {
"gas_predict_pending_train_items",
"Pending training items count",
registry,
).unwrap();
)
.unwrap();

let batches_total = register_int_counter_vec_with_registry!(
"gas_predict_batches_total",
Expand Down Expand Up @@ -111,8 +120,9 @@ impl GasMetrics {
}
}

/// Pre-create time series for common components so that dashboards show lines even
/// when NN is disabled or idle. This sends a single zero sample per series.
/// Pre-create time series for common components so that dashboards show
/// lines even when NN is disabled or idle. This sends a single zero
/// sample per series.
pub fn warmup_series(&self) {
let components = [
"model_updater.predict",
Expand All @@ -139,26 +149,30 @@ impl GasMetrics {
}

pub fn touched_hist(&self) -> Histogram {
self.touched_per_cp.with_label_values(&["congestion.touched"])
self.touched_per_cp
.with_label_values(&["congestion.touched"])
}

/// Sample process CPU% and RSS memory and record to histograms for the given component.
/// Sample process CPU% and RSS memory and record to histograms for the
/// given component.
pub fn record_hw_sample(&self, component: &str) {
if let Some((cpu_pct, rss_bytes)) = self.sampler.sample_proc() {
self.proc_cpu_pct
.with_label_values(&[component])
.observe(cpu_pct as u64);
self.proc_mem_bytes
.with_label_values(&[component])
.observe(rss_bytes as u64);
.observe(rss_bytes);
}
}
}

static GLOBAL: OnceCell<Arc<GasMetrics>> = OnceCell::new();

pub fn init_gas_metrics(registry: &Registry) -> Arc<GasMetrics> {
let m = GLOBAL.get_or_init(|| Arc::new(GasMetrics::new(registry))).clone();
let m = GLOBAL
.get_or_init(|| Arc::new(GasMetrics::new(registry)))
.clone();
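// NOTE(review): intended as a once-per-process warmup, but this call sits outside the `get_or_init` closure, so `warmup_series()` runs on every `init_gas_metrics` call; only `GasMetrics::new` is guarded by `GLOBAL`.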
m.warmup_series();
m
Expand Down Expand Up @@ -193,7 +207,11 @@ impl GasHwSampler {

fn sample_proc(&self) -> Option<(f32, u64)> {
let mut sys = self.sys.lock().ok()?;
let _ = sys.refresh_processes_specifics(ProcessesToUpdate::Some(&[self.pid]), false, self.refresh_kind);
let _ = sys.refresh_processes_specifics(
ProcessesToUpdate::Some(&[self.pid]),
false,
self.refresh_kind,
);
let p = sys.process(self.pid)?;
let cpu = p.cpu_usage();
let rss = p.memory();
Expand Down
2 changes: 1 addition & 1 deletion crates/iota-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ pub mod epoch;
pub mod execution_cache;
mod execution_driver;
mod fallback_fetch;
pub mod gas_metrics;
pub mod jsonrpc_index;
pub mod metrics;
pub mod mock_consensus;
pub mod model;
#[cfg(feature = "gas-nn")]
pub mod model_updater;
pub mod gas_metrics;
pub mod module_cache_metrics;
pub mod mysticeti_adapter;
pub mod overload_monitor;
Expand Down
5 changes: 4 additions & 1 deletion crates/iota-core/src/model.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

//! Libtorch (tch) port of the hotness-aware gas predictor.
//!
//! Parity with Python model:
Expand Down Expand Up @@ -241,7 +244,7 @@ impl GasLearner {
/// One-time warm-up to avoid first-call stalls (parity with Python
/// startup).
pub fn warmup(&self) {
let _ = tch::no_grad(|| {
tch::no_grad(|| {
let dummy = Tensor::zeros([1, T, F], (Kind::Float, self.vs.device()));
let (_pred, _attn) = self
.model
Expand Down
Loading
Loading