From 53328acc813816757c57f3279cbd5f2aa738d2f0 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Mon, 19 Feb 2024 15:30:13 +0100 Subject: [PATCH] chore: backport fix of 0.0.11 (#453) Signed-off-by: Simon Paitrault Co-authored-by: David Palm --- Cargo.lock | 2 + Cargo.toml | 1 + crates/topos-config/src/tce.rs | 6 ++ .../topos-config/src/tce/synchronization.rs | 36 ++++++++ .../proto/topos/tce/v1/synchronization.proto | 2 + .../src/api/grpc/generated/topos.bin | Bin 16831 -> 16928 bytes .../src/api/grpc/generated/topos.tce.v1.rs | 2 + crates/topos-metrics/src/storage.rs | 16 +++- .../tests/subnet_contract.rs | 19 ++-- crates/topos-tce-api/src/graphql/query.rs | 16 ++++ crates/topos-tce-api/tests/runtime.rs | 71 +++++++++++++++ crates/topos-tce-storage/Cargo.toml | 1 + crates/topos-tce-storage/src/errors.rs | 3 + crates/topos-tce-storage/src/fullnode/mod.rs | 8 +- .../topos-tce-storage/src/rocks/db_column.rs | 17 +++- crates/topos-tce-storage/src/store.rs | 2 +- .../src/tests/checkpoints.rs | 4 +- crates/topos-tce-storage/src/tests/mod.rs | 4 +- crates/topos-tce-storage/src/validator/mod.rs | 83 ++++++++++++++++-- crates/topos-tce-synchronizer/Cargo.toml | 1 + crates/topos-tce-synchronizer/src/builder.rs | 15 ++-- .../src/checkpoints_collector/config.rs | 15 ---- .../src/checkpoints_collector/mod.rs | 12 ++- .../src/checkpoints_collector/tests.rs | 1 + crates/topos-tce-synchronizer/src/lib.rs | 16 +++- crates/topos-tce/src/lib.rs | 1 + 26 files changed, 295 insertions(+), 59 deletions(-) create mode 100644 crates/topos-config/src/tce/synchronization.rs delete mode 100644 crates/topos-tce-synchronizer/src/checkpoints_collector/config.rs diff --git a/Cargo.lock b/Cargo.lock index 4712f223b..0a2ab77b3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8184,6 +8184,7 @@ dependencies = [ "tokio", "tokio-stream", "topos-core", + "topos-metrics", "topos-test-sdk", "tracing", "tracing-subscriber", @@ -8206,6 +8207,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tonic 0.10.2", + "topos-config", "topos-core", "topos-p2p", "topos-tce-gatekeeper", diff --git a/Cargo.toml b/Cargo.toml index 2a42d590d..cbc89c01b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ strip = true [workspace.dependencies] topos-core = { path = "./crates/topos-core", default-features = false } topos-crypto = { path = "./crates/topos-crypto", default-features = false } +topos-metrics = { path = "./crates/topos-metrics/", default-features = false } # Various utility crates clap = { version = "4.0", features = ["derive", "env", "string"] } diff --git a/crates/topos-config/src/tce.rs b/crates/topos-config/src/tce.rs index f2c2dec18..dcce94cd1 100644 --- a/crates/topos-config/src/tce.rs +++ b/crates/topos-config/src/tce.rs @@ -15,9 +15,11 @@ use topos_p2p::{Multiaddr, PeerId}; use self::broadcast::ReliableBroadcastParams; use self::p2p::P2PConfig; +use self::synchronization::SynchronizationConfig; pub mod broadcast; pub mod p2p; +pub mod synchronization; const DEFAULT_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::new(0, 0, 0, 0); @@ -68,6 +70,10 @@ pub struct TceConfig { #[serde(default)] pub p2p: P2PConfig, + /// Synchronization configuration + #[serde(default)] + pub synchronization: SynchronizationConfig, + /// gRPC API Addr #[serde(default = "default_grpc_api_addr")] pub grpc_api_addr: SocketAddr, diff --git a/crates/topos-config/src/tce/synchronization.rs b/crates/topos-config/src/tce/synchronization.rs new file mode 100644 index 000000000..222cd1030 --- /dev/null +++ b/crates/topos-config/src/tce/synchronization.rs @@ -0,0 +1,36 @@ +use serde::{Deserialize, Serialize}; + +/// Configuration for the TCE synchronization +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "kebab-case")] +pub struct SynchronizationConfig { + /// Interval in seconds to synchronize the TCE + #[serde(default = "SynchronizationConfig::default_interval_seconds")] + pub interval_seconds: u64, + + /// Maximum number of Proof of delivery per query per subnet + #[serde(default = "SynchronizationConfig::default_limit_per_subnet")] + pub limit_per_subnet: usize, +} + +impl Default for SynchronizationConfig { + fn default() -> Self { + Self { + interval_seconds: SynchronizationConfig::INTERVAL_SECONDS, + limit_per_subnet: SynchronizationConfig::LIMIT_PER_SUBNET, + } + } +} + +impl SynchronizationConfig { + pub const INTERVAL_SECONDS: u64 = 10; + pub const LIMIT_PER_SUBNET: usize = 100; + + const fn default_interval_seconds() -> u64 { + Self::INTERVAL_SECONDS + } + + const fn default_limit_per_subnet() -> usize { + Self::LIMIT_PER_SUBNET + } +} diff --git a/crates/topos-core/proto/topos/tce/v1/synchronization.proto b/crates/topos-core/proto/topos/tce/v1/synchronization.proto index 09f56af18..a3f15d181 100644 --- a/crates/topos-core/proto/topos/tce/v1/synchronization.proto +++ b/crates/topos-core/proto/topos/tce/v1/synchronization.proto @@ -18,6 +18,8 @@ message CheckpointRequest { topos.shared.v1.UUID request_id = 1; repeated ProofOfDelivery checkpoint = 2; + + uint64 limit_per_subnet = 3; } message CheckpointResponse { diff --git a/crates/topos-core/src/api/grpc/generated/topos.bin b/crates/topos-core/src/api/grpc/generated/topos.bin index 635dba028d9a6cc52959cd2f574ff754f5fef4e0..adc02a560befb435a981ada5a9e72be1a47bd1ac 100644 GIT binary patch delta 1071 zcmX|=KX21O7>9l5vvJO$DozuJC~52@O&zL)#z~r_5(7xUieN%QR3K%j1QjY}L>^#a zHUlHy0B9Lt;*N^P`--i1wmgTQrzq}UzV3+0Z_|quL zN>5@K3|JxRMWj#8U1O^FF2X~_qLJxkQt&n+ROy*N@XJQ7`} zr(Gzo8*~v~2*F7v5j8afz}XU^(?RG=!Yx}vUUDN+Qk+6kv2RKwm5Qn*NTt`?mZ-C% z6)}d8bZ@An+)gA0&PZFiXd48HT;RwqNe#=QyDWz0hOmT&p}8Rta)#!HT$0K7vDy#- znatb}2r^li*&20@7M2i_t=39LMB>1?tzGcfxd%ZCmq6f(ATx%{v^-1vVr+!4jIOZ} z0wHT`gaipq_*`uRfY8JUfgm&q`?f}%gF{OQndCDi_)EVpV9fXis+S4Hgy@ffC z%?9)q=H@MU>Z6!fB3r}UyxvfU!P{Co$NDfn(w9L^<@t1$m8h!`0OD5W^8=xA6>i%a j;#Rez9W#g8TIgrzbB4L~|2fiMhdXAD^w(GWy)WH=J%d!A delta 951 zcmX|=L5`F_6o#p)2QtNIl-!IwczmfP6$>Dgl8g9j3*!A!&{&4ed=F`FN zKzboL&VG1*@{3J}m&w;$N=_`K$hps?l!6!wMdZ)rD@nw-6N=gJHhtaCcq9*{tBw$u z#Y;;9RGi8{$5EQ|&XgD_l9oJBkXus8RbvT5=IsvmbK)T-&SG@oNF{!T;XM&oGAT`T z55QG4P6r{G26t=?^U`~vmf~tiMrWp^q%u0R1gY%yi6xaxMn$w?NOoFkDUpPfJ7=Sn zKrHtcvc)3{B~=W6vaT=8hH!*HVKxN9oWg8KD4CL<>k9#pDb0pJkST))wx*Kn))I!4 z`zKn4LRvcyM;E+0vG$j6@d+z}%my;`@N?l^Y>W_&k=hs`5N2(R5GkQ8f2^+oKxk`( zKoHsnD_c{^qOyb`+vd^mri%MmFWObNSNw$z0MK0VF%Cj=RX?>gG*{1_n>LhcKHlPL zgW4E?Fr*H~FAt=WacYLB_fs!(8T5D$F6W_Wcx-AMA4kJ|OXzD3LQCjtjv~EE{0*(M zA6Mxi)%p`H<*8@c(UEQeVBF4pYaj%7!ChMu=fKI-^r6<#v8gfib^rU6eiuw_4I16e Ib^bB?2jXi*SpWb4 diff --git a/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs b/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs index 7f2ea84af..62810b084 100644 --- a/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs +++ b/crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs @@ -7,6 +7,8 @@ pub struct CheckpointRequest { pub request_id: ::core::option::Option, #[prost(message, repeated, tag = "2")] pub checkpoint: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub limit_per_subnet: u64, } #[derive(serde::Deserialize, serde::Serialize)] #[allow(clippy::derive_partial_eq_without_eq)] diff --git a/crates/topos-metrics/src/storage.rs b/crates/topos-metrics/src/storage.rs index 82a3ef619..4698856eb 100644 --- a/crates/topos-metrics/src/storage.rs +++ b/crates/topos-metrics/src/storage.rs @@ -1,6 +1,6 @@ use prometheus::{ - self, register_histogram_with_registry, register_int_counter_with_registry, Histogram, - IntCounter, + self, register_histogram_with_registry, register_int_counter_with_registry, + register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, }; use lazy_static::lazy_static; @@ -31,4 +31,16 @@ lazy_static! { TOPOS_METRIC_REGISTRY ) .unwrap(); + pub static ref STORAGE_PENDING_POOL_COUNT: IntGauge = register_int_gauge_with_registry!( + "storage_pending_pool_count", + "Number of certificates in the pending pool.", + TOPOS_METRIC_REGISTRY + ) + .unwrap(); + pub static ref STORAGE_PRECEDENCE_POOL_COUNT: IntGauge = register_int_gauge_with_registry!( + "storage_precedence_pool_count", + "Number of certificates in the precedence pool.", + TOPOS_METRIC_REGISTRY + ) + .unwrap(); } diff --git a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs b/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs index 7ca5e54f9..da2256324 100644 --- a/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs +++ b/crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs @@ -321,18 +321,19 @@ async fn deploy_test_token( "Deploying new token {} with symbol {}", token_name, token_symbol ); - - let deploy_query = ierc20_messaging + let deploy_outcome = ierc20_messaging .deploy_token(token_encoded_params) .legacy() - .gas(DEFAULT_GAS); - - let deploy_result = deploy_query.send().await.map_err(|e| { - error!("Unable deploy token: {e}"); - e - })?; + .gas(DEFAULT_GAS) + .send() + .await + .map_err(|e| { + error!("Unable deploy token: {e}"); + e + })? + .await; - match deploy_result.await { + match deploy_outcome { Ok(r) => { info!("Token deployed: {:?}", r); } diff --git a/crates/topos-tce-api/src/graphql/query.rs b/crates/topos-tce-api/src/graphql/query.rs index 596c64140..4c2bc8751 100644 --- a/crates/topos-tce-api/src/graphql/query.rs +++ b/crates/topos-tce-api/src/graphql/query.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription}; @@ -13,6 +14,7 @@ use topos_core::api::graphql::{ query::CertificateQuery, }; use topos_core::types::stream::CertificateSourceStreamPosition; +use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT}; use topos_tce_storage::fullnode::FullNodeStore; use topos_tce_storage::store::ReadStore; @@ -114,6 +116,20 @@ impl QueryRoot { Self::certificate_by_id(ctx, certificate_id).await } + /// This endpoint is used to get the current storage pool stats. + /// It returns the number of certificates in the pending and precedence pools. + /// The values are estimated as having a precise count is costly. + async fn get_storage_pool_stats( + &self, + _ctx: &Context<'_>, + ) -> Result, GraphQLServerError> { + let mut stats = HashMap::new(); + stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get()); + stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get()); + + Ok(stats) + } + /// This endpoint is used to get the current checkpoint of the source streams. /// The checkpoint is the position of the last certificate delivered for each source stream. async fn get_checkpoint( diff --git a/crates/topos-tce-api/tests/runtime.rs b/crates/topos-tce-api/tests/runtime.rs index d269d4f91..ce451386a 100644 --- a/crates/topos-tce-api/tests/runtime.rs +++ b/crates/topos-tce-api/tests/runtime.rs @@ -21,6 +21,7 @@ use topos_core::{ }, uci::Certificate, }; +use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT}; use topos_tce_api::{Runtime, RuntimeEvent}; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::StorageClient; @@ -621,3 +622,73 @@ async fn can_query_graphql_endpoint_for_certificates( graphql_certificate.source_subnet_id ); } + +#[rstest] +#[timeout(Duration::from_secs(4))] +#[test(tokio::test)] +async fn check_storage_pool_stats( + broadcast_stream: broadcast::Receiver, +) { + let addr = get_available_addr(); + let graphql_addr = get_available_addr(); + let metrics_addr = get_available_addr(); + + let fullnode_store = create_fullnode_store::default().await; + + let store = + create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await; + STORAGE_PENDING_POOL_COUNT.set(10); + STORAGE_PRECEDENCE_POOL_COUNT.set(200); + + let storage_client = StorageClient::new(store.clone()); + + let (_runtime_client, _launcher, _ctx) = Runtime::builder() + .with_broadcast_stream(broadcast_stream) + .storage(storage_client) + .store(store) + .serve_grpc_addr(addr) + .serve_graphql_addr(graphql_addr) + .serve_metrics_addr(metrics_addr) + .build_and_launch() + .await; + + // Wait for server to boot + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = "query {getStoragePoolStats}"; + + #[derive(Debug, Deserialize)] + struct Response { + // data: HashMap, + data: Stats, + } + + #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] + struct Stats { + get_storage_pool_stats: PoolStats, + } + + #[derive(Debug, Deserialize)] + struct PoolStats { + pending_pool: u64, + precedence_pool: u64, + } + + let client = reqwest::Client::new(); + + let response = client + .post(format!("http://{}", graphql_addr)) + .json(&serde_json::json!({ + "query": query, + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10); + assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200); +} diff --git a/crates/topos-tce-storage/Cargo.toml b/crates/topos-tce-storage/Cargo.toml index 176e4326b..4b9cbcc7e 100644 --- a/crates/topos-tce-storage/Cargo.toml +++ b/crates/topos-tce-storage/Cargo.toml @@ -8,6 +8,7 @@ workspace = true [dependencies] topos-core = { workspace = true, features = ["uci", "api"] } +topos-metrics = { workspace = true } async-stream.workspace = true async-trait.workspace = true diff --git a/crates/topos-tce-storage/src/errors.rs b/crates/topos-tce-storage/src/errors.rs index bb9dee8e1..eba5c322c 100644 --- a/crates/topos-tce-storage/src/errors.rs +++ b/crates/topos-tce-storage/src/errors.rs @@ -33,6 +33,9 @@ pub enum InternalStorageError { #[error("Invalid query argument: {0}")] InvalidQueryArgument(&'static str), + #[error("Unexpected DB state: {0}")] + UnexpectedDBState(&'static str), + #[error(transparent)] Bincode(#[from] Box), diff --git a/crates/topos-tce-storage/src/fullnode/mod.rs b/crates/topos-tce-storage/src/fullnode/mod.rs index e5de4d8b8..600bfa625 100644 --- a/crates/topos-tce-storage/src/fullnode/mod.rs +++ b/crates/topos-tce-storage/src/fullnode/mod.rs @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc}; use arc_swap::ArcSwap; use async_trait::async_trait; +use rocksdb::properties::ESTIMATE_NUM_KEYS; use topos_core::{ types::{ stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position}, @@ -233,8 +234,11 @@ impl WriteStore for FullNodeStore { } impl ReadStore for FullNodeStore { - fn count_certificates_delivered(&self) -> Result { - Ok(self.perpetual_tables.certificates.iter()?.count()) + fn count_certificates_delivered(&self) -> Result { + Ok(self + .perpetual_tables + .certificates + .property_int_value(ESTIMATE_NUM_KEYS)?) } fn get_source_head(&self, subnet_id: &SubnetId) -> Result, StorageError> { diff --git a/crates/topos-tce-storage/src/rocks/db_column.rs b/crates/topos-tce-storage/src/rocks/db_column.rs index ff380eed2..2f81281af 100644 --- a/crates/topos-tce-storage/src/rocks/db_column.rs +++ b/crates/topos-tce-storage/src/rocks/db_column.rs @@ -6,8 +6,8 @@ use std::path::Path; #[cfg(test)] use rocksdb::ColumnFamilyDescriptor; use rocksdb::{ - BoundColumnFamily, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode, - MultiThreaded, ReadOptions, WriteBatch, + BoundColumnFamily, CStrLike, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction, + IteratorMode, MultiThreaded, ReadOptions, WriteBatch, }; use bincode::Options; @@ -66,7 +66,7 @@ impl DBColumn { } /// Returns the CF of the DBColumn, used to build queries. - fn cf(&self) -> Result>, InternalStorageError> { + pub(crate) fn cf(&self) -> Result>, InternalStorageError> { self.rocksdb .cf_handle(self.cf) .ok_or(InternalStorageError::InvalidColumnFamily(self.cf)) @@ -78,6 +78,17 @@ where K: DeserializeOwned + Serialize + std::fmt::Debug, V: DeserializeOwned + Serialize + std::fmt::Debug, { + pub(crate) fn property_int_value( + &self, + property: impl CStrLike, + ) -> Result { + self.rocksdb + .property_int_value_cf(&self.cf()?, property)? + .ok_or(InternalStorageError::UnexpectedDBState( + "Property not found", + )) + } + /// Insert a record into the storage by passing a Key and a Value. /// /// Key are fixed length bincode serialized. diff --git a/crates/topos-tce-storage/src/store.rs b/crates/topos-tce-storage/src/store.rs index 16fe94851..12ac2465b 100644 --- a/crates/topos-tce-storage/src/store.rs +++ b/crates/topos-tce-storage/src/store.rs @@ -43,7 +43,7 @@ pub trait WriteStore: Send { /// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to read data. pub trait ReadStore: Send { /// Returns the number of certificates delivered - fn count_certificates_delivered(&self) -> Result; + fn count_certificates_delivered(&self) -> Result; /// Try to get a SourceHead of a subnet /// diff --git a/crates/topos-tce-storage/src/tests/checkpoints.rs b/crates/topos-tce-storage/src/tests/checkpoints.rs index 947eb381b..0331c061c 100644 --- a/crates/topos-tce-storage/src/tests/checkpoints.rs +++ b/crates/topos-tce-storage/src/tests/checkpoints.rs @@ -54,7 +54,7 @@ async fn get_checkpoint_diff_with_no_input(store: Arc) { } let checkpoint = store - .get_checkpoint_diff(&[]) + .get_checkpoint_diff(&[], 100) .unwrap() .into_iter() .map(|(subnet, proofs)| { @@ -97,7 +97,7 @@ async fn get_checkpoint_diff_with_input(store: Arc) { } let checkpoint = store - .get_checkpoint_diff(&[checkpoint]) + .get_checkpoint_diff(&[checkpoint], 100) .unwrap() .into_iter() .map(|(subnet, proofs)| { diff --git a/crates/topos-tce-storage/src/tests/mod.rs b/crates/topos-tce-storage/src/tests/mod.rs index a8d3c25e1..4e2fc1384 100644 --- a/crates/topos-tce-storage/src/tests/mod.rs +++ b/crates/topos-tce-storage/src/tests/mod.rs @@ -485,7 +485,7 @@ async fn get_source_head_for_subnet(store: Arc) { create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_2], 10); store - .insert_certificates_delivered(&expected_certificates_for_source_subnet_1) + .insert_certificates_delivered(&expected_certificates_for_source_subnet_1[..]) .await .unwrap(); @@ -493,7 +493,7 @@ async fn get_source_head_for_subnet(store: Arc) { create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_2], 10); store - .insert_certificates_delivered(&expected_certificates_for_source_subnet_2) + .insert_certificates_delivered(&expected_certificates_for_source_subnet_2[..]) .await .unwrap(); diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index 15275e8cc..d9fcc137a 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -22,6 +22,7 @@ use std::{ use async_trait::async_trait; +use rocksdb::properties::ESTIMATE_NUM_KEYS; use topos_core::{ types::{ stream::{CertificateSourceStreamPosition, Position}, @@ -29,7 +30,8 @@ use topos_core::{ }, uci::{Certificate, CertificateId, SubnetId, INITIAL_CERTIFICATE_ID}, }; -use tracing::{debug, error, info, instrument}; +use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT}; +use tracing::{debug, error, info, instrument, warn}; use crate::{ errors::{InternalStorageError, StorageError}, @@ -68,11 +70,51 @@ impl ValidatorStore { fullnode_store: Arc, ) -> Result, StorageError> { let pending_tables: ValidatorPendingTables = ValidatorPendingTables::open(path); + let store = Arc::new(Self { pending_tables, fullnode_store, }); + store.pending_tables.pending_pool.rocksdb.compact_range_cf( + &store.pending_tables.pending_pool.cf()?, + None::<&[u8]>, + None::<&[u8]>, + ); + store + .pending_tables + .precedence_pool + .rocksdb + .compact_range_cf( + &store.pending_tables.precedence_pool.cf()?, + None::<&[u8]>, + None::<&[u8]>, + ); + + let pending_count: i64 = + store + .count_pending_certificates()? + .try_into() + .map_err(|error| { + warn!("Failed to convert estimate-num-keys to i64: {}", error); + StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( + "Failed to convert estimate-num-keys to i64", + )) + })?; + + let precedence_count: i64 = store + .count_precedence_pool_certificates()? + .try_into() + .map_err(|error| { + warn!("Failed to convert estimate-num-keys to i64: {}", error); + StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( + "Failed to convert estimate-num-keys to i64", + )) + })?; + + STORAGE_PENDING_POOL_COUNT.set(pending_count); + STORAGE_PRECEDENCE_POOL_COUNT.set(precedence_count); + Ok(store) } @@ -82,13 +124,19 @@ impl ValidatorStore { } /// Returns the number of certificates in the pending pool - pub fn count_pending_certificates(&self) -> Result { - Ok(self.pending_tables.pending_pool.iter()?.count()) + pub fn count_pending_certificates(&self) -> Result { + Ok(self + .pending_tables + .pending_pool + .property_int_value(ESTIMATE_NUM_KEYS)?) } /// Returns the number of certificates in the precedence pool - pub fn count_precedence_pool_certificates(&self) -> Result { - Ok(self.pending_tables.precedence_pool.iter()?.count()) + pub fn count_precedence_pool_certificates(&self) -> Result { + Ok(self + .pending_tables + .precedence_pool + .property_int_value(ESTIMATE_NUM_KEYS)?) } /// Try to return the [`PendingCertificateId`] for a [`CertificateId`] @@ -171,7 +219,8 @@ impl ValidatorStore { Ok(result) } - pub fn insert_pending_certificates( + #[cfg(test)] + pub(crate) fn insert_pending_certificates( &self, certificates: &[Certificate], ) -> Result, StorageError> { @@ -200,6 +249,8 @@ impl ValidatorStore { batch.write()?; + STORAGE_PENDING_POOL_COUNT.add(ids.len() as i64); + Ok(ids) } @@ -243,6 +294,7 @@ impl ValidatorStore { .pending_pool_index .insert(&certificate.id, &id)?; + STORAGE_PENDING_POOL_COUNT.inc(); debug!( "Certificate {} is now in the pending pool at index: {}", certificate.id, id @@ -252,6 +304,8 @@ impl ValidatorStore { self.pending_tables .precedence_pool .insert(&certificate.prev_id, certificate)?; + + STORAGE_PRECEDENCE_POOL_COUNT.inc(); debug!( "Certificate {} is now in the precedence pool, because the previous certificate \ {} isn't delivered yet", @@ -343,6 +397,7 @@ impl ValidatorStore { pub fn get_checkpoint_diff( &self, from: &[ProofOfDelivery], + limit_per_subnet: usize, ) -> Result>, StorageError> { // Parse the from in order to extract the different position per subnets let from_positions: HashMap = from @@ -374,7 +429,7 @@ impl ValidatorStore { .streams .prefix_iter(&(&subnet, &position.delivery_position.position))? .skip(1) - .take(100) + .take(limit_per_subnet) .map(|(_, v)| v) .collect() } else { @@ -382,7 +437,7 @@ impl ValidatorStore { .perpetual_tables .streams .prefix_iter(&(&subnet, Position::ZERO))? - .take(100) + .take(limit_per_subnet) .map(|(_, v)| v) .collect() }; @@ -423,6 +478,7 @@ impl ValidatorStore { .pending_pool_index .delete(&certificate.id)?; + STORAGE_PENDING_POOL_COUNT.dec(); Ok(certificate) } else { Err(StorageError::InternalStorage( @@ -434,7 +490,7 @@ impl ValidatorStore { } } impl ReadStore for ValidatorStore { - fn count_certificates_delivered(&self) -> Result { + fn count_certificates_delivered(&self) -> Result { self.fullnode_store.count_certificates_delivered() } @@ -519,6 +575,12 @@ impl WriteStore for ValidatorStore { .get(&certificate.certificate.id) { _ = self.pending_tables.pending_pool.delete(&pending_id); + _ = self + .pending_tables + .pending_pool_index + .delete(&certificate.certificate.id); + + STORAGE_PENDING_POOL_COUNT.dec(); } if let Ok(Some(next_certificate)) = self @@ -534,6 +596,9 @@ impl WriteStore for ValidatorStore { self.pending_tables .precedence_pool .delete(&certificate.certificate.id)?; + + STORAGE_PRECEDENCE_POOL_COUNT.dec(); + STORAGE_PENDING_POOL_COUNT.inc(); } Ok(position) diff --git a/crates/topos-tce-synchronizer/Cargo.toml b/crates/topos-tce-synchronizer/Cargo.toml index 5a1985379..257d56462 100644 --- a/crates/topos-tce-synchronizer/Cargo.toml +++ b/crates/topos-tce-synchronizer/Cargo.toml @@ -19,6 +19,7 @@ tracing.workspace = true uuid = { workspace = true, features = ["v4", "serde"] } topos-core = { workspace = true, features = ["api"] } +topos-config = { path = "../topos-config/" } topos-p2p = { path = "../topos-p2p" } topos-tce-gatekeeper = { path = "../topos-tce-gatekeeper/" } topos-tce-storage = { path = "../topos-tce-storage/" } diff --git a/crates/topos-tce-synchronizer/src/builder.rs b/crates/topos-tce-synchronizer/src/builder.rs index c0d2e5190..63b92b688 100644 --- a/crates/topos-tce-synchronizer/src/builder.rs +++ b/crates/topos-tce-synchronizer/src/builder.rs @@ -7,16 +7,15 @@ use topos_p2p::NetworkClient; use topos_tce_storage::validator::ValidatorStore; use crate::{ - checkpoints_collector::{ - CheckpointSynchronizer, CheckpointsCollectorConfig, CheckpointsCollectorError, - }, + checkpoints_collector::{CheckpointSynchronizer, CheckpointsCollectorError}, Synchronizer, SynchronizerError, SynchronizerEvent, }; +use topos_config::tce::synchronization::SynchronizationConfig; pub struct SynchronizerBuilder { network_client: Option, store: Option>, - sync_interval_seconds: u64, + config: SynchronizationConfig, /// Size of the channel producing events (default: 100) event_channel_size: usize, /// CancellationToken used to trigger shutdown of the Synchronizer @@ -28,7 +27,7 @@ impl Default for SynchronizerBuilder { Self { network_client: None, store: None, - sync_interval_seconds: 1, + config: SynchronizationConfig::default(), event_channel_size: 100, shutdown: None, } @@ -53,7 +52,7 @@ impl SynchronizerBuilder { spawn( CheckpointSynchronizer { - config: CheckpointsCollectorConfig::default(), + config: self.config, network: if let Some(network) = self.network_client { network } else { @@ -100,8 +99,8 @@ impl SynchronizerBuilder { self } - pub fn with_sync_interval_seconds(mut self, sync_interval_seconds: u64) -> Self { - self.sync_interval_seconds = sync_interval_seconds; + pub fn with_config(mut self, config: SynchronizationConfig) -> Self { + self.config = config; self } diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/config.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/config.rs deleted file mode 100644 index 063dcb095..000000000 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/config.rs +++ /dev/null @@ -1,15 +0,0 @@ -pub struct CheckpointsCollectorConfig { - pub(crate) sync_interval_seconds: u64, -} - -impl CheckpointsCollectorConfig { - const SYNC_INTERVAL_SECONDS: u64 = 10; -} - -impl Default for CheckpointsCollectorConfig { - fn default() -> Self { - Self { - sync_interval_seconds: Self::SYNC_INTERVAL_SECONDS, - } - } -} diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs index 99cb82ff6..f35ab9f9c 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs @@ -24,23 +24,22 @@ use topos_core::{ uci::{Certificate, CertificateId, SubnetId}, }; +use topos_config::tce::synchronization::SynchronizationConfig; use topos_p2p::{error::P2PError, NetworkClient, PeerId}; use topos_tce_storage::{errors::StorageError, store::ReadStore, validator::ValidatorStore}; use tracing::{debug, error, info, warn}; use uuid::Uuid; -mod config; mod error; #[cfg(test)] mod tests; -pub use config::CheckpointsCollectorConfig; pub use error::CheckpointsCollectorError; use crate::SynchronizerService; pub struct CheckpointSynchronizer { - pub(crate) config: CheckpointsCollectorConfig, + pub(crate) config: SynchronizationConfig, pub(crate) network: NetworkClient, #[allow(unused)] @@ -62,7 +61,7 @@ impl IntoFuture for CheckpointSynchronizer { fn into_future(mut self) -> Self::IntoFuture { async move { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs( - self.config.sync_interval_seconds, + self.config.interval_seconds, )); loop { @@ -156,6 +155,11 @@ impl CheckpointSynchronizer { let req = CheckpointRequest { request_id: Some(request_id.into()), checkpoint, + limit_per_subnet: self + .config + .limit_per_subnet + .try_into() + .unwrap_or(SynchronizationConfig::LIMIT_PER_SUBNET as u64), }; let mut client: SynchronizerServiceClient<_> = self diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs index eb95f9a21..6e7925a90 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/tests.rs @@ -30,6 +30,7 @@ fn encode() { let req = CheckpointRequest { request_id: Some(request_id), checkpoint: vec![], + limit_per_subnet: 100, }; let x: Vec = req.clone().into(); diff --git a/crates/topos-tce-synchronizer/src/lib.rs b/crates/topos-tce-synchronizer/src/lib.rs index 0f09aaf07..4b42ea5a7 100644 --- a/crates/topos-tce-synchronizer/src/lib.rs +++ b/crates/topos-tce-synchronizer/src/lib.rs @@ -1,4 +1,4 @@ -use std::{future::IntoFuture, sync::Arc}; +use std::{cmp::max, future::IntoFuture, sync::Arc}; use builder::SynchronizerBuilder; use checkpoints_collector::{CheckpointsCollectorError, CheckpointsCollectorEvent}; @@ -16,6 +16,7 @@ mod checkpoints_collector; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status}; +use topos_config::tce::synchronization::SynchronizationConfig; use topos_core::{ api::grpc::{ shared::v1::positions::SourceStreamPosition, @@ -150,6 +151,14 @@ impl GrpcSynchronizerService for SynchronizerService { .unwrap_or(Uuid::new_v4()); debug!("Received request for checkpoint (request_id: {})", id); + let limit_per_subnet: usize = max( + request + .limit_per_subnet + .try_into() + .unwrap_or(SynchronizationConfig::LIMIT_PER_SUBNET), + SynchronizationConfig::LIMIT_PER_SUBNET, + ); + let res: Result, _> = request .checkpoint .into_iter() @@ -166,7 +175,10 @@ impl GrpcSynchronizerService for SynchronizerService { debug!("Request {} contains {} proof_of_delivery", id, res.len()); trace!("Request {} contains {:?}", id, res); - let diff = match self.validator_store.get_checkpoint_diff(&res) { + let diff = match self + .validator_store + .get_checkpoint_diff(&res, limit_per_subnet) + { Ok(diff) => { debug!( "Fetched checkpoint diff from storage for request {}, got {:?}", diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 735389a01..1e7e96489 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -200,6 +200,7 @@ pub async fn run( let (synchronizer_runtime, synchronizer_stream) = topos_tce_synchronizer::Synchronizer::builder() + .with_config(config.synchronization.clone()) .with_shutdown(shutdown.0.child_token()) .with_store(validator_store.clone()) .with_network_client(network_client.clone())