From a6bd8ea49ca1de7d291c6b60238b6ed1e131f9cf Mon Sep 17 00:00:00 2001 From: ktatarnikov Date: Fri, 17 Jan 2025 11:33:33 +0100 Subject: [PATCH] [integration tests] addressing review comments --- rhio/src/nats/client/fake/server.rs | 16 +++++++++++----- rhio/src/nats/client/nats.rs | 24 ++++++++++++++---------- rhio/src/tests/configuration.rs | 9 ++++----- 3 files changed, 29 insertions(+), 20 deletions(-) diff --git a/rhio/src/nats/client/fake/server.rs b/rhio/src/nats/client/fake/server.rs index fc1c8e7..46eb916 100644 --- a/rhio/src/nats/client/fake/server.rs +++ b/rhio/src/nats/client/fake/server.rs @@ -1,4 +1,5 @@ use crate::config::NatsConfig; +use anyhow::bail; use anyhow::{Context as AnyhowContext, Result}; use async_nats::jetstream::consumer::push::MessagesError; use async_nats::jetstream::consumer::DeliverPolicy; @@ -106,10 +107,16 @@ impl FakeNatsServer { filter_subjects, }; - if deliver_policy == DeliverPolicy::All { - self.publish_existing_messages(&subscription, subscriber_tx.clone()) - .await - .context("FakeNatsClient: Publishing existing messages for delivery policy ALL")?; + match deliver_policy { + DeliverPolicy::All => { + self.publish_existing_messages(&subscription, subscriber_tx.clone()) + .await + .context( + "FakeNatsServer: Publishing existing messages for delivery policy ALL", + )?; + } + DeliverPolicy::New => (), + policy => bail!("FakeNatsServer: Unimplemented deliver policy {:?}", policy), } self.subscribers @@ -153,7 +160,6 @@ impl FakeNatsServer { async fn persist_message(&self, message: &Message) { let mut storage = self.storage.lock().await; storage.push(message.clone()); - drop(storage); } fn distribute_to_subscribers(&self, subject: Subject, message: Message) -> Result<()> { diff --git a/rhio/src/nats/client/nats.rs b/rhio/src/nats/client/nats.rs index 7360d96..29ae178 100644 --- a/rhio/src/nats/client/nats.rs +++ b/rhio/src/nats/client/nats.rs @@ -1,3 +1,5 @@ +use crate::config::NatsCredentials; +use crate::{config, StreamName}; use anyhow::{bail, Context as AnyhowContext, Result}; use async_nats::jetstream::consumer::push::MessagesError; use async_nats::jetstream::consumer::push::{Config as ConsumerConfig, Messages}; @@ -5,18 +7,14 @@ use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy}; use async_nats::jetstream::consumer::{Info, PushConsumer}; use async_nats::jetstream::context::Publish; use async_nats::jetstream::Context as JetstreamContext; -use async_nats::Message; +use async_nats::{Client, Message}; use async_nats::{ConnectOptions, HeaderMap}; use async_trait::async_trait; use bytes::Bytes; use pin_project::pin_project; -use rand::random; use rhio_core::{subjects_to_str, Subject}; use tracing::{span, trace, Level}; -use crate::config::NatsCredentials; -use crate::{config, StreamName}; - use super::types::{NatsClient, NatsMessageStream}; /// Implementation of the `NatsClient` trait for interacting with NATS JetStream. @@ -32,18 +30,19 @@ use super::types::{NatsClient, NatsMessageStream}; /// - `create_consumer_stream`: Creates a push-based, ephemeral consumer for a NATS stream with specified filter subjects and delivery policy. pub struct NatsClientImpl { jetstream: JetstreamContext, + client: Client, } impl NatsClientImpl { #[allow(dead_code)] pub async fn new(nats: config::NatsConfig) -> Result { let nats_options = connect_options(nats.credentials.clone())?; - let nats_client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options) + let client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options) .await .context(format!("connecting to NATS server {}", nats.endpoint))?; - let jetstream = async_nats::jetstream::new(nats_client.clone()); - Ok(NatsClientImpl { jetstream }) + let jetstream = async_nats::jetstream::new(client.clone()); + Ok(NatsClientImpl { jetstream, client }) } } @@ -122,8 +121,13 @@ impl NatsClient for NatsClientImpl { // delivery subject is the same across consumers, they'll all consume messages // at the same time, which we avoid here by giving each consumer an unique, // random identifier: - let random_deliver_subject: u32 = random(); - format!("rhio-{random_deliver_subject}") + // @NOTE(ktatarnikov): The async_nats library example of push consumer + // (https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs) + // uses `client.new_inbox()` method to generate deliver_subject. + // The method provides stronger guarantees (globally unique) then we used previously with `random`. + // https://docs.rs/async-nats/0.38.0/async_nats/client/struct.Client.html#method.new_inbox + // + self.client.new_inbox() }, // For rhio two different delivery policies are configured: // diff --git a/rhio/src/tests/configuration.rs b/rhio/src/tests/configuration.rs index 434178b..05abcc9 100644 --- a/rhio/src/tests/configuration.rs +++ b/rhio/src/tests/configuration.rs @@ -27,6 +27,10 @@ use crate::{ use super::fake_rhio_server::FakeRhioServer; use anyhow::Result; +static TEST_INSTANCE_HTTP_PORT: Lazy = Lazy::new(|| AtomicU16::new(8080)); +static TEST_INSTANCE_RHIO_PORT: Lazy = Lazy::new(|| AtomicU16::new(31000)); +static TEST_INSTANCE_NATS_PORT: Lazy = Lazy::new(|| AtomicU16::new(4222)); + /// A structure representing the setup for a two-cluster messaging system. /// /// This setup includes two instances of `FakeRhioServer` and two instances of @@ -126,9 +130,6 @@ pub fn create_two_node_messaging_setup() -> Result { Ok(setup) } -static TEST_INSTANCE_HTTP_PORT: Lazy = Lazy::new(|| AtomicU16::new(8080)); -static TEST_INSTANCE_RHIO_PORT: Lazy = Lazy::new(|| AtomicU16::new(31000)); - pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option) -> Config { let http_port = TEST_INSTANCE_HTTP_PORT.fetch_add(1, Ordering::SeqCst); let rhio_port = TEST_INSTANCE_RHIO_PORT.fetch_add(1, Ordering::SeqCst); @@ -145,8 +146,6 @@ pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option = Lazy::new(|| AtomicU16::new(4222)); - pub fn generate_nats_config() -> NatsConfig { let nats_port = TEST_INSTANCE_NATS_PORT.fetch_add(1, Ordering::SeqCst); NatsConfig {