From 96e862f5b886a38a5c67590d1e152ab9894d6f15 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Tue, 12 Mar 2024 19:20:09 +0100 Subject: [PATCH] refactor: improve delivery timing (#466) --- .../topos-core/src/api/graphql/certificate.rs | 42 +++- .../topos-core/src/api/graphql/checkpoint.rs | 12 +- crates/topos-tce-api/src/graphql/builder.rs | 8 +- crates/topos-tce-api/src/graphql/query.rs | 97 +++++++++- crates/topos-tce-api/src/graphql/tests.rs | 1 + crates/topos-tce-api/src/runtime/builder.rs | 1 - crates/topos-tce-api/src/runtime/error.rs | 3 + crates/topos-tce-api/tests/runtime.rs | 180 +++++++++++++++++- .../src/double_echo/mod.rs | 7 +- crates/topos-tce-broadcast/src/lib.rs | 1 + .../src/task_manager/mod.rs | 18 +- .../src/tests/task_manager.rs | 3 + crates/topos-tce-storage/src/client.rs | 2 +- crates/topos-tce-storage/src/tests/mod.rs | 4 +- crates/topos-tce-storage/src/validator/mod.rs | 77 +++++--- crates/topos-tce/src/app_context.rs | 4 +- crates/topos-tce/src/app_context/api.rs | 30 ++- crates/topos-tce/src/app_context/network.rs | 24 ++- crates/topos-tce/src/lib.rs | 4 +- 19 files changed, 454 insertions(+), 64 deletions(-) diff --git a/crates/topos-core/src/api/graphql/certificate.rs b/crates/topos-core/src/api/graphql/certificate.rs index 5d83cdcfb..3de5d66dc 100644 --- a/crates/topos-core/src/api/graphql/certificate.rs +++ b/crates/topos-core/src/api/graphql/certificate.rs @@ -8,12 +8,19 @@ use super::{checkpoint::SourceStreamPosition, subnet::SubnetId}; #[derive(Serialize, Deserialize, Debug, NewType)] pub struct CertificateId(String); +impl From for CertificateId { + fn from(value: uci::CertificateId) -> Self { + Self(value.to_string()) + } +} + #[derive(Serialize, Deserialize, Debug, SimpleObject)] #[serde(rename_all = "camelCase")] pub struct CertificatePositions { source: SourceStreamPosition, } +/// A certificate that has been delivered #[derive(Debug, Serialize, Deserialize, SimpleObject)] #[serde(rename_all = "camelCase")] pub struct Certificate { @@ -30,6 +37,39 @@ pub struct Certificate { pub positions: CertificatePositions, } +/// A certificate that has not been delivered yet +#[derive(Debug, Serialize, Deserialize, SimpleObject)] +#[serde(rename_all = "camelCase")] +pub struct UndeliveredCertificate { + pub id: CertificateId, + pub prev_id: CertificateId, + pub proof: String, + pub signature: String, + pub source_subnet_id: SubnetId, + pub state_root: String, + pub target_subnets: Vec, + pub tx_root_hash: String, + pub receipts_root_hash: String, + pub verifier: u32, +} + +impl From<&uci::Certificate> for UndeliveredCertificate { + fn from(value: &crate::uci::Certificate) -> Self { + Self { + id: CertificateId(value.id.to_string()), + prev_id: CertificateId(value.prev_id.to_string()), + proof: hex::encode(&value.proof), + signature: hex::encode(&value.signature), + source_subnet_id: (&value.source_subnet_id).into(), + state_root: hex::encode(value.state_root), + target_subnets: value.target_subnets.iter().map(Into::into).collect(), + tx_root_hash: hex::encode(value.tx_root_hash), + receipts_root_hash: format!("0x{}", hex::encode(value.receipts_root_hash)), + verifier: value.verifier, + } + } +} + #[derive(Debug, Serialize, Deserialize, SimpleObject)] pub struct Ready { message: String, @@ -52,7 +92,7 @@ impl From<&CertificateDelivered> for Certificate { receipts_root_hash: format!("0x{}", hex::encode(uci_cert.receipts_root_hash)), verifier: uci_cert.verifier, positions: CertificatePositions { - source: (&value.proof_of_delivery.delivery_position).into(), + source: (&value.proof_of_delivery).into(), }, } } diff --git a/crates/topos-core/src/api/graphql/checkpoint.rs b/crates/topos-core/src/api/graphql/checkpoint.rs index 95d845d1c..7ac50de5e 100644 --- a/crates/topos-core/src/api/graphql/checkpoint.rs +++ b/crates/topos-core/src/api/graphql/checkpoint.rs @@ -1,7 +1,7 @@ use async_graphql::{InputObject, SimpleObject}; use serde::{Deserialize, Serialize}; -use crate::types::stream::CertificateSourceStreamPosition; +use crate::types::ProofOfDelivery; use super::{certificate::CertificateId, subnet::SubnetId}; @@ -17,13 +17,15 @@ pub struct SourceStreamPositionInput { pub struct SourceStreamPosition { pub source_subnet_id: SubnetId, pub position: u64, + pub certificate_id: CertificateId, } -impl From<&CertificateSourceStreamPosition> for SourceStreamPosition { - fn from(value: &CertificateSourceStreamPosition) -> Self { +impl From<&ProofOfDelivery> for SourceStreamPosition { + fn from(value: &ProofOfDelivery) -> Self { Self { - source_subnet_id: (&value.subnet_id).into(), - position: *value.position, + certificate_id: value.certificate_id.into(), + source_subnet_id: (&value.delivery_position.subnet_id).into(), + position: *value.delivery_position.position, } } } diff --git a/crates/topos-tce-api/src/graphql/builder.rs b/crates/topos-tce-api/src/graphql/builder.rs index 834b08b6b..061dfb779 100644 --- a/crates/topos-tce-api/src/graphql/builder.rs +++ b/crates/topos-tce-api/src/graphql/builder.rs @@ -14,13 +14,13 @@ use crate::{ }, runtime::InternalRuntimeCommand, }; -use topos_tce_storage::fullnode::FullNodeStore; +use topos_tce_storage::validator::ValidatorStore; use super::query::SubscriptionRoot; #[derive(Default)] pub struct ServerBuilder { - store: Option>, + store: Option>, serve_addr: Option, runtime: Option>, } @@ -34,7 +34,7 @@ impl ServerBuilder { self } - pub(crate) fn store(mut self, store: Arc) -> Self { + pub(crate) fn store(mut self, store: Arc) -> Self { self.store = Some(store); self @@ -62,6 +62,7 @@ impl ServerBuilder { .take() .expect("Cannot build GraphQL server without a FullNode store"); + let fullnode_store = store.get_fullnode_store(); let runtime = self .runtime .take() @@ -69,6 +70,7 @@ impl ServerBuilder { let schema: ServiceSchema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot) .data(store) + .data(fullnode_store) .data(runtime) .finish(); diff --git a/crates/topos-tce-api/src/graphql/query.rs b/crates/topos-tce-api/src/graphql/query.rs index 4c2bc8751..c7132bde0 100644 --- a/crates/topos-tce-api/src/graphql/query.rs +++ b/crates/topos-tce-api/src/graphql/query.rs @@ -5,6 +5,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use tokio::sync::{mpsc, oneshot}; +use topos_core::api::graphql::certificate::UndeliveredCertificate; use topos_core::api::graphql::checkpoint::SourceStreamPosition; use topos_core::api::graphql::errors::GraphQLServerError; use topos_core::api::graphql::filter::SubnetFilter; @@ -18,6 +19,7 @@ use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT}; use topos_tce_storage::fullnode::FullNodeStore; use topos_tce_storage::store::ReadStore; +use topos_tce_storage::validator::ValidatorStore; use tracing::debug; use crate::runtime::InternalRuntimeCommand; @@ -121,11 +123,58 @@ impl QueryRoot { /// The values are estimated as having a precise count is costly. async fn get_storage_pool_stats( &self, - _ctx: &Context<'_>, + ctx: &Context<'_>, ) -> Result, GraphQLServerError> { let mut stats = HashMap::new(); - stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get()); - stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get()); + stats.insert("metrics_pending_pool", STORAGE_PENDING_POOL_COUNT.get()); + stats.insert( + "metrics_precedence_pool", + STORAGE_PRECEDENCE_POOL_COUNT.get(), + ); + + let store = ctx.data::>().map_err(|_| { + tracing::error!("Failed to get store from context"); + + GraphQLServerError::ParseDataConnector + })?; + + stats.insert( + "count_pending_certificates", + store + .iter_pending_pool() + .map_err(|_| GraphQLServerError::StorageError)? + .count() + .try_into() + .unwrap_or(i64::MAX), + ); + + stats.insert( + "count_precedence_certificates", + store + .iter_precedence_pool() + .map_err(|_| GraphQLServerError::StorageError)? + .count() + .try_into() + .unwrap_or(i64::MAX), + ); + + stats.insert( + "pending_pool_size", + store + .pending_pool_size() + .map_err(|_| GraphQLServerError::StorageError)? + .try_into() + .unwrap_or(i64::MAX), + ); + + stats.insert( + "precedence_pool_size", + store + .precedence_pool_size() + .map_err(|_| GraphQLServerError::StorageError)? + .try_into() + .unwrap_or(i64::MAX), + ); Ok(stats) } @@ -151,9 +200,51 @@ impl QueryRoot { .map(|(subnet_id, head)| SourceStreamPosition { source_subnet_id: subnet_id.into(), position: *head.position, + certificate_id: head.certificate_id.into(), }) .collect()) } + + /// This endpoint is used to get the current pending pool. + /// It returns [`CertificateId`] and the [`PendingCertificateId`] + async fn get_pending_pool( + &self, + ctx: &Context<'_>, + ) -> Result, GraphQLServerError> { + let store = ctx.data::>().map_err(|_| { + tracing::error!("Failed to get store from context"); + + GraphQLServerError::ParseDataConnector + })?; + + Ok(store + .iter_pending_pool() + .map_err(|_| GraphQLServerError::StorageError)? + .map(|(id, certificate)| (id, certificate.id.into())) + .collect()) + } + + /// This endpoint is used to check if a certificate has any child certificate in the precedence pool. + async fn check_precedence( + &self, + ctx: &Context<'_>, + certificate_id: CertificateId, + ) -> Result, GraphQLServerError> { + let store = ctx.data::>().map_err(|_| { + tracing::error!("Failed to get store from context"); + + GraphQLServerError::ParseDataConnector + })?; + + store + .check_precedence( + &certificate_id + .try_into() + .map_err(|_| GraphQLServerError::ParseCertificateId)?, + ) + .map_err(|_| GraphQLServerError::StorageError) + .map(|certificate| certificate.as_ref().map(Into::into)) + } } pub struct SubscriptionRoot; diff --git a/crates/topos-tce-api/src/graphql/tests.rs b/crates/topos-tce-api/src/graphql/tests.rs index fc249d5d5..0114f92cd 100644 --- a/crates/topos-tce-api/src/graphql/tests.rs +++ b/crates/topos-tce-api/src/graphql/tests.rs @@ -131,6 +131,7 @@ async fn open_watch_certificate_delivered() { source { sourceSubnetId position + certificateId } } } diff --git a/crates/topos-tce-api/src/runtime/builder.rs b/crates/topos-tce-api/src/runtime/builder.rs index be7563ffd..3353ea204 100644 --- a/crates/topos-tce-api/src/runtime/builder.rs +++ b/crates/topos-tce-api/src/runtime/builder.rs @@ -118,7 +118,6 @@ impl RuntimeBuilder { .store( self.store .take() - .map(|store| store.get_fullnode_store()) .expect("Unable to build GraphQL Server, Store is missing"), ) .runtime(internal_runtime_command_sender.clone()) diff --git a/crates/topos-tce-api/src/runtime/error.rs b/crates/topos-tce-api/src/runtime/error.rs index a5ee2d20f..e36a724f7 100644 --- a/crates/topos-tce-api/src/runtime/error.rs +++ b/crates/topos-tce-api/src/runtime/error.rs @@ -16,4 +16,7 @@ pub enum RuntimeError { #[error("Unexpected store error: {0}")] Store(#[from] StorageError), + + #[error("Communication error: {0}")] + CommunicationError(String), } diff --git a/crates/topos-tce-api/tests/runtime.rs b/crates/topos-tce-api/tests/runtime.rs index cb7ceeab2..3110f4d8b 100644 --- a/crates/topos-tce-api/tests/runtime.rs +++ b/crates/topos-tce-api/tests/runtime.rs @@ -1,6 +1,8 @@ use futures::Stream; use rstest::rstest; use serde::Deserialize; +use std::collections::HashMap; +use std::sync::Arc; use std::time::Duration; use test_log::test; use tokio::sync::{broadcast, mpsc}; @@ -13,6 +15,7 @@ use topos_core::api::grpc::shared::v1::checkpoints::TargetCheckpoint; use topos_core::api::grpc::shared::v1::positions::TargetStreamPosition; use topos_core::types::stream::Position; use topos_core::types::CertificateDelivered; +use topos_core::uci::CertificateId; use topos_core::{ api::grpc::tce::v1::{ api_service_client::ApiServiceClient, @@ -24,6 +27,7 @@ use topos_core::{ use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT}; use topos_tce_api::{Runtime, RuntimeEvent}; use topos_tce_storage::types::CertificateDeliveredWithPositions; +use topos_tce_storage::validator::ValidatorStore; use topos_tce_storage::StorageClient; use topos_test_sdk::certificates::{ create_certificate, create_certificate_at_position, create_certificate_chain, @@ -583,6 +587,7 @@ async fn can_query_graphql_endpoint_for_certificates( source {{ sourceSubnetId position + certificateId }} }} }} @@ -670,8 +675,8 @@ async fn check_storage_pool_stats( #[derive(Debug, Deserialize)] struct PoolStats { - pending_pool: u64, - precedence_pool: u64, + metrics_pending_pool: u64, + metrics_precedence_pool: u64, } let client = reqwest::Client::new(); @@ -688,6 +693,173 @@ async fn check_storage_pool_stats( .await .unwrap(); - assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10); - assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200); + assert_eq!( + response.data.get_storage_pool_stats.metrics_pending_pool, + 10 + ); + assert_eq!( + response.data.get_storage_pool_stats.metrics_precedence_pool, + 200 + ); +} + +#[rstest] +#[timeout(Duration::from_secs(4))] +#[test(tokio::test)] +async fn get_pending_pool( + broadcast_stream: broadcast::Receiver, +) { + let addr = get_available_addr(); + let graphql_addr = get_available_addr(); + let metrics_addr = get_available_addr(); + + // launch data store + let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 15); + + let fullnode_store = create_fullnode_store::default().await; + + let store: Arc = + create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; + + for certificate in &certificates { + _ = store.insert_pending_certificate(&certificate.certificate); + } + + let storage_client = StorageClient::new(store.clone()); + + let (_runtime_client, _launcher, _ctx) = Runtime::builder() + .with_broadcast_stream(broadcast_stream) + .storage(storage_client) + .store(store) + .serve_grpc_addr(addr) + .serve_graphql_addr(graphql_addr) + .serve_metrics_addr(metrics_addr) + .build_and_launch() + .await; + + // Wait for server to boot + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = "query { getPendingPool }".to_string(); + + #[derive(Debug, Deserialize)] + struct Response { + data: PendingPool, + } + + #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] + struct PendingPool { + #[serde(rename = "getPendingPool")] + pool: HashMap, + } + + let client = reqwest::Client::new(); + + let mut response = client + .post(format!("http://{}", graphql_addr)) + .json(&serde_json::json!({ + "query": query, + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + assert_eq!(response.data.pool.len(), 1); + let first: CertificateId = response + .data + .pool + .remove(&1) + .unwrap() + .as_bytes() + .try_into() + .unwrap(); + + assert_eq!(first, certificates[0].certificate.id); +} + +#[rstest] +#[timeout(Duration::from_secs(4))] +#[test(tokio::test)] +async fn check_precedence( + broadcast_stream: broadcast::Receiver, +) { + let addr = get_available_addr(); + let graphql_addr = get_available_addr(); + let metrics_addr = get_available_addr(); + + // launch data store + let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 15); + + let fullnode_store = create_fullnode_store::default().await; + + let store: Arc = + create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await; + + for certificate in &certificates { + _ = store.insert_pending_certificate(&certificate.certificate); + } + + let storage_client = StorageClient::new(store.clone()); + + let (_runtime_client, _launcher, _ctx) = Runtime::builder() + .with_broadcast_stream(broadcast_stream) + .storage(storage_client) + .store(store) + .serve_grpc_addr(addr) + .serve_graphql_addr(graphql_addr) + .serve_metrics_addr(metrics_addr) + .build_and_launch() + .await; + + // Wait for server to boot + tokio::time::sleep(Duration::from_millis(100)).await; + + let certificate_one = certificates[0].certificate.id; + + let query = format!( + r#" + query {{ checkPrecedence(certificateId: "{}") {{ id }} }} + "#, + certificate_one + ); + + #[derive(Debug, Deserialize)] + struct Response { + data: CheckPrecedenceResponse, + } + + #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] + struct CheckPrecedenceResponse { + check_precedence: CheckPrecedence, + } + + #[derive(Debug, Deserialize)] + #[serde(rename_all = "camelCase")] + struct CheckPrecedence { + id: String, + } + + let client = reqwest::Client::new(); + + let response = client + .post(format!("http://{}", graphql_addr)) + .json(&serde_json::json!({ + "query": query, + })) + .send() + .await + .unwrap() + .json::() + .await + .unwrap(); + + assert_eq!( + TryInto::::try_into(response.data.check_precedence.id.as_bytes()).unwrap(), + certificates[1].certificate.id + ); } diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index b3ac469bf..3b9e0274f 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -137,6 +137,12 @@ impl DoubleEcho { command if self.subscriptions.is_some() => { match command { + DoubleEchoCommand::Broadcast { cert, need_gossip, pending_id } => { + _ = self + .task_manager_message_sender + .send(DoubleEchoCommand::Broadcast { need_gossip, cert, pending_id }) + .await; + } DoubleEchoCommand::Echo { certificate_id, validator_id, signature } => { // Check if source is part of known_validators if !self.validators.contains(&validator_id) { @@ -173,7 +179,6 @@ impl DoubleEcho { self.handle_ready(certificate_id, validator_id, signature).await }, - _ => {} } }, diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index 9e407c178..0c9604959 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -88,6 +88,7 @@ pub enum DoubleEchoCommand { Broadcast { need_gossip: bool, cert: Certificate, + pending_id: u64, }, /// When echo reply received diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index edf569f52..d25e3525d 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -96,11 +96,7 @@ impl TaskManager { Ok(pendings) => { debug!("Received {} pending certificates", pendings.len()); for (pending_id, certificate) in pendings { - debug!( - "Creating task for pending certificate {} at position {} if needed", - certificate.id, pending_id - ); - self.create_task(&certificate, true); + self.create_task(&certificate, true, pending_id); self.latest_pending_id = pending_id; } } @@ -132,10 +128,10 @@ impl TaskManager { .push(msg); }; } - DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip) + self.create_task(cert, need_gossip, pending_id) } } } @@ -202,7 +198,7 @@ impl TaskManager { /// Create a new task for the given certificate and add it to the running tasks. /// If the previous certificate is not available yet, the task will be created but not started. /// This method is called when a pending certificate is fetched from the storage. - fn create_task(&mut self, cert: &Certificate, need_gossip: bool) { + fn create_task(&mut self, cert: &Certificate, need_gossip: bool, pending_id: u64) { match self.tasks.entry(cert.id) { std::collections::hash_map::Entry::Vacant(entry) => { let broadcast_state = BroadcastState::new( @@ -242,10 +238,14 @@ impl TaskManager { cert.id, cert.prev_id ); } + debug!( + "Creating task for pending certificate {} at position {} if needed", + cert.id, pending_id + ); entry.insert(task_context); } std::collections::hash_map::Entry::Occupied(_) => { - debug!( + trace!( "Received broadcast message for certificate {} but it is already being \ processed", cert.id diff --git a/crates/topos-tce-broadcast/src/tests/task_manager.rs b/crates/topos-tce-broadcast/src/tests/task_manager.rs index 4fd9ac96a..4c6442616 100644 --- a/crates/topos-tce-broadcast/src/tests/task_manager.rs +++ b/crates/topos-tce-broadcast/src/tests/task_manager.rs @@ -65,6 +65,7 @@ async fn can_start(#[future] create_validator_store: Arc) { .send(crate::DoubleEchoCommand::Broadcast { need_gossip: false, cert: child.certificate.clone(), + pending_id: 0, }) .await; @@ -72,6 +73,7 @@ async fn can_start(#[future] create_validator_store: Arc) { .send(crate::DoubleEchoCommand::Broadcast { need_gossip: false, cert: parent.certificate.clone(), + pending_id: 0, }) .await; @@ -79,6 +81,7 @@ async fn can_start(#[future] create_validator_store: Arc) { .send(crate::DoubleEchoCommand::Broadcast { need_gossip: false, cert: parent.certificate.clone(), + pending_id: 0, }) .await; diff --git a/crates/topos-tce-storage/src/client.rs b/crates/topos-tce-storage/src/client.rs index b7babdf99..770792a1e 100644 --- a/crates/topos-tce-storage/src/client.rs +++ b/crates/topos-tce-storage/src/client.rs @@ -35,7 +35,7 @@ impl StorageClient { pub async fn get_pending_certificates( &self, ) -> Result, StorageError> { - self.store.get_pending_certificates() + Ok(self.store.iter_pending_pool()?.collect()) } pub async fn fetch_certificates( diff --git a/crates/topos-tce-storage/src/tests/mod.rs b/crates/topos-tce-storage/src/tests/mod.rs index 4e2fc1384..94dd99337 100644 --- a/crates/topos-tce-storage/src/tests/mod.rs +++ b/crates/topos-tce-storage/src/tests/mod.rs @@ -640,7 +640,7 @@ async fn get_pending_certificates(store: Arc) { ) .unwrap(); - let pending_certificates = store.get_pending_certificates().unwrap(); + let pending_certificates = store.iter_pending_pool().unwrap().collect::>(); assert_eq!( expected_pending_certificates.len(), pending_certificates.len() @@ -654,7 +654,7 @@ async fn get_pending_certificates(store: Arc) { let cert_to_remove = expected_pending_certificates.remove(8); store.delete_pending_certificate(&cert_to_remove.0).unwrap(); - let pending_certificates = store.get_pending_certificates().unwrap(); + let pending_certificates = store.iter_pending_pool().unwrap().collect::>(); assert_eq!( expected_pending_certificates.len(), pending_certificates.len() diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index 272a17c9d..33fccb112 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -98,26 +98,19 @@ impl ValidatorStore { None::<&[u8]>, ); - let pending_count: i64 = - store - .count_pending_certificates()? - .try_into() - .map_err(|error| { - error!("Failed to convert estimate-num-keys to i64: {}", error); - StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( - "Failed to convert estimate-num-keys to i64", - )) - })?; - - let precedence_count: i64 = store - .count_precedence_pool_certificates()? - .try_into() - .map_err(|error| { - error!("Failed to convert estimate-num-keys to i64: {}", error); - StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( - "Failed to convert estimate-num-keys to i64", - )) - })?; + let pending_count: i64 = store.pending_pool_size()?.try_into().map_err(|error| { + error!("Failed to convert estimate-num-keys to i64: {}", error); + StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( + "Failed to convert estimate-num-keys to i64", + )) + })?; + + let precedence_count: i64 = store.precedence_pool_size()?.try_into().map_err(|error| { + error!("Failed to convert estimate-num-keys to i64: {}", error); + StorageError::InternalStorage(InternalStorageError::UnexpectedDBState( + "Failed to convert estimate-num-keys to i64", + )) + })?; STORAGE_PENDING_POOL_COUNT.set(pending_count); STORAGE_PRECEDENCE_POOL_COUNT.set(precedence_count); @@ -131,7 +124,7 @@ impl ValidatorStore { } /// Returns the number of certificates in the pending pool - pub fn count_pending_certificates(&self) -> Result { + pub fn pending_pool_size(&self) -> Result { Ok(self .pending_tables .pending_pool @@ -139,7 +132,7 @@ impl ValidatorStore { } /// Returns the number of certificates in the precedence pool - pub fn count_precedence_pool_certificates(&self) -> Result { + pub fn precedence_pool_size(&self) -> Result { Ok(self .pending_tables .precedence_pool @@ -166,11 +159,35 @@ impl ValidatorStore { Ok(self.pending_tables.pending_pool.get(pending_id)?) } - /// Returns the entire pending_pool - pub fn get_pending_certificates( + /// Returns an iterator over the pending pool + /// + /// Note: this can be slow on large datasets. + #[doc(hidden)] + pub fn iter_pending_pool( &self, - ) -> Result, StorageError> { - Ok(self.pending_tables.pending_pool.iter()?.collect()) + ) -> Result + '_, StorageError> { + Ok(self.pending_tables.pending_pool.iter()?) + } + + /// Returns an iterator over the pending pool starting at a given `PendingCertificateId` + /// + /// Note: this can be slow on large datasets. + #[doc(hidden)] + pub fn iter_pending_pool_at( + &self, + pending_id: &PendingCertificateId, + ) -> Result + '_, StorageError> { + Ok(self.pending_tables.pending_pool.iter_at(pending_id)?) + } + + /// Returns an iterator over the precedence pool + /// + /// Note: this can be slow on large datasets. + #[doc(hidden)] + pub fn iter_precedence_pool( + &self, + ) -> Result + '_, StorageError> { + Ok(self.pending_tables.precedence_pool.iter()?) } pub fn get_next_pending_certificates( @@ -190,6 +207,14 @@ impl ValidatorStore { .collect()) } + /// Returns the [Certificate] (if any) that is currently in the precedence pool for the given [CertificateId] + pub fn check_precedence( + &self, + certificate_id: &CertificateId, + ) -> Result, StorageError> { + Ok(self.pending_tables.precedence_pool.get(certificate_id)?) + } + // TODO: Performance issue on this one as we iter over all the pending certificates // We need to improve how we request the pending certificates. pub fn get_pending_certificates_for_subnets( diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index deb282c38..fedc50b66 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -162,13 +162,13 @@ impl AppContext { let pending_certificates = self .validator_store - .count_pending_certificates() + .pending_pool_size() .map_err(|error| format!("Unable to count pending certificates: {error}")) .unwrap(); let precedence_pool_certificates = self .validator_store - .count_precedence_pool_certificates() + .precedence_pool_size() .map_err(|error| format!("Unable to count precedence pool certificates: {error}")) .unwrap(); diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index a645a3822..61b09c939 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -4,6 +4,7 @@ use topos_core::uci::{Certificate, SubnetId}; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_tce_api::RuntimeError; use topos_tce_api::RuntimeEvent as ApiEvent; +use topos_tce_broadcast::DoubleEchoCommand; use topos_tce_storage::errors::{InternalStorageError, StorageError}; use topos_tce_storage::types::PendingResult; use tracing::debug; @@ -24,12 +25,37 @@ impl AppContext { .insert_pending_certificate(&certificate) { Ok(Some(pending_id)) => { + let certificate_id = certificate.id; debug!( "Certificate {} from subnet {} has been inserted into pending pool", - certificate.id, certificate.source_subnet_id + certificate_id, certificate.source_subnet_id ); - sender.send(Ok(PendingResult::InPending(pending_id))) + if self + .tce_cli + .get_double_echo_channel() + .send(DoubleEchoCommand::Broadcast { + need_gossip: true, + cert: *certificate, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command to double \ + echo for {}", + certificate_id + ); + + sender.send(Err(RuntimeError::CommunicationError( + "Unable to send DoubleEchoCommand::Broadcast command to double \ + echo" + .to_string(), + ))) + } else { + sender.send(Ok(PendingResult::InPending(pending_id))) + } } Ok(None) => { debug!( diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index 61732cbdd..cd44da79d 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -43,12 +43,32 @@ impl AppContext { ); match self.validator_store.insert_pending_certificate(&cert) { - Ok(Some(_)) => { + Ok(Some(pending_id)) => { + let certificate_id = cert.id; debug!( "Certificate {} has been inserted into pending pool", - cert.id + certificate_id ); + + if self + .tce_cli + .get_double_echo_channel() + .send(DoubleEchoCommand::Broadcast { + need_gossip: false, + cert, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command \ + to double echo for {}", + certificate_id + ); + } } + Ok(None) => { debug!( "Certificate {} from subnet {} has been inserted into \ diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 3062e983d..89ae8e4f1 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -116,11 +116,11 @@ pub async fn run( .map_err(|error| format!("Unable to count certificates delivered: {error}"))?; let pending_certificates = validator_store - .count_pending_certificates() + .pending_pool_size() .map_err(|error| format!("Unable to count pending certificates: {error}"))?; let precedence_pool_certificates = validator_store - .count_precedence_pool_certificates() + .precedence_pool_size() .map_err(|error| format!("Unable to count precedence pool certificates: {error}"))?; info!(