Skip to content

Commit

Permalink
feat: adding positions to certificate (#440)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd authored Jan 25, 2024
1 parent 4b0ec9b commit 5315710
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 89 deletions.
26 changes: 22 additions & 4 deletions crates/topos-core/src/api/graphql/certificate.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use async_graphql::{NewType, SimpleObject};
use serde::{Deserialize, Serialize};

use crate::uci;
use crate::{types::CertificateDelivered, uci};

use super::subnet::SubnetId;
use super::{checkpoint::SourceStreamPosition, subnet::SubnetId};

#[derive(Serialize, Deserialize, Debug, NewType)]
pub struct CertificateId(String);

#[derive(Serialize, Deserialize, Debug, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct CertificatePositions {
source: SourceStreamPosition,
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct Certificate {
pub id: CertificateId,
pub prev_id: CertificateId,
Expand All @@ -21,10 +27,19 @@ pub struct Certificate {
pub tx_root_hash: String,
pub receipts_root_hash: String,
pub verifier: u32,
pub positions: CertificatePositions,
}

impl From<&crate::uci::Certificate> for Certificate {
fn from(uci_cert: &crate::uci::Certificate) -> Self {
#[derive(Debug, Serialize, Deserialize, SimpleObject)]
pub struct Ready {
message: String,
signature: String,
}

impl From<&CertificateDelivered> for Certificate {
fn from(value: &CertificateDelivered) -> Self {
let uci_cert = &value.certificate;

Self {
id: CertificateId(uci_cert.id.to_string()),
prev_id: CertificateId(uci_cert.prev_id.to_string()),
Expand All @@ -36,6 +51,9 @@ impl From<&crate::uci::Certificate> for Certificate {
tx_root_hash: hex::encode(uci_cert.tx_root_hash),
receipts_root_hash: format!("0x{}", hex::encode(uci_cert.receipts_root_hash)),
verifier: uci_cert.verifier,
positions: CertificatePositions {
source: (&value.proof_of_delivery.delivery_position).into(),
},
}
}
}
Expand Down
21 changes: 20 additions & 1 deletion crates/topos-core/src/api/graphql/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use async_graphql::InputObject;
use async_graphql::{InputObject, SimpleObject};
use serde::{Deserialize, Serialize};

use crate::types::stream::CertificateSourceStreamPosition;

use super::{certificate::CertificateId, subnet::SubnetId};

Expand All @@ -9,6 +12,22 @@ pub struct SourceStreamPositionInput {
pub certificate_id: Option<CertificateId>,
}

#[derive(Debug, Deserialize, Serialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct SourceStreamPosition {
pub source_subnet_id: SubnetId,
pub position: u64,
}

impl From<&CertificateSourceStreamPosition> for SourceStreamPosition {
fn from(value: &CertificateSourceStreamPosition) -> Self {
Self {
source_subnet_id: (&value.subnet_id).into(),
position: *value.position,
}
}
}

#[derive(InputObject)]
pub struct SourceCheckpointInput {
pub source_subnet_ids: Vec<SubnetId>,
Expand Down
6 changes: 6 additions & 0 deletions crates/topos-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub struct CertificateDelivered {
pub proof_of_delivery: ProofOfDelivery,
}

impl AsRef<CertificateDelivered> for CertificateDelivered {
fn as_ref(&self) -> &Self {
self
}
}

/// Certificate's Proof of Delivery
///
/// This structure is used to prove that a certificate has been delivered.
Expand Down
8 changes: 4 additions & 4 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl CertificateQuery for QueryRoot {
certificates.extend(
certificates_with_position
.into_iter()
.map(|(c, _)| c.certificate.as_ref().into()),
.map(|(ref c, _)| c.into()),
);
}

Expand All @@ -87,7 +87,7 @@ impl CertificateQuery for QueryRoot {
)
.map_err(|_| GraphQLServerError::StorageError)
.and_then(|c| {
c.map(|c| Certificate::from(&c.certificate))
c.map(|ref c| c.into())
.ok_or(GraphQLServerError::StorageError)
})
}
Expand Down Expand Up @@ -151,8 +151,8 @@ impl SubscriptionRoot {
filter
.as_ref()
.map(|v| match v {
(FilterIs::Source, id) => id == &c.source_subnet_id,
(FilterIs::Target, id) => c.target_subnets.contains(id),
(FilterIs::Source, id) => id == &c.certificate.source_subnet_id,
(FilterIs::Target, id) => c.certificate.target_subnets.contains(id),
})
.unwrap_or(true),
)
Expand Down
30 changes: 22 additions & 8 deletions crates/topos-tce-api/src/graphql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@ use futures::{SinkExt, StreamExt};
use rstest::rstest;
use test_log::test;
use tokio::sync::{mpsc, oneshot};
use topos_core::uci::{Certificate, SubnetId, INITIAL_CERTIFICATE_ID};
use topos_test_sdk::constants::{SOURCE_SUBNET_ID_2, TARGET_SUBNET_ID_3};
use topos_core::{
types::stream::Position,
uci::{SubnetId, INITIAL_CERTIFICATE_ID},
};
use topos_test_sdk::{
certificates::{create_certificate, create_certificate_at_position},
constants::{SOURCE_SUBNET_ID_2, TARGET_SUBNET_ID_3},
};
use uuid::Uuid;

#[rstest]
Expand Down Expand Up @@ -67,12 +73,14 @@ async fn open_watch_certificate_delivered() {

tokio::time::sleep(Duration::from_millis(10)).await;

let certificate = Certificate::new_with_default_fields(
INITIAL_CERTIFICATE_ID,
SOURCE_SUBNET_ID_2,
&[TARGET_SUBNET_ID_3],
)
.unwrap();
let certificate = create_certificate_at_position(
Position::ZERO,
create_certificate(
SOURCE_SUBNET_ID_2,
&[TARGET_SUBNET_ID_3],
Some(INITIAL_CERTIFICATE_ID),
),
);

_ = notify.send(Arc::new(certificate)).await;
}
Expand Down Expand Up @@ -119,6 +127,12 @@ async fn open_watch_certificate_delivered() {
txRootHash
receiptsRootHash
verifier
positions {
source {
sourceSubnetId
position
}
}
}
}"
},
Expand Down
7 changes: 4 additions & 3 deletions crates/topos-tce-api/src/grpc/messaging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use topos_core::api::grpc::tce::v1::watch_certificates_request::OpenStream as Gr
use topos_core::api::grpc::tce::v1::watch_certificates_response::CertificatePushed as GrpcCertificatePushed;
use topos_core::api::grpc::tce::v1::watch_certificates_response::Event;
use topos_core::api::grpc::tce::v1::watch_certificates_response::StreamOpened as GrpcStreamOpened;
use topos_core::uci::{Certificate, SubnetId};
use topos_core::types::CertificateDelivered;
use topos_core::uci::SubnetId;

pub enum InboundMessage {
OpenStream(OpenStream),
Expand All @@ -17,7 +18,7 @@ pub struct OpenStream {

#[derive(Debug)]
pub struct CertificatePushed {
pub(crate) certificate: Certificate,
pub(crate) certificate: CertificateDelivered,
pub(crate) positions: Vec<TargetStreamPosition>,
}

Expand Down Expand Up @@ -71,7 +72,7 @@ impl From<OutboundMessage> for Event {
}
OutboundMessage::CertificatePushed(certificate_pushed) => {
Self::CertificatePushed(GrpcCertificatePushed {
certificate: Some(certificate_pushed.certificate.into()),
certificate: Some(certificate_pushed.certificate.certificate.into()),
positions: certificate_pushed
.positions
.into_iter()
Expand Down
5 changes: 3 additions & 2 deletions crates/topos-tce-api/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use super::RuntimeCommand;
use futures::Future;
use tokio::sync::{mpsc, oneshot, RwLock};
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::api::grpc::tce::v1::StatusResponse;
use topos_core::types::CertificateDelivered;
use topos_core::uci::SubnetId;
use topos_core::{api::grpc::tce::v1::StatusResponse, uci::Certificate};
use tracing::error;

#[derive(Clone, Debug)]
Expand All @@ -19,7 +20,7 @@ pub struct RuntimeClient {
impl RuntimeClient {
pub fn dispatch_certificate(
&self,
certificate: Certificate,
certificate: CertificateDelivered,
positions: HashMap<SubnetId, TargetStreamPosition>,
) -> impl Future<Output = ()> + 'static + Send {
let sender = self.command_sender.clone();
Expand Down
3 changes: 2 additions & 1 deletion crates/topos-tce-api/src/runtime/commands.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::HashMap;
use tokio::sync::{mpsc::Sender, oneshot};
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::types::CertificateDelivered;
use topos_core::uci::{Certificate, SubnetId};
use topos_tce_storage::types::PendingResult;
use uuid::Uuid;
Expand All @@ -13,7 +14,7 @@ use super::error::RuntimeError;
pub enum RuntimeCommand {
/// Dispatch certificate to gRPC API Runtime in order to push it to listening open streams
DispatchCertificate {
certificate: Certificate,
certificate: CertificateDelivered,
positions: HashMap<SubnetId, TargetStreamPosition>,
},
}
Expand Down
21 changes: 13 additions & 8 deletions crates/topos-tce-api/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use tokio_util::sync::CancellationToken;
use tonic_health::server::HealthReporter;
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::api::grpc::tce::v1::api_service_server::ApiServiceServer;
use topos_core::uci::{Certificate, SubnetId};
use topos_core::types::CertificateDelivered;
use topos_core::uci::SubnetId;
use topos_tce_storage::{types::CertificateDeliveredWithPositions, StorageClient};

use tracing::{debug, error, info};
Expand Down Expand Up @@ -60,7 +61,7 @@ pub struct Runtime {
pub(crate) broadcast_stream: broadcast::Receiver<CertificateDeliveredWithPositions>,

pub(crate) storage: StorageClient,
pub(crate) transient_streams: HashMap<Uuid, Sender<Arc<Certificate>>>,
pub(crate) transient_streams: HashMap<Uuid, Sender<Arc<CertificateDelivered>>>,
/// Streams that are currently active (with a valid handshake)
pub(crate) active_streams: HashMap<Uuid, Sender<StreamCommand>>,
/// Streams that are currently in negotiation
Expand Down Expand Up @@ -99,8 +100,8 @@ impl Runtime {
}

Ok(certificate_delivered) = self.broadcast_stream.recv() => {
let certificate = certificate_delivered.0.certificate;
let certificate_id = certificate.id;
let certificate = certificate_delivered.0;
let certificate_id = certificate.certificate.id;
let positions = certificate_delivered.1;
let cmd = RuntimeCommand::DispatchCertificate {
certificate,
Expand Down Expand Up @@ -183,13 +184,17 @@ impl Runtime {
} => {
info!(
"Received DispatchCertificate for certificate cert_id: {:?}",
certificate.id
certificate.certificate.id
);
// Collect target subnets from certificate cross chain transaction list
let target_subnets = certificate.target_subnets.iter().collect::<HashSet<_>>();
let target_subnets = certificate
.certificate
.target_subnets
.iter()
.collect::<HashSet<_>>();
debug!(
"Dispatching certificate cert_id: {:?} to target subnets: {:?}",
&certificate.id, target_subnets
&certificate.certificate.id, target_subnets
);

// Notify all the transient streams that a new certificate is available
Expand Down Expand Up @@ -228,7 +233,7 @@ impl Runtime {
error!(
"Invalid target stream position for cert id {}, target \
subnet id {target_subnet_id}, dispatch failed",
&certificate.id
&certificate.certificate.id
);
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/topos-tce-api/src/runtime/sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,10 @@ impl IntoFuture for SyncTask {
}
}

for (CertificateDelivered { certificate, .. }, position) in collector {
for (certificate, position) in collector {
debug!(
"Stream sync task for {} is sending {}",
self.stream_id, certificate.id
self.stream_id, certificate.certificate.id
);

if let FetchCertificatesPosition::Target(CertificateTargetStreamPosition {
Expand All @@ -171,7 +171,7 @@ impl IntoFuture for SyncTask {
target_subnet_id,
source_subnet_id,
position: *position,
certificate_id: Some(certificate.id),
certificate_id: Some(certificate.certificate.id),
}],
certificate,
})
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/stream/commands.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use topos_core::api::grpc::checkpoints::TargetStreamPosition;
use topos_core::uci::Certificate;
use topos_core::types::CertificateDelivered;

#[derive(Debug)]
pub enum StreamCommand {
PushCertificate {
certificate: Certificate,
certificate: CertificateDelivered,
positions: Vec<TargetStreamPosition>,
},
}
9 changes: 5 additions & 4 deletions crates/topos-tce-api/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use tokio::{
};
use tonic::Status;
use topos_core::api::grpc::checkpoints::{TargetCheckpoint, TargetStreamPosition};
use topos_core::uci::{Certificate, SubnetId};
use topos_core::types::CertificateDelivered;
use topos_core::uci::SubnetId;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -40,13 +41,13 @@ pub(crate) use self::errors::{HandshakeError, StreamErrorKind};
/// implementation to notify the `runtime` when ended.
#[derive(Debug)]
pub struct TransientStream {
pub(crate) inner: mpsc::Receiver<Arc<Certificate>>,
pub(crate) inner: mpsc::Receiver<Arc<CertificateDelivered>>,
pub(crate) stream_id: Uuid,
pub(crate) notifier: Option<oneshot::Sender<Uuid>>,
}

impl futures::Stream for TransientStream {
type Item = Arc<Certificate>;
type Item = Arc<CertificateDelivered>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -168,7 +169,7 @@ impl Stream {
certificate,
positions,
} => {
let certificate_id = certificate.id;
let certificate_id = certificate.certificate.id;
if let Err(error) = self
.outbound_stream
.send(Ok((
Expand Down
Loading

0 comments on commit 5315710

Please sign in to comment.