Skip to content

Commit

Permalink
[integration tests] message replication tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ktatarnikov committed Jan 15, 2025
1 parent c989de5 commit db7c093
Show file tree
Hide file tree
Showing 20 changed files with 1,401 additions and 212 deletions.
73 changes: 52 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ futures-lite = "2.5.0"
futures-util = "0.3.30"
iroh-blobs = "0.25.0"
iroh-io = { version = "0.6.1", features = ["x-http"] }
rust-s3 = { version = "0.35.1", features = ["tokio"] }
rust-s3 = { version = "0.35.1", features = ["tokio", "blocking"] }
thiserror = "2.0.3"
tokio = { version = "1.41.1", features = ["full"] }
tokio-stream = "0.1.15"
Expand All @@ -43,4 +43,8 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tempfile = "3.10.1"
rusty-hook = "^0.11.2"
once_cell = "1.20.2"
dashmap = "6.1.0"
futures = { version = "0.3.28", default-features = false, features = ["std"] }
pin-project = "1.1.8"

6 changes: 6 additions & 0 deletions rhio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
pin-project.workspace = true
bytes.workspace = true
futures.workspace = true

[dev-dependencies]
figment = { workspace = true, features = ["test"] }
once_cell.workspace = true
dashmap.workspace = true
serde_json.workspace = true
4 changes: 2 additions & 2 deletions rhio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Default for S3Config {
}
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct NatsConfig {
pub endpoint: String,
pub credentials: Option<NatsCredentials>,
Expand All @@ -181,7 +181,7 @@ impl Default for NatsConfig {
}
}

#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Debug, Default, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub struct NatsCredentials {
pub nkey: Option<String>,
pub username: Option<String>,
Expand Down
26 changes: 20 additions & 6 deletions rhio/src/context_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ use crate::blobs::{blobs_config, Blobs};
use crate::config::PRIVATE_KEY_ENV;
use crate::config::{load_config, Config};
use crate::health::run_http_server;
#[cfg(test)]
use crate::nats::client::fake::client::FakeNatsClient;
#[cfg(not(test))]
use crate::nats::client::nats::NatsClientImpl;
use crate::network::sync::RhioSyncProtocol;
use crate::network::Panda;
use crate::tracing::setup_tracing;
Expand All @@ -32,6 +36,7 @@ pub struct ContextBuilder {

impl ContextBuilder {
pub fn new(config: Config, private_key: PrivateKey) -> Self {
setup_tracing(config.log_level.clone());
ContextBuilder {
public_key: private_key.public_key(),
config,
Expand Down Expand Up @@ -93,15 +98,15 @@ impl ContextBuilder {
.context("failed to initialize Rhio node")
})?;

// Launch HTTP server in separate runtime to not block rhio runtime.
let http_runtime = Builder::new_current_thread()
.enable_io()
.thread_name("http-server")
.build()
.expect("http server tokio runtime");

let cancellation_token = CancellationToken::new();
// Launch HTTP server in separate runtime to not block rhio runtime.
let http_handle = self.start_http_server(&http_runtime, cancellation_token.clone());

Ok(Context::new(
node,
self.config.clone(),
Expand All @@ -114,9 +119,16 @@ impl ContextBuilder {
}

async fn init_rhio_node(config: Config, private_key: PrivateKey) -> Result<Node> {
let nats = Nats::new(config.clone()).await?;
// 1. Depends on the context - tests or prod, we configure corresponding nats client.
#[cfg(not(test))]
let nats_client = NatsClientImpl::new(config.nats.clone()).await?;
#[cfg(test)]
let nats_client = FakeNatsClient::new(config.nats.clone())?;

// 2. Configure Nats consumer streams management
let nats = Nats::new(nats_client).await?;

// 2. Configure rhio peer-to-peer network.
// 3. Configure rhio peer-to-peer network.
let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?;

let blob_store = store_from_config(&config).await?;
Expand All @@ -136,12 +148,12 @@ impl ContextBuilder {
.sync(sync_config);

let (watcher_tx, watcher_rx) = mpsc::channel(512);
// 3. Configure and set up blob store and connection handlers for blob replication.
// 4. Configure and set up blob store and connection handlers for blob replication.
let (network, blobs, watcher_rx) = if config.s3.is_some() {
let (network, blobs_handler) =
BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config())
.await?;
// 3.1. Start a service which watches the S3 buckets for changes.
// 4.1. we also start a service which watches the S3 buckets for changes.
let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx);
(network, Some(blobs), watcher_rx)
} else {
Expand All @@ -158,6 +170,7 @@ impl ContextBuilder {
.ok_or_else(|| anyhow!("socket is not bind to any interface"))?;
let panda = Panda::new(network);

// 6. Creating rhio Node responsible for managing s3, nats and p2p network
let options = NodeOptions {
public_key: node_id,
node_config,
Expand All @@ -168,6 +181,7 @@ impl ContextBuilder {
Node::new(nats, blobs, watcher_rx, panda, options).await
}

/// Starts the HTTP server with health endpoint.
fn start_http_server(
&self,
runtime: &Runtime,
Expand Down
3 changes: 3 additions & 0 deletions rhio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ pub use topic::{

pub use node::rhio::Node;

#[cfg(test)]
mod tests;

pub(crate) type JoinErrToStr =
Box<dyn Fn(tokio::task::JoinError) -> String + Send + Sync + 'static>;
Loading

0 comments on commit db7c093

Please sign in to comment.