Skip to content

Commit

Permalink
refactor: improve delivery timing (#466)
Browse files Browse the repository at this point in the history
  • Loading branch information
Freyskeyd authored Mar 12, 2024
1 parent d2ec941 commit 96e862f
Show file tree
Hide file tree
Showing 19 changed files with 454 additions and 64 deletions.
42 changes: 41 additions & 1 deletion crates/topos-core/src/api/graphql/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ use super::{checkpoint::SourceStreamPosition, subnet::SubnetId};
#[derive(Serialize, Deserialize, Debug, NewType)]
pub struct CertificateId(String);

impl From<uci::CertificateId> for CertificateId {
fn from(value: uci::CertificateId) -> Self {
Self(value.to_string())
}
}

#[derive(Serialize, Deserialize, Debug, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct CertificatePositions {
source: SourceStreamPosition,
}

/// A certificate that has been delivered
#[derive(Debug, Serialize, Deserialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct Certificate {
Expand All @@ -30,6 +37,39 @@ pub struct Certificate {
pub positions: CertificatePositions,
}

/// A certificate that has not been delivered yet
#[derive(Debug, Serialize, Deserialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct UndeliveredCertificate {
pub id: CertificateId,
pub prev_id: CertificateId,
pub proof: String,
pub signature: String,
pub source_subnet_id: SubnetId,
pub state_root: String,
pub target_subnets: Vec<SubnetId>,
pub tx_root_hash: String,
pub receipts_root_hash: String,
pub verifier: u32,
}

