Skip to content

Commit

Permalink
[integration tests] addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ktatarnikov committed Jan 17, 2025
1 parent ce493fa commit a6bd8ea
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
16 changes: 11 additions & 5 deletions rhio/src/nats/client/fake/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::config::NatsConfig;
use anyhow::bail;
use anyhow::{Context as AnyhowContext, Result};
use async_nats::jetstream::consumer::push::MessagesError;
use async_nats::jetstream::consumer::DeliverPolicy;
Expand Down Expand Up @@ -106,10 +107,16 @@ impl FakeNatsServer {
filter_subjects,
};

if deliver_policy == DeliverPolicy::All {
self.publish_existing_messages(&subscription, subscriber_tx.clone())
.await
.context("FakeNatsClient: Publishing existing messages for delivery policy ALL")?;
match deliver_policy {
DeliverPolicy::All => {
self.publish_existing_messages(&subscription, subscriber_tx.clone())
.await
.context(
"FakeNatsServer: Publishing existing messages for delivery policy ALL",
)?;
}
DeliverPolicy::New => (),
policy => bail!("FakeNatsServer: Unimplemented deliver policy {:?}", policy),
}

self.subscribers
Expand Down Expand Up @@ -153,7 +160,6 @@ impl FakeNatsServer {
async fn persist_message(&self, message: &Message) {
let mut storage = self.storage.lock().await;
storage.push(message.clone());
drop(storage);
}

fn distribute_to_subscribers(&self, subject: Subject, message: Message) -> Result<()> {
Expand Down
24 changes: 14 additions & 10 deletions rhio/src/nats/client/nats.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
use crate::config::NatsCredentials;
use crate::{config, StreamName};
use anyhow::{bail, Context as AnyhowContext, Result};
use async_nats::jetstream::consumer::push::MessagesError;
use async_nats::jetstream::consumer::push::{Config as ConsumerConfig, Messages};
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use async_nats::jetstream::consumer::{Info, PushConsumer};
use async_nats::jetstream::context::Publish;
use async_nats::jetstream::Context as JetstreamContext;
use async_nats::Message;
use async_nats::{Client, Message};
use async_nats::{ConnectOptions, HeaderMap};
use async_trait::async_trait;
use bytes::Bytes;
use pin_project::pin_project;
use rand::random;
use rhio_core::{subjects_to_str, Subject};
use tracing::{span, trace, Level};

use crate::config::NatsCredentials;
use crate::{config, StreamName};

use super::types::{NatsClient, NatsMessageStream};

/// Implementation of the `NatsClient` trait for interacting with NATS JetStream.
Expand All @@ -32,18 +30,19 @@ use super::types::{NatsClient, NatsMessageStream};
/// - `create_consumer_stream`: Creates a push-based, ephemeral consumer for a NATS stream with specified filter subjects and delivery policy.
pub struct NatsClientImpl {
jetstream: JetstreamContext,
client: Client,
}

impl NatsClientImpl {
#[allow(dead_code)]
pub async fn new(nats: config::NatsConfig) -> Result<Self> {
let nats_options = connect_options(nats.credentials.clone())?;
let nats_client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options)
let client = async_nats::connect_with_options(nats.endpoint.clone(), nats_options)
.await
.context(format!("connecting to NATS server {}", nats.endpoint))?;

let jetstream = async_nats::jetstream::new(nats_client.clone());
Ok(NatsClientImpl { jetstream })
let jetstream = async_nats::jetstream::new(client.clone());
Ok(NatsClientImpl { jetstream, client })
}
}

Expand Down Expand Up @@ -122,8 +121,13 @@ impl NatsClient<NatsMessages> for NatsClientImpl {
// delivery subject is the same across consumers, they'll all consume messages
// at the same time, which we avoid here by giving each consumer an unique,
// random identifier:
let random_deliver_subject: u32 = random();
format!("rhio-{random_deliver_subject}")
// @NOTE(ktatarnikov): The async_nats library example of push consumer
// (https://github.com/nats-io/nats.rs/blob/main/async-nats/examples/jetstream_push.rs)
// uses `client.new_inbox()` method to generate deliver_subject.
// The method provides stronger guarantees (globally unique) then we used previously with `random`.
// https://docs.rs/async-nats/0.38.0/async_nats/client/struct.Client.html#method.new_inbox
//
self.client.new_inbox()
},
// For rhio two different delivery policies are configured:
//
Expand Down
9 changes: 4 additions & 5 deletions rhio/src/tests/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ use crate::{
use super::fake_rhio_server::FakeRhioServer;
use anyhow::Result;

static TEST_INSTANCE_HTTP_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(8080));
static TEST_INSTANCE_RHIO_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(31000));
static TEST_INSTANCE_NATS_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(4222));

/// A structure representing the setup for a two-cluster messaging system.
///
/// This setup includes two instances of `FakeRhioServer` and two instances of
Expand Down Expand Up @@ -126,9 +130,6 @@ pub fn create_two_node_messaging_setup() -> Result<TwoClusterMessagingSetup> {
Ok(setup)
}

static TEST_INSTANCE_HTTP_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(8080));
static TEST_INSTANCE_RHIO_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(31000));

pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option<S3Config>) -> Config {
let http_port = TEST_INSTANCE_HTTP_PORT.fetch_add(1, Ordering::SeqCst);
let rhio_port = TEST_INSTANCE_RHIO_PORT.fetch_add(1, Ordering::SeqCst);
Expand All @@ -145,8 +146,6 @@ pub fn generate_rhio_config(nats_config: &NatsConfig, s3_config: &Option<S3Confi
config
}

static TEST_INSTANCE_NATS_PORT: Lazy<AtomicU16> = Lazy::new(|| AtomicU16::new(4222));

pub fn generate_nats_config() -> NatsConfig {
let nats_port = TEST_INSTANCE_NATS_PORT.fetch_add(1, Ordering::SeqCst);
NatsConfig {
Expand Down

0 comments on commit a6bd8ea

Please sign in to comment.