From c989de50426e8699f3f3792f51da86887a34e3bf Mon Sep 17 00:00:00 2001 From: ktatarnikov Date: Tue, 7 Jan 2025 15:01:13 +0100 Subject: [PATCH 1/3] [integration tests] refactoring initialization functionality into ContextBuilder and Context --- rhio/src/context.rs | 244 ++++++++++++++++++++++++++++++++++++ rhio/src/context_builder.rs | 185 +++++++++++++++++++++++++++ rhio/src/lib.rs | 2 + rhio/src/main.rs | 193 ++-------------------------- rhio/src/node/config.rs | 6 + rhio/src/node/rhio.rs | 86 ++++--------- 6 files changed, 475 insertions(+), 241 deletions(-) create mode 100644 rhio/src/context.rs create mode 100644 rhio/src/context_builder.rs diff --git a/rhio/src/context.rs b/rhio/src/context.rs new file mode 100644 index 00000000..409fa633 --- /dev/null +++ b/rhio/src/context.rs @@ -0,0 +1,244 @@ +use std::collections::HashMap; + +use crate::{Node, StreamName}; + +use anyhow::{bail, Context as AnyhowContext, Result}; +use p2panda_core::PublicKey; +use tokio::runtime::Runtime; +use tokio::signal; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use crate::config::{Config, LocalNatsSubject, RemoteNatsSubject, RemoteS3Bucket}; +use crate::health::HTTP_HEALTH_ROUTE; +use crate::{ + FilesSubscription, FilteredMessageStream, MessagesSubscription, Publication, Subscription, +}; +use rhio_core::Subject; + +pub struct Context { + node: Node, + config: Config, + public_key: PublicKey, + http_handle: JoinHandle>, + http_runtime: Runtime, + cancellation_token: CancellationToken, + rhio_runtime: Runtime, +} + +impl Context { + pub fn new( + node: Node, + config: Config, + public_key: PublicKey, + http_handle: JoinHandle>, + http_runtime: Runtime, + cancellation_token: CancellationToken, + rhio_runtime: Runtime, + ) -> Self { + Context { + node, + config, + public_key, + http_handle, + http_runtime, + cancellation_token, + rhio_runtime, + } + } + + pub fn configure(&self) -> Result<()> { + self.rhio_runtime.block_on(async { + self.configure_inner() + .await + .context("failed to configure Node") + }) + } + + async fn configure_inner(&self) -> Result<()> { + if self.config.s3.is_some() { + if let Some(publish) = &self.config.publish { + self.publish_s3(publish.s3_buckets.clone()).await?; + } + if let Some(subscribe) = &self.config.subscribe { + self.subscribe_s3(subscribe.s3_buckets.clone()).await?; + } + } + if let Some(publish) = &self.config.publish { + self.publish_nats(publish.nats_subjects.clone()).await?; + } + if let Some(subscribe) = &self.config.subscribe { + self.subscribe_nats(subscribe.nats_subjects.clone()).await?; + } + Ok(()) + } + + async fn publish_s3(&self, s3_buckets: Vec) -> Result<()> { + for bucket_name in s3_buckets { + // Assign our own public key to S3 bucket info. + self.node + .publish(Publication::Files { + bucket_name, + public_key: self.public_key, + }) + .await?; + } + Ok(()) + } + + async fn subscribe_s3(&self, s3_buckets: Vec) -> Result<()> { + for RemoteS3Bucket { + local_bucket_name, + remote_bucket_name, + public_key: remote_public_key, + } in s3_buckets + { + self.node + .subscribe(Subscription::Files(FilesSubscription { + remote_bucket_name, + local_bucket_name, + public_key: remote_public_key, + })) + .await?; + } + + Ok(()) + } + + async fn subscribe_nats(&self, subjects: Vec) -> Result<()> { + // Multiple subjects can be used on top of a stream and we want to group them over one + // public key and stream name pair. 
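+        //
+        // For example (hypothetical values): the remote subjects
+        // ("stream-a", key-I, "foo.*") and ("stream-a", key-I, "bar") collapse into one
+        // entry ("stream-a", key-I) -> ["foo.*", "bar"], so each (stream, public key)
+        // pair ends up as a single FilteredMessageStream below.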
+ let mut stream_public_key_map = HashMap::<(StreamName, PublicKey), Vec>::new(); + for RemoteNatsSubject { + stream_name, + subject, + public_key: remote_public_key, + } in subjects + { + stream_public_key_map + .entry((stream_name, remote_public_key)) + .and_modify(|subjects| { + subjects.push(subject.clone()); + }) + .or_insert(vec![subject]); + } + + // Finally we want to group these subscriptions by public key. + let mut subscription_map = HashMap::>::new(); + for ((stream_name, remote_public_key), subjects) in stream_public_key_map.into_iter() { + let filtered_stream = FilteredMessageStream { + subjects, + stream_name, + }; + subscription_map + .entry(remote_public_key) + .and_modify(|filtered_streams| filtered_streams.push(filtered_stream.clone())) + .or_insert(vec![filtered_stream]); + } + + for (remote_public_key, filtered_streams) in subscription_map.into_iter() { + let subscription = Subscription::Messages(MessagesSubscription { + filtered_streams, + public_key: remote_public_key, + }); + self.node.subscribe(subscription).await?; + } + Ok(()) + } + + async fn publish_nats(&self, nats_subjects: Vec) -> Result<()> { + // Multiple subjects can be used on top of a stream and we want to group them over one + // public key and stream name pair. This leaves us with the following structure: + // + // Streams Public Key Subjects Topic Id (for Gossip) + // ======= ========== ======== ===================== + // 1 I A a + // 2 I B a + // 3 II A b + // 1 II B b + // 1 II C b + let mut stream_public_key_map = HashMap::>::new(); + for LocalNatsSubject { + stream_name, + subject, + } in nats_subjects + { + stream_public_key_map + .entry(stream_name) + .and_modify(|subjects| { + subjects.push(subject.clone()); + }) + .or_insert(vec![subject.clone()]); + } + + for (stream_name, subjects) in stream_public_key_map.into_iter() { + self.node + .publish(Publication::Messages { + filtered_stream: FilteredMessageStream { + subjects, + stream_name, + }, + // Assign our own public key to NATS subject info. + public_key: self.public_key, + }) + .await?; + } + Ok(()) + } + + pub fn log_configuration(&self) { + let addresses: Vec = self + .node + .direct_addresses() + .iter() + .map(|addr| addr.to_string()) + .collect(); + info!("‣ network id:"); + info!(" - {}", self.config.node.network_id); + info!("‣ node public key:"); + info!(" - {}", self.public_key); + info!("‣ node addresses:"); + for address in addresses { + info!(" - {}", address); + } + info!("‣ health endpoint:"); + info!( + " - 0.0.0.0:{}{}", + self.config.node.http_bind_port, HTTP_HEALTH_ROUTE + ); + } + + pub fn wait_for_termination(&self) -> Result<()> { + let cloned_token = self.cancellation_token.clone(); + self.http_runtime.block_on(async move { + tokio::select! { + _ = cloned_token.cancelled() => bail!("HTTP server was cancelled"), + _ = signal::ctrl_c() => {}, + }; + Ok(()) + }) + } + + /// Shuts down the context, including the node and associated runtimes. + /// + /// This method performs the following steps: + /// 1. Shuts down the node asynchronously. + /// 2. Aborts the HTTP handle. + /// 3. Shuts down the HTTP runtime in the background. + /// 4. Shuts down the Rhio runtime in the background. + /// + /// Returns a `Result` indicating the success or failure of the shutdown process. 
+ pub fn shutdown(self) -> Result<()> { + self.rhio_runtime.block_on(async move { + self.node + .shutdown() + .await + .context("Failure during node shutdown") + })?; + self.http_handle.abort(); + self.http_runtime.shutdown_background(); + self.rhio_runtime.shutdown_background(); + Ok(()) + } +} diff --git a/rhio/src/context_builder.rs b/rhio/src/context_builder.rs new file mode 100644 index 00000000..b9d0de8a --- /dev/null +++ b/rhio/src/context_builder.rs @@ -0,0 +1,185 @@ +use crate::context::Context; +use crate::node::rhio::NodeOptions; + +use crate::{blobs::store_from_config, nats::Nats, Node}; + +use anyhow::{anyhow, Context as AnyhowContext, Result}; +use p2panda_blobs::Blobs as BlobsHandler; +use p2panda_core::{PrivateKey, PublicKey}; +use p2panda_net::SyncConfiguration; +use p2panda_net::{NetworkBuilder, ResyncConfiguration}; +use tokio::runtime::{Builder, Runtime}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::blobs::{blobs_config, Blobs}; +use crate::config::PRIVATE_KEY_ENV; +use crate::config::{load_config, Config}; +use crate::health::run_http_server; +use crate::network::sync::RhioSyncProtocol; +use crate::network::Panda; +use crate::tracing::setup_tracing; +use figment::providers::Env; +use rhio_core::load_private_key_from_file; + +#[derive(Debug)] +pub struct ContextBuilder { + config: Config, + private_key: PrivateKey, + public_key: PublicKey, +} + +impl ContextBuilder { + pub fn new(config: Config, private_key: PrivateKey) -> Self { + ContextBuilder { + public_key: private_key.public_key(), + config, + private_key, + } + } + /// Load the configuration from the environment and initializes context builder + pub fn from_cli() -> Result { + let config = load_config()?; + setup_tracing(config.log_level.clone()); + + // Load the private key from either an environment variable _or_ a file specified in the + // config. The environment variable takes priority. + let private_key = match Env::var(PRIVATE_KEY_ENV) { + Some(private_key_hex) => PrivateKey::try_from(&hex::decode(&private_key_hex)?[..])?, + None => load_private_key_from_file(&config.node.private_key_path).context(format!( + "could not load private key from file {}", + config.node.private_key_path.display(), + ))?, + }; + + let public_key = private_key.public_key(); + Ok(ContextBuilder { + config, + private_key, + public_key, + }) + } + /// Attempts to build and start the `Context`. + /// + /// This method initializes the Rhio runtime and HTTP server runtime, then + /// starts the Rhio node and HTTP server. The HTTP server is launched in a + /// separate runtime to avoid blocking the Rhio runtime. + /// + /// # Returns + /// + /// * `Ok(Context)` - If the context is successfully built and started. + /// * `Err(anyhow::Error)` - If there is an error during the initialization + /// process. + /// + /// # Errors + /// + /// This function will return an error if: + /// * The Rhio tokio runtime cannot be built. + /// * The Rhio node initialization fails. + /// * The HTTP server tokio runtime cannot be built. + /// * The HTTP server fails to start. 
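+    ///
+    /// # Example
+    ///
+    /// A rough usage sketch of the full lifecycle (hypothetical `config` and
+    /// `private_key`, imports elided, not compiled as a doc test):
+    ///
+    /// ```ignore
+    /// let context = ContextBuilder::new(config, private_key).try_build_and_start()?;
+    /// context.configure()?;
+    /// context.log_configuration();
+    /// context.wait_for_termination()?;
+    /// context.shutdown()?;
+    /// ```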
+ /// + pub fn try_build_and_start(&self) -> Result { + let rhio_runtime = Builder::new_multi_thread() + .enable_all() + .thread_name("rhio") + .build() + .expect("Rhio tokio runtime"); + + let node = rhio_runtime.block_on(async move { + ContextBuilder::init_rhio_node(self.config.clone(), self.private_key.clone()) + .await + .context("failed to initialize Rhio node") + })?; + + let http_runtime = Builder::new_current_thread() + .enable_io() + .thread_name("http-server") + .build() + .expect("http server tokio runtime"); + + let cancellation_token = CancellationToken::new(); + // Launch HTTP server in separate runtime to not block rhio runtime. + let http_handle = self.start_http_server(&http_runtime, cancellation_token.clone()); + Ok(Context::new( + node, + self.config.clone(), + self.public_key, + http_handle, + http_runtime, + cancellation_token, + rhio_runtime, + )) + } + + async fn init_rhio_node(config: Config, private_key: PrivateKey) -> Result { + let nats = Nats::new(config.clone()).await?; + + // 2. Configure rhio peer-to-peer network. + let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?; + + let blob_store = store_from_config(&config).await?; + + let sync_protocol = RhioSyncProtocol::new( + node_config.clone(), + nats.clone(), + blob_store.clone(), + private_key.clone(), + ); + + let sync_config = + SyncConfiguration::new(sync_protocol).resync(ResyncConfiguration::default()); + + let builder = NetworkBuilder::from_config(p2p_network_config) + .private_key(private_key.clone()) + .sync(sync_config); + + let (watcher_tx, watcher_rx) = mpsc::channel(512); + // 3. Configure and set up blob store and connection handlers for blob replication. + let (network, blobs, watcher_rx) = if config.s3.is_some() { + let (network, blobs_handler) = + BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) + .await?; + // 3.1. Start a service which watches the S3 buckets for changes. + let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); + (network, Some(blobs), watcher_rx) + } else { + let network = builder.build().await?; + (network, None, watcher_rx) + }; + + // 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p + // networking, data replication and gossipping. 
+ let node_id = network.node_id(); + let direct_addresses = network + .direct_addresses() + .await + .ok_or_else(|| anyhow!("socket is not bind to any interface"))?; + let panda = Panda::new(network); + + let options = NodeOptions { + public_key: node_id, + node_config, + private_key: private_key.clone(), + direct_addresses, + }; + + Node::new(nats, blobs, watcher_rx, panda, options).await + } + + fn start_http_server( + &self, + runtime: &Runtime, + cancellation_token: CancellationToken, + ) -> JoinHandle> { + let http_bind_port = self.config.node.http_bind_port; + runtime.spawn(async move { + let result = run_http_server(http_bind_port) + .await + .context("failed to start http server with health endpoint"); + cancellation_token.cancel(); + result + }) + } +} diff --git a/rhio/src/lib.rs b/rhio/src/lib.rs index c314facc..6f47dbbf 100644 --- a/rhio/src/lib.rs +++ b/rhio/src/lib.rs @@ -1,5 +1,7 @@ mod blobs; pub mod config; +pub mod context; +pub mod context_builder; pub mod health; mod nats; mod network; diff --git a/rhio/src/main.rs b/rhio/src/main.rs index 385dced4..ebfef982 100644 --- a/rhio/src/main.rs +++ b/rhio/src/main.rs @@ -1,194 +1,24 @@ -use std::collections::HashMap; - -use anyhow::{bail, Context, Result}; -use figment::providers::Env; -use p2panda_core::{PrivateKey, PublicKey}; -use rhio::config::{ - load_config, LocalNatsSubject, RemoteNatsSubject, RemoteS3Bucket, PRIVATE_KEY_ENV, -}; -use rhio::health::{run_http_server, HTTP_HEALTH_ROUTE}; -use rhio::tracing::setup_tracing; -use rhio::{ - FilesSubscription, FilteredMessageStream, MessagesSubscription, Node, Publication, StreamName, - Subscription, -}; -use rhio_core::{load_private_key_from_file, Subject}; -use tokio::runtime::Builder; -use tokio::sync::oneshot; +use anyhow::Result; +use rhio::context_builder::ContextBuilder; use tracing::info; -#[tokio::main] -async fn main() -> Result<()> { - let config = load_config()?; - setup_tracing(config.log_level.clone()); - - // Load the private key from either an environment variable _or_ a file specified in the - // config. The environment variable takes priority. - let private_key = match Env::var(PRIVATE_KEY_ENV) { - Some(private_key_hex) => PrivateKey::try_from(&hex::decode(&private_key_hex)?[..])?, - None => load_private_key_from_file(&config.node.private_key_path).context(format!( - "could not load private key from file {}", - config.node.private_key_path.display(), - ))?, - }; - - let public_key = private_key.public_key(); - - let node = Node::spawn(config.clone(), private_key).await?; - - hello_rhio(); - let addresses: Vec = node - .direct_addresses() - .iter() - .map(|addr| addr.to_string()) - .collect(); - info!("‣ network id:"); - info!(" - {}", config.node.network_id); - info!("‣ node public key:"); - info!(" - {}", public_key); - info!("‣ node addresses:"); - for address in addresses { - info!(" - {}", address); - } - info!("‣ health endpoint:"); - info!( - " - 0.0.0.0:{}{}", - config.node.http_bind_port, HTTP_HEALTH_ROUTE - ); - - if let Some(publish) = config.publish { - for bucket_name in publish.s3_buckets { - // Assign our own public key to S3 bucket info. - node.publish(Publication::Files { - bucket_name, - public_key, - }) - .await?; - } - - // Multiple subjects can be used on top of a stream and we want to group them over one - // public key and stream name pair. 
This leaves us with the following structure: - // - // Streams Public Key Subjects Topic Id (for Gossip) - // ======= ========== ======== ===================== - // 1 I A a - // 2 I B a - // 3 II A b - // 1 II B b - // 1 II C b - let mut stream_public_key_map = HashMap::>::new(); - for LocalNatsSubject { - stream_name, - subject, - } in publish.nats_subjects - { - stream_public_key_map - .entry(stream_name) - .and_modify(|subjects| { - subjects.push(subject.clone()); - }) - .or_insert(vec![subject]); - } - - for (stream_name, subjects) in stream_public_key_map.into_iter() { - node.publish(Publication::Messages { - filtered_stream: FilteredMessageStream { - subjects, - stream_name, - }, - // Assign our own public key to NATS subject info. - public_key, - }) - .await?; - } - }; +fn main() -> Result<()> { + let context_builder = ContextBuilder::from_cli()?; + let context = context_builder.try_build_and_start()?; - if let Some(subscribe) = config.subscribe { - for RemoteS3Bucket { - local_bucket_name, - remote_bucket_name, - public_key: remote_public_key, - } in subscribe.s3_buckets - { - node.subscribe(Subscription::Files(FilesSubscription { - remote_bucket_name, - local_bucket_name, - public_key: remote_public_key, - })) - .await?; - } + context.configure()?; - // Multiple subjects can be used on top of a stream and we want to group them over one - // public key and stream name pair. - let mut stream_public_key_map = HashMap::<(StreamName, PublicKey), Vec>::new(); - for RemoteNatsSubject { - stream_name, - subject, - public_key: remote_public_key, - } in subscribe.nats_subjects - { - stream_public_key_map - .entry((stream_name, remote_public_key)) - .and_modify(|subjects| { - subjects.push(subject.clone()); - }) - .or_insert(vec![subject]); - } + log_hello_rhio(); + context.log_configuration(); - // Finally we want to group these subscriptions by public key. - let mut subscription_map = HashMap::>::new(); - for ((stream_name, remote_public_key), subjects) in stream_public_key_map.into_iter() { - let filtered_stream = FilteredMessageStream { - subjects, - stream_name, - }; - subscription_map - .entry(remote_public_key) - .and_modify(|filtered_streams| filtered_streams.push(filtered_stream.clone())) - .or_insert(vec![filtered_stream]); - } + context.wait_for_termination()?; - for (remote_public_key, filtered_streams) in subscription_map.into_iter() { - let subscription = Subscription::Messages(MessagesSubscription { - filtered_streams, - public_key: remote_public_key, - }); - node.subscribe(subscription).await?; - } - }; - - // Launch HTTP server in separate thread to not block rhio runtime. - let (http_error_tx, http_error_rx) = oneshot::channel::>(); - std::thread::spawn(move || { - let runtime = Builder::new_current_thread() - .enable_io() - .thread_name("http-server") - .build() - .expect("http server tokio runtime"); - - let result = runtime.block_on(async move { - run_http_server(config.node.http_bind_port) - .await - .context("failed to start http server with health endpoint")?; - Ok(()) - }); - - http_error_tx.send(result).expect("sending http error"); - }); - - tokio::select! 
{ - Ok(Err(err)) = http_error_rx => bail!(err), - _ = tokio::signal::ctrl_c() => {}, - } - - info!(""); info!("shutting down"); - node.shutdown().await?; - Ok(()) + context.shutdown() } -fn hello_rhio() { +fn log_hello_rhio() { r#" ___ ___ ___ /\ \ /\__\ ___ /\ \ /::\ \ /:/ / /\ \ /::\ \ @@ -203,4 +33,5 @@ fn hello_rhio() { "# .split("\n") .for_each(|line| info!("{}", line)); + info!(""); } diff --git a/rhio/src/node/config.rs b/rhio/src/node/config.rs index 30cef0b4..d87382eb 100644 --- a/rhio/src/node/config.rs +++ b/rhio/src/node/config.rs @@ -14,6 +14,12 @@ pub struct NodeConfig { inner: Arc>, } +impl Default for NodeConfig { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] struct NodeConfigInner { subscriptions: Vec, diff --git a/rhio/src/node/rhio.rs b/rhio/src/node/rhio.rs index c561102c..768d37f0 100644 --- a/rhio/src/node/rhio.rs +++ b/rhio/src/node/rhio.rs @@ -3,26 +3,33 @@ use std::net::SocketAddr; use anyhow::{anyhow, Result}; use futures_util::future::{MapErr, Shared}; use futures_util::{FutureExt, TryFutureExt}; -use p2panda_blobs::Blobs as BlobsHandler; use p2panda_core::{Hash, PrivateKey, PublicKey}; -use p2panda_net::{ - Config as NetworkConfig, NetworkBuilder, ResyncConfiguration, SyncConfiguration, -}; +use p2panda_net::Config as NetworkConfig; +use s3::error::S3Error; +use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinError; use tokio_util::task::AbortOnDropHandle; use tracing::error; -use crate::blobs::{blobs_config, store_from_config, Blobs}; +use crate::blobs::watcher::S3Event; +use crate::blobs::Blobs; use crate::config::Config; use crate::nats::Nats; -use crate::network::sync::RhioSyncProtocol; use crate::network::Panda; use crate::node::actor::{NodeActor, ToNodeActor}; use crate::node::config::NodeConfig; use crate::topic::{Publication, Subscription}; use crate::JoinErrToStr; +#[derive(Debug, Clone, Default)] +pub struct NodeOptions { + pub public_key: PublicKey, + pub private_key: PrivateKey, + pub direct_addresses: Vec, + pub node_config: NodeConfig, +} + pub struct Node { node_id: PublicKey, direct_addresses: Vec, @@ -31,61 +38,20 @@ pub struct Node { } impl Node { - /// Configure and spawn a node. - pub async fn spawn(config: Config, private_key: PrivateKey) -> Result { - // 1. Connect to NATS server and consume streams filtered by NATS subjects. - let nats = Nats::new(config.clone()).await?; - - // 2. Configure rhio peer-to-peer network. - let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?; - - let blob_store = store_from_config(&config).await?; - - let sync_protocol = RhioSyncProtocol::new( - node_config.clone(), - nats.clone(), - blob_store.clone(), - private_key.clone(), - ); - - let sync_config = - SyncConfiguration::new(sync_protocol).resync(ResyncConfiguration::default()); - - let builder = NetworkBuilder::from_config(p2p_network_config) - .private_key(private_key.clone()) - .sync(sync_config); - - // 3. Configure and set up blob store and connection handlers for blob replication. - let (network, blobs, watcher_rx) = if config.s3.is_some() { - let (network, blobs_handler) = - BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) - .await?; - // 3.1. Start a service which watches the S3 buckets for changes. 
- let (watcher_tx, watcher_rx) = mpsc::channel(512); - - let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); - (network, Some(blobs), watcher_rx) - } else { - let network = builder.build().await?; - let (_dummy_watcher_tx, dummy_watcher_rx) = mpsc::channel(512); - (network, None, dummy_watcher_rx) - }; - - // 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p - // networking, data replication and gossipping. - let node_id = network.node_id(); - let direct_addresses = network - .direct_addresses() - .await - .ok_or_else(|| anyhow!("socket is not bind to any interface"))?; - let panda = Panda::new(network); - + /// Create a new Rhio node. + pub async fn new( + nats: Nats, + blobs: Option, + watcher_rx: Receiver>, + panda: Panda, + options: NodeOptions, + ) -> Result { // 6. Finally spawn actor which orchestrates the "business logic", with the help of the // blob store, p2panda network and NATS JetStream consumers. let (node_actor_tx, node_actor_rx) = mpsc::channel(512); let node_actor = NodeActor::new( - node_config, - private_key, + options.node_config, + options.private_key, nats, panda, blobs, @@ -103,8 +69,8 @@ impl Node { .shared(); let node = Node { - node_id, - direct_addresses, + node_id: options.public_key, + direct_addresses: options.direct_addresses, node_actor_tx, actor_handle: actor_drop_handle, }; @@ -112,7 +78,7 @@ impl Node { Ok(node) } - async fn configure_p2p_network( + pub async fn configure_p2p_network( config: &Config, ) -> Result<(NodeConfig, NetworkConfig), anyhow::Error> { let node_config = NodeConfig::new(); From db7c09382f1f552ba5c2e57e7c1a8b0314e070f6 Mon Sep 17 00:00:00 2001 From: ktatarnikov Date: Wed, 15 Jan 2025 12:57:10 +0100 Subject: [PATCH 2/3] [integration tests] message replication tests --- Cargo.lock | 73 +++++--- Cargo.toml | 6 +- rhio/Cargo.toml | 6 + rhio/src/config.rs | 4 +- rhio/src/context_builder.rs | 26 ++- rhio/src/lib.rs | 3 + rhio/src/nats/actor.rs | 85 +++++---- rhio/src/nats/client/fake/blocking.rs | 132 ++++++++++++++ rhio/src/nats/client/fake/client.rs | 244 ++++++++++++++++++++++++++ rhio/src/nats/client/fake/mod.rs | 3 + rhio/src/nats/client/fake/server.rs | 177 +++++++++++++++++++ rhio/src/nats/client/mod.rs | 5 + rhio/src/nats/client/nats.rs | 237 +++++++++++++++++++++++++ rhio/src/nats/client/types.rs | 70 ++++++++ rhio/src/nats/consumer.rs | 142 ++++----------- rhio/src/nats/mod.rs | 46 ++--- rhio/src/tests/configuration.rs | 238 +++++++++++++++++++++++++ rhio/src/tests/fake_rhio_server.rs | 33 ++++ rhio/src/tests/message_replication.rs | 80 +++++++++ rhio/src/tests/mod.rs | 3 + 20 files changed, 1401 insertions(+), 212 deletions(-) create mode 100644 rhio/src/nats/client/fake/blocking.rs create mode 100644 rhio/src/nats/client/fake/client.rs create mode 100644 rhio/src/nats/client/fake/mod.rs create mode 100644 rhio/src/nats/client/fake/server.rs create mode 100644 rhio/src/nats/client/mod.rs create mode 100644 rhio/src/nats/client/nats.rs create mode 100644 rhio/src/nats/client/types.rs create mode 100644 rhio/src/tests/configuration.rs create mode 100644 rhio/src/tests/fake_rhio_server.rs create mode 100644 rhio/src/tests/message_replication.rs create mode 100644 rhio/src/tests/mod.rs diff --git a/Cargo.lock b/Cargo.lock index a8a60a28..06cf0c08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -317,7 +317,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.2", "hyper-util", "itoa", "matchit", @@ -478,6 +478,16 @@ dependencies = [ 
"generic-array", ] +[[package]] +name = "block_on_proc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b872f3528eeeb4370ee73b51194dc1cd93680c2d0eb6c7a223889038d2c1a167" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "bounded-integer" version = "0.5.7" @@ -504,9 +514,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" dependencies = [ "serde", ] @@ -846,6 +856,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -1529,7 +1553,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ "cfg-if", - "dashmap", + "dashmap 5.5.3", "futures", "futures-timer", "no-std-compat", @@ -1843,9 +1867,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0" dependencies = [ "bytes", "futures-channel", @@ -1869,7 +1893,7 @@ checksum = "5ee4be2c948921a1a5320b629c4193916ed787a7f7f293fd3f7f5a6c9de74155" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.2", "hyper-util", "rustls", "rustls-pki-types", @@ -1903,7 +1927,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.2", "pin-project-lite", "socket2", "tokio", @@ -2309,7 +2333,7 @@ dependencies = [ "anyhow", "erased_set", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.2", "hyper-util", "once_cell", "prometheus-client", @@ -2347,7 +2371,7 @@ dependencies = [ "hostname", "http 1.1.0", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.2", "hyper-util", "igd-next", "iroh-base", @@ -2991,9 +3015,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" [[package]] name = "oneshot" @@ -3364,18 +3388,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.5" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.5" +version = "1.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +checksum = 
"d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" dependencies = [ "proc-macro2", "quote", @@ -4067,7 +4091,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.2", "hyper-rustls", "hyper-util", "ipnet", @@ -4124,22 +4148,28 @@ dependencies = [ "async-nats", "async-trait", "axum", + "bytes", "ciborium", "clap", + "dashmap 6.1.0", "directories", "figment", + "futures", "futures-util", "hex", "loole", + "once_cell", "p2panda-blobs", "p2panda-core", "p2panda-net", "p2panda-sync", + "pin-project", "rand 0.8.5", "rhio-blobs", "rhio-core", "rust-s3", "serde", + "serde_json", "tokio", "tokio-stream", "tokio-util", @@ -4256,6 +4286,7 @@ dependencies = [ "aws-creds", "aws-region", "base64 0.22.1", + "block_on_proc", "bytes", "cfg-if", "futures", @@ -4522,9 +4553,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -4559,9 +4590,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 8a0e34cd..fdb506eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ futures-lite = "2.5.0" futures-util = "0.3.30" iroh-blobs = "0.25.0" iroh-io = { version = "0.6.1", features = ["x-http"] } -rust-s3 = { version = "0.35.1", features = ["tokio"] } +rust-s3 = { version = "0.35.1", features = ["tokio", "blocking"] } thiserror = "2.0.3" tokio = { version = "1.41.1", features = ["full"] } tokio-stream = "0.1.15" @@ -43,4 +43,8 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tempfile = "3.10.1" rusty-hook = "^0.11.2" +once_cell = "1.20.2" +dashmap = "6.1.0" +futures = { version = "0.3.28", default-features = false, features = ["std"] } +pin-project = "1.1.8" diff --git a/rhio/Cargo.toml b/rhio/Cargo.toml index 0fac89cf..7506eecd 100644 --- a/rhio/Cargo.toml +++ b/rhio/Cargo.toml @@ -41,6 +41,12 @@ tokio-stream.workspace = true tokio-util.workspace = true tracing.workspace = true tracing-subscriber.workspace = true +pin-project.workspace = true +bytes.workspace = true +futures.workspace = true [dev-dependencies] figment = { workspace = true, features = ["test"] } +once_cell.workspace = true +dashmap.workspace = true +serde_json.workspace = true diff --git a/rhio/src/config.rs b/rhio/src/config.rs index 2041ca40..e5bf2a3e 100644 --- a/rhio/src/config.rs +++ b/rhio/src/config.rs @@ -166,7 +166,7 @@ impl Default for S3Config { } } -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct NatsConfig { pub endpoint: String, pub credentials: Option, @@ -181,7 +181,7 @@ impl Default for NatsConfig { } } -#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Hash)] pub struct NatsCredentials { pub nkey: Option, pub username: Option, diff --git 
a/rhio/src/context_builder.rs b/rhio/src/context_builder.rs index b9d0de8a..7acc5366 100644 --- a/rhio/src/context_builder.rs +++ b/rhio/src/context_builder.rs @@ -17,6 +17,10 @@ use crate::blobs::{blobs_config, Blobs}; use crate::config::PRIVATE_KEY_ENV; use crate::config::{load_config, Config}; use crate::health::run_http_server; +#[cfg(test)] +use crate::nats::client::fake::client::FakeNatsClient; +#[cfg(not(test))] +use crate::nats::client::nats::NatsClientImpl; use crate::network::sync::RhioSyncProtocol; use crate::network::Panda; use crate::tracing::setup_tracing; @@ -32,6 +36,7 @@ pub struct ContextBuilder { impl ContextBuilder { pub fn new(config: Config, private_key: PrivateKey) -> Self { + setup_tracing(config.log_level.clone()); ContextBuilder { public_key: private_key.public_key(), config, @@ -93,15 +98,15 @@ impl ContextBuilder { .context("failed to initialize Rhio node") })?; + // Launch HTTP server in separate runtime to not block rhio runtime. let http_runtime = Builder::new_current_thread() .enable_io() .thread_name("http-server") .build() .expect("http server tokio runtime"); - let cancellation_token = CancellationToken::new(); - // Launch HTTP server in separate runtime to not block rhio runtime. let http_handle = self.start_http_server(&http_runtime, cancellation_token.clone()); + Ok(Context::new( node, self.config.clone(), @@ -114,9 +119,16 @@ impl ContextBuilder { } async fn init_rhio_node(config: Config, private_key: PrivateKey) -> Result { - let nats = Nats::new(config.clone()).await?; + // 1. Depends on the context - tests or prod, we configure corresponding nats client. + #[cfg(not(test))] + let nats_client = NatsClientImpl::new(config.nats.clone()).await?; + #[cfg(test)] + let nats_client = FakeNatsClient::new(config.nats.clone())?; + + // 2. Configure Nats consumer streams management + let nats = Nats::new(nats_client).await?; - // 2. Configure rhio peer-to-peer network. + // 3. Configure rhio peer-to-peer network. let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?; let blob_store = store_from_config(&config).await?; @@ -136,12 +148,12 @@ impl ContextBuilder { .sync(sync_config); let (watcher_tx, watcher_rx) = mpsc::channel(512); - // 3. Configure and set up blob store and connection handlers for blob replication. + // 4. Configure and set up blob store and connection handlers for blob replication. let (network, blobs, watcher_rx) = if config.s3.is_some() { let (network, blobs_handler) = BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) .await?; - // 3.1. Start a service which watches the S3 buckets for changes. + // 4.1. we also start a service which watches the S3 buckets for changes. let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); (network, Some(blobs), watcher_rx) } else { @@ -158,6 +170,7 @@ impl ContextBuilder { .ok_or_else(|| anyhow!("socket is not bind to any interface"))?; let panda = Panda::new(network); + // 6. Creating rhio Node responsible for managing s3, nats and p2p network let options = NodeOptions { public_key: node_id, node_config, @@ -168,6 +181,7 @@ impl ContextBuilder { Node::new(nats, blobs, watcher_rx, panda, options).await } + /// Starts the HTTP server with health endpoint. 
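+    ///
+    /// The server is spawned on the provided runtime; when it terminates (with or
+    /// without an error) the cancellation token is triggered so that
+    /// `Context::wait_for_termination` can react and shut the node down.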
fn start_http_server( &self, runtime: &Runtime, diff --git a/rhio/src/lib.rs b/rhio/src/lib.rs index 6f47dbbf..79d599f1 100644 --- a/rhio/src/lib.rs +++ b/rhio/src/lib.rs @@ -16,5 +16,8 @@ pub use topic::{ pub use node::rhio::Node; +#[cfg(test)] +mod tests; + pub(crate) type JoinErrToStr = Box String + Send + Sync + 'static>; diff --git a/rhio/src/nats/actor.rs b/rhio/src/nats/actor.rs index dfd14767..c6c8377c 100644 --- a/rhio/src/nats/actor.rs +++ b/rhio/src/nats/actor.rs @@ -1,15 +1,19 @@ use std::collections::HashMap; use anyhow::{Context, Result}; -use async_nats::jetstream::context::Publish; -use async_nats::jetstream::{consumer::DeliverPolicy, Context as JetstreamContext}; -use async_nats::{Client as NatsClient, HeaderMap}; +use async_nats::jetstream::consumer::DeliverPolicy; +use async_nats::HeaderMap; use rand::random; use rhio_core::{subjects_to_str, Subject}; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error}; -use crate::nats::consumer::{Consumer, ConsumerId, JetStreamEvent, StreamName}; +use crate::nats::{ + client::types::NatsClient, + consumer::{Consumer, ConsumerId, JetStreamEvent, StreamName}, +}; + +use super::client::types::NatsMessageStream; pub enum ToNatsActor { Publish { @@ -86,19 +90,25 @@ pub enum ToNatsActor { }, } -pub struct NatsActor { +pub struct NatsActor +where + M: NatsMessageStream + Send + Sync + Unpin + 'static, + N: NatsClient, +{ inbox: mpsc::Receiver, - nats_jetstream: JetstreamContext, - consumers: HashMap, + nats_client: Box, + consumers: HashMap>, } -impl NatsActor { - pub fn new(nats_client: NatsClient, inbox: mpsc::Receiver) -> Self { - let nats_jetstream = async_nats::jetstream::new(nats_client.clone()); - +impl NatsActor +where + M: NatsMessageStream + Send + Sync + Unpin + 'static, + N: NatsClient, +{ + pub fn new(nats_client: Box, inbox: mpsc::Receiver) -> Self { Self { inbox, - nats_jetstream, + nats_client, consumers: HashMap::new(), } } @@ -190,17 +200,9 @@ impl NatsActor { ) -> Result<()> { debug!(%subject, ?payload, bytes = payload.len(), "publish NATS message"); - let mut publish = Publish::build().payload(payload.into()); - if let Some(headers) = headers { - publish = publish.headers(headers); - } - let server_ack = self.nats_jetstream.send_publish(subject, publish).await?; - - // Wait until the server confirmed receiving this message, to make sure it got delivered - // and persisted. 
- if wait_for_ack { - server_ack.await.context("publish message to nats server")?; - } + self.nats_client + .publish(wait_for_ack, subject, payload.into(), headers) + .await?; Ok(()) } @@ -225,15 +227,17 @@ impl NatsActor { stream_name.clone(), format!("{filter_subjects_str}-{random_id}"), ); + let (messages, consumer_info) = self + .nats_client + .create_consumer_stream( + stream_name.clone(), + filter_subjects.clone(), + deliver_policy, + ) + .await?; - let mut consumer = Consumer::new( - &self.nats_jetstream, - stream_name, - filter_subjects, - deliver_policy, - topic_id, - ) - .await?; + let mut consumer = + Consumer::new(messages, consumer_info, stream_name, topic_id).await?; let rx = consumer.subscribe(); self.consumers.insert(consumer_id.clone(), consumer); @@ -246,14 +250,17 @@ impl NatsActor { let rx = match self.consumers.get_mut(&consumer_id) { Some(consumer) => consumer.subscribe(), None => { - let mut consumer = Consumer::new( - &self.nats_jetstream, - stream_name, - filter_subjects, - deliver_policy, - topic_id, - ) - .await?; + let (messages, consumer_info) = self + .nats_client + .create_consumer_stream( + stream_name.clone(), + filter_subjects.clone(), + deliver_policy, + ) + .await?; + + let mut consumer = + Consumer::new(messages, consumer_info, stream_name, topic_id).await?; let rx = consumer.subscribe(); self.consumers.insert(consumer_id.clone(), consumer); rx diff --git a/rhio/src/nats/client/fake/blocking.rs b/rhio/src/nats/client/fake/blocking.rs new file mode 100644 index 00000000..fc9923d5 --- /dev/null +++ b/rhio/src/nats/client/fake/blocking.rs @@ -0,0 +1,132 @@ +use crate::nats::HeaderMap; +use anyhow::Result; +use async_nats::jetstream::consumer::DeliverPolicy; +use async_nats::Message; +use bytes::Bytes; +use rhio_core::Subject; +use std::marker::PhantomData; +use std::sync::Arc; +use tokio::runtime::Runtime; + +use crate::StreamName; + +use super::{ + super::types::{NatsClient, NatsMessageStream}, + client::Consumer, +}; + +/// A blocking client for interacting with NATS. +/// +/// This client wraps an asynchronous NATS client and provides blocking +/// methods for publishing messages and creating consumers. +/// +/// # Type Parameters +/// +/// * `T` - The type of the NATS client. +/// * `M` - The type of the NATS message stream. +/// +/// # Methods +/// +/// * `new(inner: T, runtime: Arc) -> Self` +/// - Creates a new `BlockingClient`. +/// +/// * `publish(&self, subject: String, payload: Bytes, headers: Option) -> Result<()>` +/// - Publishes a message to the given subject with the specified payload and headers. +/// +/// * `create_consumer(&self, stream_name: StreamName, filter_subjects: Vec, _deliver_policy: DeliverPolicy) -> Result>` +/// - Creates a new consumer for the specified stream and subjects with the given delivery policy. +/// +/// A blocking consumer for receiving messages from NATS. +/// +/// This consumer wraps an asynchronous consumer and provides a blocking +/// method for receiving messages. +/// +/// # Type Parameters +/// +/// * `M` - The type of the NATS message stream. +/// +/// # Fields +/// +/// * `consumer` - The inner asynchronous consumer. +/// * `runtime` - The Tokio runtime used for blocking operations. +/// +/// # Methods +/// +/// * `new(consumer: Consumer, runtime: Arc) -> BlockingConsumer` +/// - Creates a new `BlockingConsumer`. +/// +/// * `recv_count(&mut self, timeout: std::time::Duration, count: usize) -> Result>` +/// - Receives a specified number of messages with a timeout. 
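+///
+/// # Example
+///
+/// A minimal sketch of how a test might drive the blocking wrapper (hypothetical
+/// `client`, `stream` and `subjects`, imports elided, not compiled as a doc test):
+///
+/// ```ignore
+/// let runtime = Arc::new(Runtime::new()?);
+/// let blocking = BlockingClient::new(client, runtime.clone());
+/// let mut consumer = blocking.create_consumer(stream, subjects, DeliverPolicy::All)?;
+/// blocking.publish("foo.bar".to_string(), Bytes::from("hello"), None)?;
+/// let messages = consumer.recv_count(Duration::from_secs(5), 1)?;
+/// ```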
+pub struct BlockingClient +where + T: NatsClient, + M: NatsMessageStream, +{ + /// The inner asynchronous NATS client. + inner: T, + /// The Tokio runtime used for blocking operations. + runtime: Arc, + phantom: PhantomData, +} + +impl BlockingClient +where + T: NatsClient, + M: NatsMessageStream, +{ + pub fn new(inner: T, runtime: Arc) -> Self { + Self { + inner, + runtime, + phantom: PhantomData, + } + } + + pub fn publish( + &self, + subject: String, + payload: Bytes, + headers: Option, + ) -> Result<()> { + self.runtime + .block_on(async { self.inner.publish(false, subject, payload, headers).await }) + } + + pub fn create_consumer( + &self, + stream_name: StreamName, + filter_subjects: Vec, + _deliver_policy: DeliverPolicy, + ) -> Result> { + self.runtime.block_on(async { + let (messages, _) = self + .inner + .create_consumer_stream(stream_name, filter_subjects, _deliver_policy) + .await?; + Ok(BlockingConsumer::new( + Consumer::new(messages), + self.runtime.clone(), + )) + }) + } +} + +pub struct BlockingConsumer { + consumer: Consumer, + runtime: Arc, +} + +impl BlockingConsumer { + pub fn new(consumer: Consumer, runtime: Arc) -> BlockingConsumer { + BlockingConsumer { consumer, runtime } + } + + pub fn recv_count( + &mut self, + timeout: std::time::Duration, + count: usize, + ) -> Result> { + self.runtime + .block_on(async { self.consumer.recv_timeout(timeout, count).await }) + } +} diff --git a/rhio/src/nats/client/fake/client.rs b/rhio/src/nats/client/fake/client.rs new file mode 100644 index 00000000..03fcc090 --- /dev/null +++ b/rhio/src/nats/client/fake/client.rs @@ -0,0 +1,244 @@ +use crate::config::NatsConfig; +use crate::nats::client::fake::server::TEST_FAKE_SERVER; +use crate::nats::HeaderMap; +use anyhow::{Context as AnyhowContext, Result}; +use async_nats::jetstream::consumer::push::MessagesError; +use async_nats::jetstream::consumer::DeliverPolicy; +use async_nats::jetstream::consumer::{Info, SequenceInfo}; +use async_nats::Message; +use async_trait::async_trait; +use bytes::Bytes; +use futures::StreamExt; +use loole::RecvStream; +use pin_project::pin_project; +use pin_project::pinned_drop; +use rand::random; +use rhio_core::Subject; +use s3::creds::time::OffsetDateTime; +use std::pin::Pin; +use std::str::FromStr; +use std::sync::Arc; +use tokio::pin; +use tracing::info; + +use crate::StreamName; + +use super::super::types::{NatsClient, NatsMessageStream}; +use super::server::{FakeNatsServer, FakeSubscription}; + +/// `FakeNatsClient` is a mock implementation of a NATS client used for testing purposes. +/// It interacts with a `FakeNatsServer` to simulate NATS server behavior. +/// +/// # Fields +/// - `client_id`: A unique identifier for the client. +/// - `server`: A reference-counted pointer to the `FakeNatsServer`. +/// +/// # Methods +/// - `new(config: NatsConfig) -> Result`: +/// Creates a new `FakeNatsClient` instance with a unique client ID and associates it with a `FakeNatsServer`. +/// +/// # Trait Implementations +/// Implements the `NatsClient` trait for `FakeNatsClient`: +/// - `create_consumer_stream(&self, stream_name: StreamName, filter_subjects: Vec, deliver_policy: DeliverPolicy) -> Result<(FakeNatsMessages, Info)>`: +/// Creates a consumer stream with the specified parameters and returns a `FakeNatsMessages` stream and consumer info. +/// +/// - `publish(&self, _wait_for_ack: bool, subject: String, payload: Bytes, headers: Option) -> Result<()>`: +/// Publishes a message to the specified subject with optional headers. 
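+///
+/// # Example
+///
+/// A minimal sketch (hypothetical `config`, `stream_name` and `filter_subjects`,
+/// imports elided, not compiled as a doc test); two clients created from the same
+/// `NatsConfig` share one `FakeNatsServer` instance:
+///
+/// ```ignore
+/// let publisher = FakeNatsClient::new(config.clone())?;
+/// let subscriber = FakeNatsClient::new(config)?;
+/// let (messages, _info) = subscriber
+///     .create_consumer_stream(stream_name, filter_subjects, DeliverPolicy::All)
+///     .await?;
+/// publisher
+///     .publish(false, "foo.bar".to_string(), Bytes::from("hello"), None)
+///     .await?;
+/// ```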
+pub struct FakeNatsClient { + client_id: String, + server: Arc, +} + +impl FakeNatsClient { + pub fn new(config: NatsConfig) -> Result { + let client_id = format!("rhio-{}", random::().to_string()); + info!("creating client {client_id} for config {config:?}"); + + let server = TEST_FAKE_SERVER + .entry(config) + .or_insert_with(|| Arc::new(FakeNatsServer::new())) + .value() + .clone(); + Ok(FakeNatsClient { client_id, server }) + } +} + +#[async_trait] +impl NatsClient for FakeNatsClient { + async fn create_consumer_stream( + &self, + stream_name: StreamName, + filter_subjects: Vec, + deliver_policy: DeliverPolicy, + ) -> Result<(FakeNatsMessages, Info)> { + let (subscription, receiver) = self + .server + .add_subscription(self.client_id.clone(), filter_subjects, deliver_policy) + .await + .context("FakeNatsClient: create consumer stream")?; + + let info = to_fake_consumer_info(&self.client_id, &stream_name); + Ok(( + FakeNatsMessages { + messages: receiver.into_stream(), + server: self.server.clone(), + client_id: self.client_id.clone(), + subscription, + }, + info, + )) + } + + async fn publish( + &self, + _wait_for_ack: bool, + subject: String, + payload: Bytes, + headers: Option, + ) -> Result<()> { + let message = to_nats_message(subject.clone(), payload, headers); + let subject = Subject::from_str(&subject)?; + + self.server + .publish(subject, message) + .await + .context("FakeNatsClient: publish message to server")?; + + Ok(()) + } +} + +#[pin_project(PinnedDrop)] +pub struct FakeNatsMessages { + #[pin] + messages: RecvStream>, + server: Arc, + client_id: String, + subscription: FakeSubscription, +} + +impl NatsMessageStream for FakeNatsMessages {} + +impl futures::Stream for FakeNatsMessages { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.messages.poll_next(cx) + } +} + +#[pinned_drop] +impl PinnedDrop for FakeNatsMessages { + fn drop(self: Pin<&mut Self>) { + let this = self.project(); + this.server + .remove_subscription(this.client_id, &this.subscription); + } +} + +/// `Consumer` is a struct that wraps around a stream of NATS messages and provides handy methods to receive messages with a timeout. +/// +/// # Type Parameters +/// - `M`: A type that implements the `NatsMessageStream` trait, representing a stream of NATS messages. +/// +/// # Fields +/// - `messages`: A pinned boxed stream of NATS messages. +/// +/// # Methods +/// - `new(messages: M) -> Consumer`: +/// Creates a new `Consumer` instance with the provided message stream. +/// +/// - `recv_timeout(&mut self, timeout: std::time::Duration, count: usize) -> Result>`: +/// Receives a specified number of messages from the stream within a given timeout duration. If the timeout is reached before the specified number of messages are received, an error is returned. +/// +/// - `recv_count(&mut self, count: usize) -> Result>`: +/// Receives a specified number of messages from the stream. This method is used internally by `recv_timeout`. 
+/// +/// # Example +/// ```rust +/// use std::time::Duration; +/// use async_nats::Message; +/// use anyhow::Result; +/// +/// async fn example(mut consumer: Consumer) -> Result<()> { +/// let messages: Vec = consumer.recv_timeout(Duration::from_secs(5), 10).await?; +/// for message in messages { +/// println!("Received message: {:?}", message); +/// } +/// Ok(()) +/// } +/// ``` +pub struct Consumer { + messages: Pin>, +} + +impl Consumer { + pub fn new(messages: M) -> Consumer { + Consumer { + messages: Box::pin(messages), + } + } + + pub async fn recv_timeout( + &mut self, + timeout: std::time::Duration, + count: usize, + ) -> Result> { + tokio::time::timeout(timeout, async { self.recv_count(count).await }).await? + } + + async fn recv_count(&mut self, count: usize) -> Result> { + let mut result = vec![]; + while let Some(maybe_message) = self.messages.next().await { + let message = + maybe_message.context("Consumer: receiving message from fake message stream")?; + result.push(message); + if result.len() >= count { + break; + } + } + + Ok(result) + } +} + +fn to_fake_consumer_info(client_id: &String, stream_name: &String) -> Info { + Info { + stream_name: stream_name.clone(), + name: client_id.clone(), + created: OffsetDateTime::now_utc(), + config: Default::default(), + delivered: SequenceInfo { + consumer_sequence: 0, + stream_sequence: 0, + last_active: None, + }, + ack_floor: SequenceInfo { + consumer_sequence: 0, + stream_sequence: 0, + last_active: None, + }, + num_ack_pending: 0, + num_redelivered: 0, + num_waiting: 0, + num_pending: 0, + cluster: None, + push_bound: false, + } +} + +fn to_nats_message(subject: String, data: Bytes, headers: Option) -> Message { + Message { + subject: async_nats::Subject::from(subject.clone()), + reply: None, + payload: data, + headers, + status: None, + description: None, + length: 0, + } +} diff --git a/rhio/src/nats/client/fake/mod.rs b/rhio/src/nats/client/fake/mod.rs new file mode 100644 index 00000000..c13ad691 --- /dev/null +++ b/rhio/src/nats/client/fake/mod.rs @@ -0,0 +1,3 @@ +pub mod blocking; +pub mod client; +pub mod server; diff --git a/rhio/src/nats/client/fake/server.rs b/rhio/src/nats/client/fake/server.rs new file mode 100644 index 00000000..fc1c8e76 --- /dev/null +++ b/rhio/src/nats/client/fake/server.rs @@ -0,0 +1,177 @@ +use crate::config::NatsConfig; +use anyhow::{Context as AnyhowContext, Result}; +use async_nats::jetstream::consumer::push::MessagesError; +use async_nats::jetstream::consumer::DeliverPolicy; +use async_nats::Message; +use dashmap::DashMap; +use loole::Receiver; +use loole::Sender; +use once_cell::sync::Lazy; +use rhio_core::Subject; +use std::str::FromStr; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::info; + +type ClientId = String; +type MessageSender = Sender>; +type MessageReceiver = Receiver>; + +pub static TEST_FAKE_SERVER: Lazy>> = + Lazy::new(|| DashMap::new()); + +/// Represents a subscription to a set of subjects in the fake NATS server. +/// +/// A `FakeSubscription` contains a unique subscriber ID and a list of subjects +/// that the subscriber is interested in. It provides functionality to check if +/// a given subject matches any of the filter subjects in the subscription. +/// +/// # Fields +/// +/// * `subscriber_id` - A unique identifier for the subscriber. +/// * `filter_subjects` - A list of subjects that the subscriber is interested in. 
+/// +/// # Methods +/// +/// * `match_subject` - Checks if a given subject matches any of the filter subjects in the subscription. +/// +#[derive(Clone, Default, Debug, PartialEq, Eq, Hash)] +pub struct FakeSubscription { + subscriber_id: u64, + filter_subjects: Vec, +} + +impl FakeSubscription { + fn match_subject(&self, subject: &Subject) -> bool { + self.filter_subjects + .iter() + .any(|filter_subject| filter_subject.is_matching(&subject)) + } +} + +/// Represents a fake NATS server for testing purposes. +/// +/// This server allows for the creation of subscriptions and the publishing of messages +/// to those subscriptions. It maintains a list of subscribers and their respective +/// subscriptions, and can distribute messages to the appropriate subscribers based on +/// the subject of the message. +/// +/// # Fields +/// +/// * `subscribers` - A map of client IDs to their respective subscriptions and message senders. +/// * `storage` - A mutex-protected vector of messages that have been published to the server. +/// * `subscription_ids` - An atomic counter for generating unique subscription IDs. +/// +/// # Methods +/// +/// * `new` - Creates a new instance of the fake NATS server. +/// * `add_subscription` - Adds a new subscription for a client with specified filter subjects and delivery policy. +/// * `publish_existing_messages` - Publishes existing messages to a subscriber based on their subscription. +/// * `remove_subscription` - Removes a subscription for a client. +/// * `publish` - Publishes a message to the server and distributes it to the appropriate subscribers. +/// * `persist_message` - Persists a message to the server's storage. +/// * `distribute_to_subscribers` - Distributes a message to the appropriate subscribers based on the subject. 
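+///
+/// # Example
+///
+/// A rough sketch of the flow used by `FakeNatsClient` (hypothetical `subject`,
+/// `filter_subjects` and `message`, imports elided, not compiled as a doc test):
+///
+/// ```ignore
+/// let server = FakeNatsServer::new();
+/// let (subscription, receiver) = server
+///     .add_subscription("client-1".to_string(), filter_subjects, DeliverPolicy::All)
+///     .await?;
+/// server.publish(subject, message).await?;
+/// // `receiver` now yields a copy of every stored and future message whose
+/// // subject matches the subscription's filter subjects.
+/// server.remove_subscription(&"client-1".to_string(), &subscription);
+/// ```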
+/// +pub struct FakeNatsServer { + subscribers: DashMap>, + storage: Mutex>, + subscription_ids: AtomicU64, +} + +impl FakeNatsServer { + pub fn new() -> FakeNatsServer { + FakeNatsServer { + subscribers: DashMap::new(), + subscription_ids: AtomicU64::new(1), + storage: Mutex::new(vec![]), + } + } +} + +impl FakeNatsServer { + pub async fn add_subscription( + &self, + client_id: String, + filter_subjects: Vec, + deliver_policy: DeliverPolicy, + ) -> Result<(FakeSubscription, MessageReceiver)> { + let subscriber_id = self.subscription_ids.fetch_add(1, Ordering::AcqRel); + + info!(%client_id, ?subscriber_id, ?filter_subjects, "add message subscription to FakeNatsServer"); + + let (subscriber_tx, subscriber_rx) = loole::unbounded::>(); + let subscription = FakeSubscription { + subscriber_id, + filter_subjects, + }; + + if deliver_policy == DeliverPolicy::All { + self.publish_existing_messages(&subscription, subscriber_tx.clone()) + .await + .context("FakeNatsClient: Publishing existing messages for delivery policy ALL")?; + } + + self.subscribers + .entry(client_id) + .or_insert_with(DashMap::new) + .insert(subscription.clone(), subscriber_tx); + + Ok((subscription, subscriber_rx)) + } + + async fn publish_existing_messages( + &self, + subscription: &FakeSubscription, + subscriber_tx: Sender>, + ) -> Result<()> { + let storage = self.storage.lock().await; + for message in storage.iter() { + let subject: rhio_core::Subject = + rhio_core::Subject::from_str(message.subject.as_str())?; + if subscription.match_subject(&subject) { + subscriber_tx + .send(Ok(message.clone())) + .context("FakeNatsClient: Publishing messages")?; + } + } + Ok(()) + } + + pub fn remove_subscription(&self, client_id: &String, subscription: &FakeSubscription) { + info!(%client_id, client_id, ?subscription, "drop subscription from FakeNatsServer"); + if let Some(subscribers) = self.subscribers.get(client_id) { + subscribers.remove(&subscription); + } + } + + pub async fn publish(&self, subject: Subject, message: Message) -> Result<()> { + self.persist_message(&message).await; + self.distribute_to_subscribers(subject, message) + } + + async fn persist_message(&self, message: &Message) { + let mut storage = self.storage.lock().await; + storage.push(message.clone()); + drop(storage); + } + + fn distribute_to_subscribers(&self, subject: Subject, message: Message) -> Result<()> { + for subscription_entry in self.subscribers.iter() { + let subscriptions = subscription_entry.value(); + + for subscription in subscriptions.iter() { + if subscription.key().match_subject(&subject) { + subscription + .value() + .send(Ok(message.clone())) + .context(format!( + "FakeNatsClient: Send message to subscriber {:?}", + subscription.key() + ))?; + } + } + } + Ok(()) + } +} diff --git a/rhio/src/nats/client/mod.rs b/rhio/src/nats/client/mod.rs new file mode 100644 index 00000000..3841ee78 --- /dev/null +++ b/rhio/src/nats/client/mod.rs @@ -0,0 +1,5 @@ +pub mod nats; +pub mod types; + +#[cfg(test)] +pub mod fake; diff --git a/rhio/src/nats/client/nats.rs b/rhio/src/nats/client/nats.rs new file mode 100644 index 00000000..7360d96b --- /dev/null +++ b/rhio/src/nats/client/nats.rs @@ -0,0 +1,237 @@ +use anyhow::{bail, Context as AnyhowContext, Result}; +use async_nats::jetstream::consumer::push::MessagesError; +use async_nats::jetstream::consumer::push::{Config as ConsumerConfig, Messages}; +use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy}; +use async_nats::jetstream::consumer::{Info, PushConsumer}; +use 
async_nats::jetstream::context::Publish; +use async_nats::jetstream::Context as JetstreamContext; +use async_nats::Message; +use async_nats::{ConnectOptions, HeaderMap}; +use async_trait::async_trait; +use bytes::Bytes; +use pin_project::pin_project; +use rand::random; +use rhio_core::{subjects_to_str, Subject}; +use tracing::{span, trace, Level}; + +use crate::config::NatsCredentials; +use crate::{config, StreamName}; + +use super::types::{NatsClient, NatsMessageStream}; + +/// Implementation of the `NatsClient` trait for interacting with NATS JetStream. +/// +/// This struct provides methods to publish messages to NATS subjects and create consumers for +/// NATS streams. The consumers are push-based, ephemeral, and do not acknowledge messages, allowing +/// them to be replayed upon process restart. +/// +/// # Methods +/// +/// - `new`: Creates a new instance of `NatsClientImpl` by connecting to the NATS server. +/// - `publish`: Publishes a message to a NATS subject with optional headers and waits for an acknowledgment if specified. +/// - `create_consumer_stream`: Creates a push-based, ephemeral consumer for a NATS stream with specified filter subjects and delivery policy. +pub struct NatsClientImpl { + jetstream: JetstreamContext, +} + +impl NatsClientImpl { + #[allow(dead_code)] + pub async fn new(nats: config::NatsConfig) -> Result { + let nats_options = connect_options(nats.credentials.clone())?; + let nats_client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options) + .await + .context(format!("connecting to NATS server {}", nats.endpoint))?; + + let jetstream = async_nats::jetstream::new(nats_client.clone()); + Ok(NatsClientImpl { jetstream }) + } +} + +#[async_trait] +impl NatsClient for NatsClientImpl { + async fn publish( + &self, + wait_for_ack: bool, + subject: String, + payload: Bytes, + headers: Option, + ) -> Result<()> { + let mut publish = Publish::build().payload(payload); + if let Some(headers) = headers { + publish = publish.headers(headers); + } + + let server_ack = self + .jetstream + .send_publish(subject, publish) + .await + .context("publish message to nats server")?; + + // Wait until the server confirmed receiving this message, to make sure it got delivered + // and persisted. + if wait_for_ack { + server_ack + .await + .context("acknowledgement of the published message")?; + } + + Ok(()) + } + /// Create a consumer for a NATS stream and returns its wrapper for testability purposes. + /// + /// The consumers used here are push-based, "un-acking" and ephemeral, meaning that no state of + /// the consumer is persisted on the NATS server and no message is marked as "read" to be able + /// to re-play them again when the process restarts. + /// + /// Since NATS streams are also used for persistance with their own wide range of limit + /// configurations, rhio does not create any streams automatically but merely consumes them. + /// This allows rhio operators to have full flexibility over the nature of the stream. This is + /// why for every published subject a "stream name" needs to be mentioned. + async fn create_consumer_stream( + &self, + stream_name: StreamName, + filter_subjects: Vec, + deliver_policy: DeliverPolicy, + ) -> Result<(NatsMessages, Info)> { + let mut consumer: PushConsumer = self + .jetstream + // Streams need to already be created on the server, if not, this method will fail + // here. Note that no checks are applied here for validating if the NATS stream + // configuration is compatible with rhio's design. 
+ .get_stream(&stream_name) + .await + .context(format!( + "create or get '{}' stream from nats server", + stream_name, + ))? + .create_consumer(ConsumerConfig { + // Setting a delivery subject is crucial for making this consumer push-based. We + // need to create a push based consumer as pull-based ones are required to + // explicitly acknowledge messages. + // + // @NOTE(adz): Unclear to me what this really does other than it is required to be + // set for push-consumers? The documentation says: "The subject to deliver messages + // to. Setting this field decides whether the consumer is push or pull-based. With + // a deliver subject, the server will push messages to clients subscribed to this + // subject." https://docs.nats.io/nats-concepts/jetstream/consumers#push-specific + // + // .. it seems to not matter what the value inside this field is, we will still + // receive all messages from that stream, optionally filtered by "filter_subject"? + deliver_subject: { + // @NOTE(adz): Another thing I couldn't find documented was that if this + // delivery subject is the same across consumers, they'll all consume messages + // at the same time, which we avoid here by giving each consumer an unique, + // random identifier: + let random_deliver_subject: u32 = random(); + format!("rhio-{random_deliver_subject}") + }, + // For rhio two different delivery policies are configured: + // + // 1. Live-Mode: We're only interested in _upcoming_ messages as this consumer will + // only be used to forward NATS messages into the gossip overlay. This happens + // when a rhio node decided to "publish" a NATS subject, the created consumer + // lives as long as the process. + // 2. Sync-Session: Here we want to load and exchange _past_ messages, usually + // loading all messages from after a given timestamp. This happens when a remote + // rhio node requests data from a NATS subject from us, the created consumer + // lives as long as the sync session with this remote peer. + // TODO (konstantin) the sync session case needs to be tested + deliver_policy, + // We filter the given stream based on this subject filter, like this we can have + // different "views" on the same stream. + filter_subjects: filter_subjects + .iter() + .map(|subject| subject.to_string()) + .collect(), + // This is an ephemeral consumer which will not be persisted on the server / the + // progress of the consumer will not be remembered. We do this by _not_ setting + // "durable_name". + durable_name: None, + // Do _not_ acknowledge every incoming message, as we want to receive them _again_ + // after rhio got restarted. The to-be-consumed stream needs to accommodate for + // this setting and accept an unlimited amount of un-acked message deliveries. + ack_policy: AckPolicy::None, + ..Default::default() + }) + .await + .context(format!( + "create ephemeral jetstream consumer for '{}' stream", + stream_name, + ))?; + + // Retrieve info about the consumer to learn how many messages are currently persisted on + // the server (number of "pending messages"). These are the messages we need to download + // first before we can continue. 
+ let consumer_info = consumer.info().await?.clone(); + let consumer_name = consumer_info.name.clone(); + let num_pending = consumer_info.num_pending; + + let span = span!(Level::TRACE, "consumer", id = %consumer_name); + let deliver_policy_str = match deliver_policy { + DeliverPolicy::All => "all", + DeliverPolicy::New => "new", + _ => unimplemented!(), + }; + let filter_subjects_str = subjects_to_str(filter_subjects); + trace!( + parent: &span, + stream = %stream_name, + subject = %filter_subjects_str, + deliver_policy = deliver_policy_str, + num_pending = num_pending, + "create consumer for NATS" + ); + + let messages = consumer.messages().await.context("get message stream")?; + + Ok((NatsMessages { messages }, consumer_info)) + } +} + +#[pin_project] +pub struct NatsMessages { + #[pin] + messages: Messages, +} + +impl NatsMessageStream for NatsMessages {} + +impl futures::Stream for NatsMessages { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.project(); + this.messages.poll_next(cx).map_ok(|message_with_context| { + // We decouple async message from the inner static message contents + // for testability purposes. + // The `context` from async message is not used later anymore. + let (msg, _) = message_with_context.split(); + msg + }) + } +} + +fn connect_options(config: Option) -> Result { + let Some(credentials) = config else { + return Ok(ConnectOptions::default()); + }; + + let options = match ( + credentials.nkey, + credentials.token, + credentials.username, + credentials.password, + ) { + (Some(nkey), None, None, None) => ConnectOptions::with_nkey(nkey), + (None, Some(token), None, None) => ConnectOptions::with_token(token), + (None, None, Some(username), Some(password)) => { + ConnectOptions::with_user_and_password(username, password) + } + _ => bail!("ambigious nats credentials configuration"), + }; + + Ok(options) +} diff --git a/rhio/src/nats/client/types.rs b/rhio/src/nats/client/types.rs new file mode 100644 index 00000000..3f02b7df --- /dev/null +++ b/rhio/src/nats/client/types.rs @@ -0,0 +1,70 @@ +use anyhow::Result; +use async_nats::jetstream::consumer::push::MessagesError; +use async_nats::jetstream::consumer::{DeliverPolicy, Info}; +use async_nats::HeaderMap; +use async_nats::Message; +use async_trait::async_trait; +use bytes::Bytes; +use futures::Stream; +use rhio_core::Subject; + +use crate::StreamName; + +/// Represents a stream of NATS JetStream messages. +/// +/// This trait is used to define a stream of messages that can be consumed from a NATS JetStream consumer. +/// The stream yields `Result` items. +pub trait NatsMessageStream: Stream> + Sized {} + +/// A trait for a NATS client that interacts with NATS JetStream. +/// +/// This trait defines the necessary methods for creating a consumer stream and publishing messages to NATS. +/// +/// # Type Parameters +/// +/// * `M` - A type that implements the `NatsMessageStream` trait. +/// +/// # Methods +/// +/// * `create_consumer_stream` - Creates a consumer stream for a given stream name, filter subjects, and delivery policy. +/// * `publish` - Publishes a message to a given subject with an optional payload and headers. +#[async_trait] +pub trait NatsClient: Sized { + /// Creates a consumer stream for a given stream name, filter subjects, and delivery policy. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream to consume from. 
+ /// * `filter_subjects` - A vector of subjects to filter the messages. + /// * `deliver_policy` - The delivery policy for the consumer. + /// + /// # Returns + /// + /// A tuple containing the consumer stream and its information. + async fn create_consumer_stream( + &self, + stream_name: StreamName, + filter_subjects: Vec, + deliver_policy: DeliverPolicy, + ) -> Result<(M, Info)>; + + /// Publishes a message to a given subject with an optional payload and headers. + /// + /// # Arguments + /// + /// * `wait_for_ack` - Whether to wait for an acknowledgment from the server. + /// * `subject` - The subject to publish the message to. + /// * `payload` - The payload of the message. + /// * `headers` - Optional headers to include with the message. + /// + /// # Returns + /// + /// A result indicating success or failure. + async fn publish( + &self, + wait_for_ack: bool, + subject: String, + payload: Bytes, + headers: Option, + ) -> Result<()>; +} diff --git a/rhio/src/nats/consumer.rs b/rhio/src/nats/consumer.rs index a001b846..bf7c278c 100644 --- a/rhio/src/nats/consumer.rs +++ b/rhio/src/nats/consumer.rs @@ -1,15 +1,10 @@ -use anyhow::{Context, Result}; +use anyhow::Result; use async_nats::error::Error; -use async_nats::jetstream::consumer::push::{ - Config as ConsumerConfig, Messages, MessagesErrorKind, -}; -use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy, PushConsumer}; -use async_nats::jetstream::{Context as JetstreamContext, Message as MessageWithContext}; +use async_nats::jetstream::consumer::push::MessagesErrorKind; +use async_nats::jetstream::consumer::Info; use async_nats::Message as NatsMessage; use futures_util::future::{MapErr, Shared}; use futures_util::{FutureExt, TryFutureExt}; -use rand::random; -use rhio_core::{subjects_to_str, Subject}; use tokio::task::JoinError; use tokio_stream::StreamExt; use tokio_util::task::AbortOnDropHandle; @@ -17,6 +12,8 @@ use tracing::{error, span, trace, Level, Span}; use crate::JoinErrToStr; +use super::client::types::NatsMessageStream; + pub type StreamName = String; #[derive(Clone, Debug, Eq, PartialEq, Hash)] @@ -70,9 +67,12 @@ enum ConsumerStatus { /// what the limits (duration, size, interest) of the retention are. In rhio we use streams for /// permament storage: messages are kept forever (for now). Streams consume normal NATS subjects, /// any message published on those subjects will be captured in the defined storage system. 
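+///
+/// The actor is generic over [`NatsMessageStream`] so tests can drive it with a
+/// fake message stream instead of a real JetStream consumer. A rough wiring
+/// sketch (illustrative only; the actual wiring lives in the NATS actor, which
+/// is not part of this excerpt):
+///
+/// ```ignore
+/// let (messages, info) = client
+///     .create_consumer_stream(stream_name.clone(), filter_subjects, deliver_policy)
+///     .await?;
+/// let consumer = Consumer::new(messages, info, stream_name, topic_id).await?;
+/// ```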
-pub struct ConsumerActor { +pub struct ConsumerActor +where + M: NatsMessageStream + Unpin, +{ subscribers_tx: loole::Sender, - messages: Messages, + messages: M, num_pending: u64, status: ConsumerStatus, stream_name: StreamName, @@ -80,10 +80,13 @@ pub struct ConsumerActor { _span: Span, } -impl ConsumerActor { +impl ConsumerActor +where + M: NatsMessageStream + Unpin, +{ pub fn new( subscribers_tx: loole::Sender, - messages: Messages, + messages: M, num_pending: u64, stream_name: StreamName, topic_id: [u8; 32], @@ -127,7 +130,7 @@ impl ConsumerActor { async fn on_message( &mut self, - message: Result>, + message: Result>, ) -> Result<()> { if let Err(err) = self.on_message_inner(message).await { error!(parent: &self._span, "consuming nats stream failed: {err}"); @@ -146,13 +149,13 @@ impl ConsumerActor { async fn on_message_inner( &mut self, - message: Result>, + message: Result>, ) -> Result<()> { let message = message?; self.subscribers_tx.send(JetStreamEvent::Message { is_init: matches!(self.status, ConsumerStatus::Initializing), - message: message.message.clone(), + message: message.clone(), topic_id: self.topic_id, })?; @@ -179,22 +182,32 @@ impl ConsumerActor { } } -impl Drop for ConsumerActor { +impl Drop for ConsumerActor +where + M: NatsMessageStream + Unpin, +{ fn drop(&mut self) { trace!(parent: &self._span, "drop consumer"); } } #[derive(Debug, Clone)] -pub struct Consumer { +pub struct Consumer +where + M: NatsMessageStream + Send + Sync + Unpin + 'static, +{ #[allow(dead_code)] subscribers_tx: loole::Sender, subscribers_rx: loole::Receiver, #[allow(dead_code)] actor_handle: Shared, JoinErrToStr>>, + phantom: std::marker::PhantomData, } -impl Consumer { +impl Consumer +where + M: NatsMessageStream + Send + Sync + Unpin + 'static, +{ /// Create a consumer of a NATS stream. /// /// The consumers used here are push-based, "un-acking" and ephemeral, meaning that no state of @@ -206,102 +219,16 @@ impl Consumer { /// This allows rhio operators to have full flexibility over the nature of the stream. This is /// why for every published subject a "stream name" needs to be mentioned. pub async fn new( - context: &JetstreamContext, + messages: M, + consumer_info: Info, stream_name: StreamName, - filter_subjects: Vec, - deliver_policy: DeliverPolicy, topic_id: [u8; 32], ) -> Result { - let mut consumer: PushConsumer = context - // Streams need to already be created on the server, if not, this method will fail - // here. Note that no checks are applied here for validating if the NATS stream - // configuration is compatible with rhio's design. - .get_stream(&stream_name) - .await - .context(format!( - "create or get '{}' stream from nats server", - stream_name, - ))? - .create_consumer(ConsumerConfig { - // Setting a delivery subject is crucial for making this consumer push-based. We - // need to create a push based consumer as pull-based ones are required to - // explicitly acknowledge messages. - // - // @NOTE(adz): Unclear to me what this really does other than it is required to be - // set for push-consumers? The documentation says: "The subject to deliver messages - // to. Setting this field decides whether the consumer is push or pull-based. With - // a deliver subject, the server will push messages to clients subscribed to this - // subject." https://docs.nats.io/nats-concepts/jetstream/consumers#push-specific - // - // .. it seems to not matter what the value inside this field is, we will still - // receive all messages from that stream, optionally filtered by "filter_subject"? 
- deliver_subject: { - // @NOTE(adz): Another thing I couldn't find documented was that if this - // delivery subject is the same across consumers, they'll all consume messages - // at the same time, which we avoid here by giving each consumer an unique, - // random identifier: - let random_deliver_subject: u32 = random(); - format!("rhio-{random_deliver_subject}") - }, - // For rhio two different delivery policies are configured: - // - // 1. Live-Mode: We're only interested in _upcoming_ messages as this consumer will - // only be used to forward NATS messages into the gossip overlay. This happens - // when a rhio node decided to "publish" a NATS subject, the created consumer - // lives as long as the process. - // 2. Sync-Session: Here we want to load and exchange _past_ messages, usually - // loading all messages from after a given timestamp. This happens when a remote - // rhio node requests data from a NATS subject from us, the created consumer - // lives as long as the sync session with this remote peer. - deliver_policy, - // We filter the given stream based on this subject filter, like this we can have - // different "views" on the same stream. - filter_subjects: filter_subjects - .iter() - .map(|subject| subject.to_string()) - .collect(), - // This is an ephemeral consumer which will not be persisted on the server / the - // progress of the consumer will not be remembered. We do this by _not_ setting - // "durable_name". - durable_name: None, - // Do _not_ acknowledge every incoming message, as we want to receive them _again_ - // after rhio got restarted. The to-be-consumed stream needs to accommodate for - // this setting and accept an unlimited amount of un-acked message deliveries. - ack_policy: AckPolicy::None, - ..Default::default() - }) - .await - .context(format!( - "create ephemeral jetstream consumer for '{}' stream", - stream_name, - ))?; + let (subscribers_tx, subscribers_rx) = loole::bounded(256); - // Retrieve info about the consumer to learn how many messages are currently persisted on - // the server (number of "pending messages"). These are the messages we need to download - // first before we can continue. 
- let consumer_info = consumer.info().await?; let consumer_name = consumer_info.name.clone(); let num_pending = consumer_info.num_pending; - - let messages = consumer.messages().await.context("get message stream")?; - - let (subscribers_tx, subscribers_rx) = loole::bounded(256); - let span = span!(Level::TRACE, "consumer", id = %consumer_name); - let deliver_policy_str = match deliver_policy { - DeliverPolicy::All => "all", - DeliverPolicy::New => "new", - _ => unimplemented!(), - }; - let filter_subjects_str = subjects_to_str(filter_subjects); - trace!( - parent: &span, - stream = %stream_name, - subject = %filter_subjects_str, - deliver_policy = deliver_policy_str, - num_pending = num_pending, - "create consumer for NATS" - ); let consumer_actor = ConsumerActor::new( subscribers_tx.clone(), @@ -326,6 +253,7 @@ impl Consumer { subscribers_tx, subscribers_rx, actor_handle: actor_drop_handle, + phantom: std::marker::PhantomData, }) } diff --git a/rhio/src/nats/mod.rs b/rhio/src/nats/mod.rs index 42216ea2..d7116cf5 100644 --- a/rhio/src/nats/mod.rs +++ b/rhio/src/nats/mod.rs @@ -1,9 +1,11 @@ mod actor; +pub mod client; mod consumer; -use anyhow::{bail, Context, Result}; +use anyhow::Result; use async_nats::jetstream::consumer::DeliverPolicy; -use async_nats::{ConnectOptions, HeaderMap}; +use async_nats::HeaderMap; +use client::types::{NatsClient, NatsMessageStream}; use futures_util::future::{MapErr, Shared}; use futures_util::{FutureExt, TryFutureExt}; use rhio_core::Subject; @@ -12,7 +14,6 @@ use tokio::task::JoinError; use tokio_util::task::AbortOnDropHandle; use tracing::error; -use crate::config::{Config, NatsCredentials}; use crate::nats::actor::{NatsActor, ToNatsActor}; pub use crate::nats::consumer::{ConsumerId, JetStreamEvent, StreamName}; use crate::JoinErrToStr; @@ -25,19 +26,14 @@ pub struct Nats { } impl Nats { - pub async fn new(config: Config) -> Result { - let nats_options = connect_options(config.nats.credentials.clone())?; - let nats_client = - async_nats::connect_with_options(config.nats.endpoint.clone(), nats_options) - .await - .context(format!( - "connecting to NATS server {}", - config.nats.endpoint - ))?; - + pub async fn new(client: N) -> Result + where + M: NatsMessageStream + Send + Sync + Unpin + 'static, + N: NatsClient + 'static + Send + Sync, + { // Start the main NATS JetStream actor to dynamically maintain "stream consumers". 
let (nats_actor_tx, nats_actor_rx) = mpsc::channel(512); - let nats_actor = NatsActor::new(nats_client, nats_actor_rx); + let nats_actor = NatsActor::new(Box::new(client), nats_actor_rx); let actor_handle = tokio::task::spawn(async move { if let Err(err) = nats_actor.run().await { @@ -122,25 +118,3 @@ impl Nats { Ok(()) } } - -fn connect_options(config: Option) -> Result { - let Some(credentials) = config else { - return Ok(ConnectOptions::default()); - }; - - let options = match ( - credentials.nkey, - credentials.token, - credentials.username, - credentials.password, - ) { - (Some(nkey), None, None, None) => ConnectOptions::with_nkey(nkey), - (None, Some(token), None, None) => ConnectOptions::with_token(token), - (None, None, Some(username), Some(password)) => { - ConnectOptions::with_user_and_password(username, password) - } - _ => bail!("ambigious nats credentials configuration"), - }; - - Ok(options) -} diff --git a/rhio/src/tests/configuration.rs b/rhio/src/tests/configuration.rs new file mode 100644 index 00000000..434178b9 --- /dev/null +++ b/rhio/src/tests/configuration.rs @@ -0,0 +1,238 @@ +use anyhow::Context; +use once_cell::sync::Lazy; +use p2panda_core::{PrivateKey, PublicKey}; +use rhio_core::Subject; +use std::{ + path::PathBuf, + str::FromStr, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, +}; +use tokio::runtime::Builder; +use tracing::info; + +use crate::{ + config::{ + Config, LocalNatsSubject, NatsConfig, PublishConfig, RemoteNatsSubject, S3Config, + SubscribeConfig, + }, + nats::client::fake::{ + blocking::BlockingClient, + client::{FakeNatsClient, FakeNatsMessages}, + }, +}; + +use super::fake_rhio_server::FakeRhioServer; +use anyhow::Result; + +/// A structure representing the setup for a two-cluster messaging system. +/// +/// This setup includes two instances of `FakeRhioServer` and two instances of +/// `BlockingClient` with `FakeNatsClient` and `FakeNatsMessages`. +/// +pub struct TwoClusterMessagingSetup { + pub(crate) rhio_source: FakeRhioServer, + pub(crate) rhio_target: FakeRhioServer, + + pub(crate) nats_source: BlockingClient, + pub(crate) nats_target: BlockingClient, +} + +/// Creates a two-node messaging setup for testing purposes. +/// +/// This function sets up two instances of `FakeRhioServer` and two instances of +/// `BlockingClient` with `FakeNatsClient` and `FakeNatsMessages`. It configures +/// the necessary network and message subscription settings for the nodes to +/// communicate with each other. +/// +/// # Returns +/// +/// A `Result` containing a `TwoClusterMessagingSetup` structure if the setup +/// is successful, or an `anyhow::Error` if an error occurs during the setup. +/// +/// # Example +/// +/// ```rust +/// let setup = create_two_node_messaging_setup().expect("Failed to create two-node messaging setup"); +/// ``` +/// +/// # Errors +/// +/// This function will return an error if there is an issue creating the NATS clients +/// or starting the `FakeRhioServer` instances. 
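+///
+/// # Teardown
+///
+/// A rough teardown sketch (illustrative only; it mirrors what the message
+/// replication test in `tests/message_replication.rs` does once its assertions
+/// have passed):
+///
+/// ```ignore
+/// let setup = create_two_node_messaging_setup()?;
+/// // ... publish via `setup.nats_source`, assert via `setup.nats_target` ...
+/// setup.rhio_source.discard()?;
+/// setup.rhio_target.discard()?;
+/// ```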
+pub fn create_two_node_messaging_setup() -> Result { + let nats_source_config = generate_nats_config(); + let nats_target_config = generate_nats_config(); + info!("nats source config {:?}", nats_source_config); + info!("nats target config {:?}", nats_target_config); + + let mut rhio_source_config = generate_rhio_config(&nats_source_config, &None); + let rhio_source_private_key = PrivateKey::new(); + + let mut rhio_target_config = generate_rhio_config(&nats_target_config, &None); + let rhio_target_private_key = PrivateKey::new(); + + configure_network(vec![ + (&mut rhio_source_config, &rhio_source_private_key), + (&mut rhio_target_config, &rhio_target_private_key), + ]); + + info!("rhio source config {:?} ", rhio_source_config.node); + info!("rhio target config {:?} ", rhio_target_config.node); + + configure_message_subscription( + &mut rhio_source_config, + &rhio_source_private_key.public_key(), + &mut rhio_target_config, + &"test-stream", + &"test.subject1", + ); + + let test_runtime = Arc::new( + Builder::new_multi_thread() + .enable_io() + .enable_time() + .thread_name("test-runtime") + .worker_threads(5) + .build() + .expect("test tokio runtime"), + ); + + let nats_source = BlockingClient::new( + FakeNatsClient::new(rhio_source_config.nats.clone()).context("Source FakeNatsClient")?, + test_runtime.clone(), + ); + let nats_target = BlockingClient::new( + FakeNatsClient::new(rhio_target_config.nats.clone()).context("Target FakeNatsClient")?, + test_runtime, + ); + + let rhio_source = + FakeRhioServer::try_start(rhio_source_config.clone(), rhio_source_private_key.clone()) + .context("Source RhioServer")?; + let rhio_target = + FakeRhioServer::try_start(rhio_target_config.clone(), rhio_target_private_key.clone()) + .context("Target RhioServer")?; + + let setup = TwoClusterMessagingSetup { + rhio_source, + rhio_target, + nats_source, + nats_target, + }; + + Ok(setup) +} + +static TEST_INSTANCE_HTTP_PORT: Lazy = Lazy::new(|| AtomicU16::new(8080)); +static TEST_INSTANCE_RHIO_PORT: Lazy = Lazy::new(|| AtomicU16::new(31000)); + +pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option) -> Config { + let http_port = TEST_INSTANCE_HTTP_PORT.fetch_add(1, Ordering::SeqCst); + let rhio_port = TEST_INSTANCE_RHIO_PORT.fetch_add(1, Ordering::SeqCst); + + let mut config = Config::default(); + config.s3 = s3_config.clone(); + config.nats = nats_config.clone(); + config.node.bind_port = rhio_port; + config.node.http_bind_port = http_port; + config.node.known_nodes = vec![]; + config.node.private_key_path = PathBuf::from("/tmp/rhio_private_key"); + config.node.network_id = "test".to_string(); + config.log_level = Some("=INFO".to_string()); + config +} + +static TEST_INSTANCE_NATS_PORT: Lazy = Lazy::new(|| AtomicU16::new(4222)); + +pub fn generate_nats_config() -> NatsConfig { + let nats_port = TEST_INSTANCE_NATS_PORT.fetch_add(1, Ordering::SeqCst); + NatsConfig { + endpoint: format!("nats://localhost:{}", nats_port), + credentials: None, + } +} + +pub fn configure_network(nodes: Vec<(&mut Config, &PrivateKey)>) { + let mut nodes = nodes; + for i in 0..nodes.len() { + let mut known_nodes = vec![]; + for j in 0..nodes.len() { + if i != j { + let (node_config, private_key) = &nodes[j]; + known_nodes.push(crate::config::KnownNode { + public_key: private_key.public_key(), + direct_addresses: vec![format!("127.0.0.1:{}", node_config.node.bind_port)], + }); + } + } + nodes[i].0.node.known_nodes = known_nodes; + } +} + +/// Configures message subscription between a publisher and a subscriber. 
+/// +/// This function sets up the necessary configurations for a publisher to publish messages +/// to a specific subject and stream, and for a subscriber to subscribe to those messages. +/// +/// # Arguments +/// +/// * `publisher` - A mutable reference to the configuration of the publisher node. +/// * `publisher_pub_key` - The public key of the publisher node. +/// * `subscriber` - A mutable reference to the configuration of the subscriber node. +/// * `stream` - The name of the stream to which messages will be published. +/// * `subject` - The subject under which messages will be published and subscribed to. +/// +/// +/// # Example +/// +/// ```rust +/// let mut publisher_config = Config::default(); +/// let publisher_private_key = PrivateKey::new(); +/// let mut subscriber_config = Config::default(); +/// +/// configure_message_subscription( +/// &mut publisher_config, +/// &publisher_private_key.public_key(), +/// &mut subscriber_config, +/// "test-stream", +/// "test.subject1", +/// ); +/// ``` + +pub fn configure_message_subscription( + publisher: &mut Config, + publisher_pub_key: &PublicKey, + subscriber: &mut Config, + stream: &str, + subject: &str, +) { + if publisher.publish.is_none() { + publisher.publish = Some(PublishConfig { + s3_buckets: vec![], + nats_subjects: vec![], + }); + } + if subscriber.subscribe.is_none() { + subscriber.subscribe = Some(SubscribeConfig { + s3_buckets: vec![], + nats_subjects: vec![], + }); + } + if let Some(publish_config) = &mut publisher.publish { + publish_config.nats_subjects.push(LocalNatsSubject { + subject: Subject::from_str(subject).unwrap(), + stream_name: stream.into(), + }); + } + + if let Some(subscriber_config) = &mut subscriber.subscribe { + subscriber_config.nats_subjects.push(RemoteNatsSubject { + public_key: publisher_pub_key.clone(), + subject: Subject::from_str(subject).unwrap(), + stream_name: stream.into(), + }); + } +} diff --git a/rhio/src/tests/fake_rhio_server.rs b/rhio/src/tests/fake_rhio_server.rs new file mode 100644 index 00000000..8daa130c --- /dev/null +++ b/rhio/src/tests/fake_rhio_server.rs @@ -0,0 +1,33 @@ +use crate::{config::Config, context::Context, context_builder::ContextBuilder}; +use anyhow::Result; +use p2panda_core::PrivateKey; + +/// A fake server for testing purposes in the Rhio project. +/// +/// The `FakeRhioServer` struct provides methods to start and discard a fake server instance. +/// +/// # Fields +/// +/// * `context` - The context in which the server operates. +/// +/// # Methods +/// +/// * `try_start` - Attempts to start the fake server with the given configuration and private key. +/// * `discard` - Shuts down the fake server and cleans up resources. 
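+///
+/// # Example
+///
+/// A rough usage sketch (illustrative only; `generate_nats_config` and
+/// `generate_rhio_config` are the helpers from `tests/configuration.rs`):
+///
+/// ```ignore
+/// let config = generate_rhio_config(&generate_nats_config(), &None);
+/// let server = FakeRhioServer::try_start(config, PrivateKey::new())?;
+/// // ... exercise the node ...
+/// server.discard()?;
+/// ```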
+pub struct FakeRhioServer { + context: Context, +} + +impl FakeRhioServer { + pub fn try_start(config: Config, private_key: PrivateKey) -> Result { + let builder = ContextBuilder::new(config, private_key); + let context = builder.try_build_and_start()?; + context.configure()?; + context.log_configuration(); + Ok(FakeRhioServer { context }) + } + + pub fn discard(self) -> Result<()> { + self.context.shutdown() + } +} diff --git a/rhio/src/tests/message_replication.rs b/rhio/src/tests/message_replication.rs new file mode 100644 index 00000000..a6781a13 --- /dev/null +++ b/rhio/src/tests/message_replication.rs @@ -0,0 +1,80 @@ +use std::{collections::HashSet, str::FromStr, time::Duration}; + +use crate::{ + tests::configuration::{create_two_node_messaging_setup, TwoClusterMessagingSetup}, + tracing::setup_tracing, +}; +use anyhow::Result; +use async_nats::{jetstream::consumer::DeliverPolicy, HeaderMap}; +use bytes::Bytes; +use rhio_core::Subject; +use tracing::info; + +#[test] +pub fn test_e2e_message_replication() -> Result<()> { + setup_tracing(Some("=INFO".into())); + + let TwoClusterMessagingSetup { + rhio_source, + rhio_target, + nats_source, + nats_target, + } = create_two_node_messaging_setup()?; + + // This timeout is quite arbitrary. Ideally, we need to wait till network between peers is established. + // It seems there is no simple way to learn this at the moment. + std::thread::sleep(Duration::from_secs(5)); + info!("environment started"); + + nats_source.publish("test.subject1".into(), "test message".into(), None)?; + + let mut consumer = nats_target.create_consumer( + "test-stream".into(), + vec![Subject::from_str("test.subject1")?], + DeliverPolicy::All, + )?; + + let messages = consumer.recv_count(Duration::from_secs(10), 1)?; + assert_eq!(messages.len(), 1); + + let message = messages.first().unwrap(); + assert_message( + "test message", + "test.subject1", + &vec!["X-Rhio-Signature", "X-Rhio-PublicKey"], + message, + ); + + rhio_source.discard()?; + rhio_target.discard()?; + Ok(()) +} + +fn assert_message( + expected_payload: &'static str, + expected_subject: &'static str, + expected_headers: &Vec<&str>, + actual: &async_nats::Message, +) { + let headers = actual + .headers + .clone() + .take() + .unwrap_or(HeaderMap::default()); + let actual_header_names = headers + .iter() + .map(|(name, _)| name.as_ref()) + .collect::>(); + + let expected_header_names = expected_headers + .iter() + .map(|a| *a) + .collect::>(); + + assert_eq!(expected_header_names, actual_header_names); + assert_eq!(Bytes::from(expected_payload), actual.payload); + assert_eq!( + async_nats::Subject::from_static(expected_subject), + actual.subject + ); +} diff --git a/rhio/src/tests/mod.rs b/rhio/src/tests/mod.rs new file mode 100644 index 00000000..c2da5d11 --- /dev/null +++ b/rhio/src/tests/mod.rs @@ -0,0 +1,3 @@ +pub mod configuration; +pub mod fake_rhio_server; +pub mod message_replication; From a6bd8ea49ca1de7d291c6b60238b6ed1e131f9cf Mon Sep 17 00:00:00 2001 From: ktatarnikov Date: Fri, 17 Jan 2025 11:33:33 +0100 Subject: [PATCH 3/3] [integration tests] addressing review comments --- rhio/src/nats/client/fake/server.rs | 16 +++++++++++----- rhio/src/nats/client/nats.rs | 24 ++++++++++++++---------- rhio/src/tests/configuration.rs | 9 ++++----- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/rhio/src/nats/client/fake/server.rs b/rhio/src/nats/client/fake/server.rs index fc1c8e76..46eb9162 100644 --- a/rhio/src/nats/client/fake/server.rs +++ b/rhio/src/nats/client/fake/server.rs @@ -1,4 
+1,5 @@ use crate::config::NatsConfig; +use anyhow::bail; use anyhow::{Context as AnyhowContext, Result}; use async_nats::jetstream::consumer::push::MessagesError; use async_nats::jetstream::consumer::DeliverPolicy; @@ -106,10 +107,16 @@ impl FakeNatsServer { filter_subjects, }; - if deliver_policy == DeliverPolicy::All { - self.publish_existing_messages(&subscription, subscriber_tx.clone()) - .await - .context("FakeNatsClient: Publishing existing messages for delivery policy ALL")?; + match deliver_policy { + DeliverPolicy::All => { + self.publish_existing_messages(&subscription, subscriber_tx.clone()) + .await + .context( + "FakeNatsServer: Publishing existing messages for delivery policy ALL", + )?; + } + DeliverPolicy::New => (), + policy => bail!("FakeNatsServer: Unimplemented deliver policy {:?}", policy), } self.subscribers @@ -153,7 +160,6 @@ impl FakeNatsServer { async fn persist_message(&self, message: &Message) { let mut storage = self.storage.lock().await; storage.push(message.clone()); - drop(storage); } fn distribute_to_subscribers(&self, subject: Subject, message: Message) -> Result<()> { diff --git a/rhio/src/nats/client/nats.rs b/rhio/src/nats/client/nats.rs index 7360d96b..29ae178e 100644 --- a/rhio/src/nats/client/nats.rs +++ b/rhio/src/nats/client/nats.rs @@ -1,3 +1,5 @@ +use crate::config::NatsCredentials; +use crate::{config, StreamName}; use anyhow::{bail, Context as AnyhowContext, Result}; use async_nats::jetstream::consumer::push::MessagesError; use async_nats::jetstream::consumer::push::{Config as ConsumerConfig, Messages}; @@ -5,18 +7,14 @@ use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy}; use async_nats::jetstream::consumer::{Info, PushConsumer}; use async_nats::jetstream::context::Publish; use async_nats::jetstream::Context as JetstreamContext; -use async_nats::Message; +use async_nats::{Client, Message}; use async_nats::{ConnectOptions, HeaderMap}; use async_trait::async_trait; use bytes::Bytes; use pin_project::pin_project; -use rand::random; use rhio_core::{subjects_to_str, Subject}; use tracing::{span, trace, Level}; -use crate::config::NatsCredentials; -use crate::{config, StreamName}; - use super::types::{NatsClient, NatsMessageStream}; /// Implementation of the `NatsClient` trait for interacting with NATS JetStream. @@ -32,18 +30,19 @@ use super::types::{NatsClient, NatsMessageStream}; /// - `create_consumer_stream`: Creates a push-based, ephemeral consumer for a NATS stream with specified filter subjects and delivery policy. 
 pub struct NatsClientImpl { jetstream: JetstreamContext, + client: Client, } impl NatsClientImpl { #[allow(dead_code)] pub async fn new(nats: config::NatsConfig) -> Result { let nats_options = connect_options(nats.credentials.clone())?; - let nats_client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options) + let client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options) .await .context(format!("connecting to NATS server {}", nats.endpoint))?; - let jetstream = async_nats::jetstream::new(nats_client.clone()); - Ok(NatsClientImpl { jetstream }) + let jetstream = async_nats::jetstream::new(client.clone()); + Ok(NatsClientImpl { jetstream, client }) } } @@ -122,8 +121,13 @@ impl NatsClient for NatsClientImpl { // delivery subject is the same across consumers, they'll all consume messages // at the same time, which we avoid here by giving each consumer an unique, // random identifier: - let random_deliver_subject: u32 = random(); - format!("rhio-{random_deliver_subject}") + // @NOTE(ktatarnikov): The async_nats library example of push consumer + // (https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs) + // uses the `client.new_inbox()` method to generate deliver_subject. + // The method provides stronger guarantees (globally unique) than the `random`-based approach used previously. + // https://docs.rs/async-nats/0.38.0/async_nats/client/struct.Client.html#method.new_inbox + // + self.client.new_inbox() }, // For rhio two different delivery policies are configured: // diff --git a/rhio/src/tests/configuration.rs b/rhio/src/tests/configuration.rs index 434178b9..05abcc9d 100644 --- a/rhio/src/tests/configuration.rs +++ b/rhio/src/tests/configuration.rs @@ -27,6 +27,10 @@ use crate::{ use super::fake_rhio_server::FakeRhioServer; use anyhow::Result; +static TEST_INSTANCE_HTTP_PORT: Lazy = Lazy::new(|| AtomicU16::new(8080)); +static TEST_INSTANCE_RHIO_PORT: Lazy = Lazy::new(|| AtomicU16::new(31000)); +static TEST_INSTANCE_NATS_PORT: Lazy = Lazy::new(|| AtomicU16::new(4222)); + /// A structure representing the setup for a two-cluster messaging system. /// /// This setup includes two instances of `FakeRhioServer` and two instances of /// `BlockingClient` with `FakeNatsClient` and `FakeNatsMessages`. /// pub struct TwoClusterMessagingSetup { @@ -126,9 +130,6 @@ pub fn create_two_node_messaging_setup() -> Result { Ok(setup) } -static TEST_INSTANCE_HTTP_PORT: Lazy = Lazy::new(|| AtomicU16::new(8080)); -static TEST_INSTANCE_RHIO_PORT: Lazy = Lazy::new(|| AtomicU16::new(31000)); - pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option) -> Config { let http_port = TEST_INSTANCE_HTTP_PORT.fetch_add(1, Ordering::SeqCst); let rhio_port = TEST_INSTANCE_RHIO_PORT.fetch_add(1, Ordering::SeqCst); @@ -145,8 +146,6 @@ pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option config.log_level = Some("=INFO".to_string()); config } -static TEST_INSTANCE_NATS_PORT: Lazy = Lazy::new(|| AtomicU16::new(4222)); - pub fn generate_nats_config() -> NatsConfig { let nats_port = TEST_INSTANCE_NATS_PORT.fetch_add(1, Ordering::SeqCst); NatsConfig {