impl From<&uci::Certificate> for UndeliveredCertificate {
fn from(value: &crate::uci::Certificate) -> Self {
Self {
id: CertificateId(value.id.to_string()),
prev_id: CertificateId(value.prev_id.to_string()),
proof: hex::encode(&value.proof),
signature: hex::encode(&value.signature),
source_subnet_id: (&value.source_subnet_id).into(),
state_root: hex::encode(value.state_root),
target_subnets: value.target_subnets.iter().map(Into::into).collect(),
tx_root_hash: hex::encode(value.tx_root_hash),
receipts_root_hash: format!("0x{}", hex::encode(value.receipts_root_hash)),
verifier: value.verifier,
}
}
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
pub struct Ready {
message: String,
Expand All @@ -52,7 +92,7 @@ impl From<&CertificateDelivered> for Certificate {
receipts_root_hash: format!("0x{}", hex::encode(uci_cert.receipts_root_hash)),
verifier: uci_cert.verifier,
positions: CertificatePositions {
source: (&value.proof_of_delivery.delivery_position).into(),
source: (&value.proof_of_delivery).into(),
},
}
}
Expand Down
12 changes: 7 additions & 5 deletions crates/topos-core/src/api/graphql/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_graphql::{InputObject, SimpleObject};
use serde::{Deserialize, Serialize};

use crate::types::stream::CertificateSourceStreamPosition;
use crate::types::ProofOfDelivery;

use super::{certificate::CertificateId, subnet::SubnetId};

Expand All @@ -17,13 +17,15 @@ pub struct SourceStreamPositionInput {
pub struct SourceStreamPosition {
pub source_subnet_id: SubnetId,
pub position: u64,
pub certificate_id: CertificateId,
}

impl From<&CertificateSourceStreamPosition> for SourceStreamPosition {
fn from(value: &CertificateSourceStreamPosition) -> Self {
impl From<&ProofOfDelivery> for SourceStreamPosition {
fn from(value: &ProofOfDelivery) -> Self {
Self {
source_subnet_id: (&value.subnet_id).into(),
position: *value.position,
certificate_id: value.certificate_id.into(),
source_subnet_id: (&value.delivery_position.subnet_id).into(),
position: *value.delivery_position.position,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/topos-tce-api/src/graphql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use crate::{
},
runtime::InternalRuntimeCommand,
};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::validator::ValidatorStore;

use super::query::SubscriptionRoot;

#[derive(Default)]
pub struct ServerBuilder {
store: Option<Arc<FullNodeStore>>,
store: Option<Arc<ValidatorStore>>,
serve_addr: Option<SocketAddr>,
runtime: Option<mpsc::Sender<InternalRuntimeCommand>>,
}
Expand All @@ -34,7 +34,7 @@ impl ServerBuilder {

self
}
pub(crate) fn store(mut self, store: Arc<FullNodeStore>) -> Self {
pub(crate) fn store(mut self, store: Arc<ValidatorStore>) -> Self {
self.store = Some(store);

self
Expand Down Expand Up @@ -62,13 +62,15 @@ impl ServerBuilder {
.take()
.expect("Cannot build GraphQL server without a FullNode store");

let fullnode_store = store.get_fullnode_store();
let runtime = self
.runtime
.take()
.expect("Cannot build GraphQL server without the internal runtime channel");

let schema: ServiceSchema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
.data(store)
.data(fullnode_store)
.data(runtime)
.finish();

Expand Down
97 changes: 94 additions & 3 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, oneshot};
use topos_core::api::graphql::certificate::UndeliveredCertificate;
use topos_core::api::graphql::checkpoint::SourceStreamPosition;
use topos_core::api::graphql::errors::GraphQLServerError;
use topos_core::api::graphql::filter::SubnetFilter;
Expand All @@ -18,6 +19,7 @@ use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

use topos_tce_storage::validator::ValidatorStore;
use tracing::debug;

use crate::runtime::InternalRuntimeCommand;
Expand Down Expand Up @@ -121,11 +123,58 @@ impl QueryRoot {
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());
stats.insert("metrics_pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert(
"metrics_precedence_pool",
STORAGE_PRECEDENCE_POOL_COUNT.get(),
);

let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

stats.insert(
"count_pending_certificates",
store
.iter_pending_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.count()
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"count_precedence_certificates",
store
.iter_precedence_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.count()
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"pending_pool_size",
store
.pending_pool_size()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"precedence_pool_size",
store
.precedence_pool_size()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

Ok(stats)
}
Expand All @@ -151,9 +200,51 @@ impl QueryRoot {
.map(|(subnet_id, head)| SourceStreamPosition {
source_subnet_id: subnet_id.into(),
position: *head.position,
certificate_id: head.certificate_id.into(),
})
.collect())
}

/// This endpoint is used to get the current pending pool.
/// It returns [`CertificateId`] and the [`PendingCertificateId`]
async fn get_pending_pool(
&self,
ctx: &Context<'_>,
) -> Result<HashMap<u64, CertificateId>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

Ok(store
.iter_pending_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.map(|(id, certificate)| (id, certificate.id.into()))
.collect())
}

/// This endpoint is used to check if a certificate has any child certificate in the precedence pool.
async fn check_precedence(
&self,
ctx: &Context<'_>,
certificate_id: CertificateId,
) -> Result<Option<UndeliveredCertificate>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

store
.check_precedence(
&certificate_id
.try_into()
.map_err(|_| GraphQLServerError::ParseCertificateId)?,
)
.map_err(|_| GraphQLServerError::StorageError)
.map(|certificate| certificate.as_ref().map(Into::into))
}
}

pub struct SubscriptionRoot;
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-api/src/graphql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn open_watch_certificate_delivered() {
source {
sourceSubnetId
position
certificateId
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/topos-tce-api/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ impl RuntimeBuilder {
.store(
self.store
.take()
.map(|store| store.get_fullnode_store())
.expect("Unable to build GraphQL Server, Store is missing"),
)
.runtime(internal_runtime_command_sender.clone())
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-api/src/runtime/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ pub enum RuntimeError {

#[error("Unexpected store error: {0}")]
Store(#[from] StorageError),

#[error("Communication error: {0}")]
CommunicationError(String),
}
Loading

0 comments on commit 96e862f

Please sign in to comment.