diff --git a/rhio/src/context.rs b/rhio/src/context.rs new file mode 100644 index 0000000..409fa63 --- /dev/null +++ b/rhio/src/context.rs @@ -0,0 +1,244 @@ +use std::collections::HashMap; + +use crate::{Node, StreamName}; + +use anyhow::{bail, Context as AnyhowContext, Result}; +use p2panda_core::PublicKey; +use tokio::runtime::Runtime; +use tokio::signal; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tracing::info; + +use crate::config::{Config, LocalNatsSubject, RemoteNatsSubject, RemoteS3Bucket}; +use crate::health::HTTP_HEALTH_ROUTE; +use crate::{ + FilesSubscription, FilteredMessageStream, MessagesSubscription, Publication, Subscription, +}; +use rhio_core::Subject; + +pub struct Context { + node: Node, + config: Config, + public_key: PublicKey, + http_handle: JoinHandle>, + http_runtime: Runtime, + cancellation_token: CancellationToken, + rhio_runtime: Runtime, +} + +impl Context { + pub fn new( + node: Node, + config: Config, + public_key: PublicKey, + http_handle: JoinHandle>, + http_runtime: Runtime, + cancellation_token: CancellationToken, + rhio_runtime: Runtime, + ) -> Self { + Context { + node, + config, + public_key, + http_handle, + http_runtime, + cancellation_token, + rhio_runtime, + } + } + + pub fn configure(&self) -> Result<()> { + self.rhio_runtime.block_on(async { + self.configure_inner() + .await + .context("failed to configure Node") + }) + } + + async fn configure_inner(&self) -> Result<()> { + if self.config.s3.is_some() { + if let Some(publish) = &self.config.publish { + self.publish_s3(publish.s3_buckets.clone()).await?; + } + if let Some(subscribe) = &self.config.subscribe { + self.subscribe_s3(subscribe.s3_buckets.clone()).await?; + } + } + if let Some(publish) = &self.config.publish { + self.publish_nats(publish.nats_subjects.clone()).await?; + } + if let Some(subscribe) = &self.config.subscribe { + self.subscribe_nats(subscribe.nats_subjects.clone()).await?; + } + Ok(()) + } + + async fn publish_s3(&self, s3_buckets: Vec) -> Result<()> { + for bucket_name in s3_buckets { + // Assign our own public key to S3 bucket info. + self.node + .publish(Publication::Files { + bucket_name, + public_key: self.public_key, + }) + .await?; + } + Ok(()) + } + + async fn subscribe_s3(&self, s3_buckets: Vec) -> Result<()> { + for RemoteS3Bucket { + local_bucket_name, + remote_bucket_name, + public_key: remote_public_key, + } in s3_buckets + { + self.node + .subscribe(Subscription::Files(FilesSubscription { + remote_bucket_name, + local_bucket_name, + public_key: remote_public_key, + })) + .await?; + } + + Ok(()) + } + + async fn subscribe_nats(&self, subjects: Vec) -> Result<()> { + // Multiple subjects can be used on top of a stream and we want to group them over one + // public key and stream name pair. + let mut stream_public_key_map = HashMap::<(StreamName, PublicKey), Vec>::new(); + for RemoteNatsSubject { + stream_name, + subject, + public_key: remote_public_key, + } in subjects + { + stream_public_key_map + .entry((stream_name, remote_public_key)) + .and_modify(|subjects| { + subjects.push(subject.clone()); + }) + .or_insert(vec![subject]); + } + + // Finally we want to group these subscriptions by public key. + let mut subscription_map = HashMap::>::new(); + for ((stream_name, remote_public_key), subjects) in stream_public_key_map.into_iter() { + let filtered_stream = FilteredMessageStream { + subjects, + stream_name, + }; + subscription_map + .entry(remote_public_key) + .and_modify(|filtered_streams| filtered_streams.push(filtered_stream.clone())) + .or_insert(vec![filtered_stream]); + } + + for (remote_public_key, filtered_streams) in subscription_map.into_iter() { + let subscription = Subscription::Messages(MessagesSubscription { + filtered_streams, + public_key: remote_public_key, + }); + self.node.subscribe(subscription).await?; + } + Ok(()) + } + + async fn publish_nats(&self, nats_subjects: Vec) -> Result<()> { + // Multiple subjects can be used on top of a stream and we want to group them over one + // public key and stream name pair. This leaves us with the following structure: + // + // Streams Public Key Subjects Topic Id (for Gossip) + // ======= ========== ======== ===================== + // 1 I A a + // 2 I B a + // 3 II A b + // 1 II B b + // 1 II C b + let mut stream_public_key_map = HashMap::>::new(); + for LocalNatsSubject { + stream_name, + subject, + } in nats_subjects + { + stream_public_key_map + .entry(stream_name) + .and_modify(|subjects| { + subjects.push(subject.clone()); + }) + .or_insert(vec![subject.clone()]); + } + + for (stream_name, subjects) in stream_public_key_map.into_iter() { + self.node + .publish(Publication::Messages { + filtered_stream: FilteredMessageStream { + subjects, + stream_name, + }, + // Assign our own public key to NATS subject info. + public_key: self.public_key, + }) + .await?; + } + Ok(()) + } + + pub fn log_configuration(&self) { + let addresses: Vec = self + .node + .direct_addresses() + .iter() + .map(|addr| addr.to_string()) + .collect(); + info!("‣ network id:"); + info!(" - {}", self.config.node.network_id); + info!("‣ node public key:"); + info!(" - {}", self.public_key); + info!("‣ node addresses:"); + for address in addresses { + info!(" - {}", address); + } + info!("‣ health endpoint:"); + info!( + " - 0.0.0.0:{}{}", + self.config.node.http_bind_port, HTTP_HEALTH_ROUTE + ); + } + + pub fn wait_for_termination(&self) -> Result<()> { + let cloned_token = self.cancellation_token.clone(); + self.http_runtime.block_on(async move { + tokio::select! { + _ = cloned_token.cancelled() => bail!("HTTP server was cancelled"), + _ = signal::ctrl_c() => {}, + }; + Ok(()) + }) + } + + /// Shuts down the context, including the node and associated runtimes. + /// + /// This method performs the following steps: + /// 1. Shuts down the node asynchronously. + /// 2. Aborts the HTTP handle. + /// 3. Shuts down the HTTP runtime in the background. + /// 4. Shuts down the Rhio runtime in the background. + /// + /// Returns a `Result` indicating the success or failure of the shutdown process. + pub fn shutdown(self) -> Result<()> { + self.rhio_runtime.block_on(async move { + self.node + .shutdown() + .await + .context("Failure during node shutdown") + })?; + self.http_handle.abort(); + self.http_runtime.shutdown_background(); + self.rhio_runtime.shutdown_background(); + Ok(()) + } +} diff --git a/rhio/src/context_builder.rs b/rhio/src/context_builder.rs new file mode 100644 index 0000000..b9d0de8 --- /dev/null +++ b/rhio/src/context_builder.rs @@ -0,0 +1,185 @@ +use crate::context::Context; +use crate::node::rhio::NodeOptions; + +use crate::{blobs::store_from_config, nats::Nats, Node}; + +use anyhow::{anyhow, Context as AnyhowContext, Result}; +use p2panda_blobs::Blobs as BlobsHandler; +use p2panda_core::{PrivateKey, PublicKey}; +use p2panda_net::SyncConfiguration; +use p2panda_net::{NetworkBuilder, ResyncConfiguration}; +use tokio::runtime::{Builder, Runtime}; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; + +use crate::blobs::{blobs_config, Blobs}; +use crate::config::PRIVATE_KEY_ENV; +use crate::config::{load_config, Config}; +use crate::health::run_http_server; +use crate::network::sync::RhioSyncProtocol; +use crate::network::Panda; +use crate::tracing::setup_tracing; +use figment::providers::Env; +use rhio_core::load_private_key_from_file; + +#[derive(Debug)] +pub struct ContextBuilder { + config: Config, + private_key: PrivateKey, + public_key: PublicKey, +} + +impl ContextBuilder { + pub fn new(config: Config, private_key: PrivateKey) -> Self { + ContextBuilder { + public_key: private_key.public_key(), + config, + private_key, + } + } + /// Load the configuration from the environment and initializes context builder + pub fn from_cli() -> Result { + let config = load_config()?; + setup_tracing(config.log_level.clone()); + + // Load the private key from either an environment variable _or_ a file specified in the + // config. The environment variable takes priority. + let private_key = match Env::var(PRIVATE_KEY_ENV) { + Some(private_key_hex) => PrivateKey::try_from(&hex::decode(&private_key_hex)?[..])?, + None => load_private_key_from_file(&config.node.private_key_path).context(format!( + "could not load private key from file {}", + config.node.private_key_path.display(), + ))?, + }; + + let public_key = private_key.public_key(); + Ok(ContextBuilder { + config, + private_key, + public_key, + }) + } + /// Attempts to build and start the `Context`. + /// + /// This method initializes the Rhio runtime and HTTP server runtime, then + /// starts the Rhio node and HTTP server. The HTTP server is launched in a + /// separate runtime to avoid blocking the Rhio runtime. + /// + /// # Returns + /// + /// * `Ok(Context)` - If the context is successfully built and started. + /// * `Err(anyhow::Error)` - If there is an error during the initialization + /// process. + /// + /// # Errors + /// + /// This function will return an error if: + /// * The Rhio tokio runtime cannot be built. + /// * The Rhio node initialization fails. + /// * The HTTP server tokio runtime cannot be built. + /// * The HTTP server fails to start. + /// + pub fn try_build_and_start(&self) -> Result { + let rhio_runtime = Builder::new_multi_thread() + .enable_all() + .thread_name("rhio") + .build() + .expect("Rhio tokio runtime"); + + let node = rhio_runtime.block_on(async move { + ContextBuilder::init_rhio_node(self.config.clone(), self.private_key.clone()) + .await + .context("failed to initialize Rhio node") + })?; + + let http_runtime = Builder::new_current_thread() + .enable_io() + .thread_name("http-server") + .build() + .expect("http server tokio runtime"); + + let cancellation_token = CancellationToken::new(); + // Launch HTTP server in separate runtime to not block rhio runtime. + let http_handle = self.start_http_server(&http_runtime, cancellation_token.clone()); + Ok(Context::new( + node, + self.config.clone(), + self.public_key, + http_handle, + http_runtime, + cancellation_token, + rhio_runtime, + )) + } + + async fn init_rhio_node(config: Config, private_key: PrivateKey) -> Result { + let nats = Nats::new(config.clone()).await?; + + // 2. Configure rhio peer-to-peer network. + let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?; + + let blob_store = store_from_config(&config).await?; + + let sync_protocol = RhioSyncProtocol::new( + node_config.clone(), + nats.clone(), + blob_store.clone(), + private_key.clone(), + ); + + let sync_config = + SyncConfiguration::new(sync_protocol).resync(ResyncConfiguration::default()); + + let builder = NetworkBuilder::from_config(p2p_network_config) + .private_key(private_key.clone()) + .sync(sync_config); + + let (watcher_tx, watcher_rx) = mpsc::channel(512); + // 3. Configure and set up blob store and connection handlers for blob replication. + let (network, blobs, watcher_rx) = if config.s3.is_some() { + let (network, blobs_handler) = + BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) + .await?; + // 3.1. Start a service which watches the S3 buckets for changes. + let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); + (network, Some(blobs), watcher_rx) + } else { + let network = builder.build().await?; + (network, None, watcher_rx) + }; + + // 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p + // networking, data replication and gossipping. + let node_id = network.node_id(); + let direct_addresses = network + .direct_addresses() + .await + .ok_or_else(|| anyhow!("socket is not bind to any interface"))?; + let panda = Panda::new(network); + + let options = NodeOptions { + public_key: node_id, + node_config, + private_key: private_key.clone(), + direct_addresses, + }; + + Node::new(nats, blobs, watcher_rx, panda, options).await + } + + fn start_http_server( + &self, + runtime: &Runtime, + cancellation_token: CancellationToken, + ) -> JoinHandle> { + let http_bind_port = self.config.node.http_bind_port; + runtime.spawn(async move { + let result = run_http_server(http_bind_port) + .await + .context("failed to start http server with health endpoint"); + cancellation_token.cancel(); + result + }) + } +} diff --git a/rhio/src/lib.rs b/rhio/src/lib.rs index c314fac..6f47dbb 100644 --- a/rhio/src/lib.rs +++ b/rhio/src/lib.rs @@ -1,5 +1,7 @@ mod blobs; pub mod config; +pub mod context; +pub mod context_builder; pub mod health; mod nats; mod network; diff --git a/rhio/src/main.rs b/rhio/src/main.rs index 385dced..ebfef98 100644 --- a/rhio/src/main.rs +++ b/rhio/src/main.rs @@ -1,194 +1,24 @@ -use std::collections::HashMap; - -use anyhow::{bail, Context, Result}; -use figment::providers::Env; -use p2panda_core::{PrivateKey, PublicKey}; -use rhio::config::{ - load_config, LocalNatsSubject, RemoteNatsSubject, RemoteS3Bucket, PRIVATE_KEY_ENV, -}; -use rhio::health::{run_http_server, HTTP_HEALTH_ROUTE}; -use rhio::tracing::setup_tracing; -use rhio::{ - FilesSubscription, FilteredMessageStream, MessagesSubscription, Node, Publication, StreamName, - Subscription, -}; -use rhio_core::{load_private_key_from_file, Subject}; -use tokio::runtime::Builder; -use tokio::sync::oneshot; +use anyhow::Result; +use rhio::context_builder::ContextBuilder; use tracing::info; -#[tokio::main] -async fn main() -> Result<()> { - let config = load_config()?; - setup_tracing(config.log_level.clone()); - - // Load the private key from either an environment variable _or_ a file specified in the - // config. The environment variable takes priority. - let private_key = match Env::var(PRIVATE_KEY_ENV) { - Some(private_key_hex) => PrivateKey::try_from(&hex::decode(&private_key_hex)?[..])?, - None => load_private_key_from_file(&config.node.private_key_path).context(format!( - "could not load private key from file {}", - config.node.private_key_path.display(), - ))?, - }; - - let public_key = private_key.public_key(); - - let node = Node::spawn(config.clone(), private_key).await?; - - hello_rhio(); - let addresses: Vec = node - .direct_addresses() - .iter() - .map(|addr| addr.to_string()) - .collect(); - info!("‣ network id:"); - info!(" - {}", config.node.network_id); - info!("‣ node public key:"); - info!(" - {}", public_key); - info!("‣ node addresses:"); - for address in addresses { - info!(" - {}", address); - } - info!("‣ health endpoint:"); - info!( - " - 0.0.0.0:{}{}", - config.node.http_bind_port, HTTP_HEALTH_ROUTE - ); - - if let Some(publish) = config.publish { - for bucket_name in publish.s3_buckets { - // Assign our own public key to S3 bucket info. - node.publish(Publication::Files { - bucket_name, - public_key, - }) - .await?; - } - - // Multiple subjects can be used on top of a stream and we want to group them over one - // public key and stream name pair. This leaves us with the following structure: - // - // Streams Public Key Subjects Topic Id (for Gossip) - // ======= ========== ======== ===================== - // 1 I A a - // 2 I B a - // 3 II A b - // 1 II B b - // 1 II C b - let mut stream_public_key_map = HashMap::>::new(); - for LocalNatsSubject { - stream_name, - subject, - } in publish.nats_subjects - { - stream_public_key_map - .entry(stream_name) - .and_modify(|subjects| { - subjects.push(subject.clone()); - }) - .or_insert(vec![subject]); - } - - for (stream_name, subjects) in stream_public_key_map.into_iter() { - node.publish(Publication::Messages { - filtered_stream: FilteredMessageStream { - subjects, - stream_name, - }, - // Assign our own public key to NATS subject info. - public_key, - }) - .await?; - } - }; +fn main() -> Result<()> { + let context_builder = ContextBuilder::from_cli()?; + let context = context_builder.try_build_and_start()?; - if let Some(subscribe) = config.subscribe { - for RemoteS3Bucket { - local_bucket_name, - remote_bucket_name, - public_key: remote_public_key, - } in subscribe.s3_buckets - { - node.subscribe(Subscription::Files(FilesSubscription { - remote_bucket_name, - local_bucket_name, - public_key: remote_public_key, - })) - .await?; - } + context.configure()?; - // Multiple subjects can be used on top of a stream and we want to group them over one - // public key and stream name pair. - let mut stream_public_key_map = HashMap::<(StreamName, PublicKey), Vec>::new(); - for RemoteNatsSubject { - stream_name, - subject, - public_key: remote_public_key, - } in subscribe.nats_subjects - { - stream_public_key_map - .entry((stream_name, remote_public_key)) - .and_modify(|subjects| { - subjects.push(subject.clone()); - }) - .or_insert(vec![subject]); - } + log_hello_rhio(); + context.log_configuration(); - // Finally we want to group these subscriptions by public key. - let mut subscription_map = HashMap::>::new(); - for ((stream_name, remote_public_key), subjects) in stream_public_key_map.into_iter() { - let filtered_stream = FilteredMessageStream { - subjects, - stream_name, - }; - subscription_map - .entry(remote_public_key) - .and_modify(|filtered_streams| filtered_streams.push(filtered_stream.clone())) - .or_insert(vec![filtered_stream]); - } + context.wait_for_termination()?; - for (remote_public_key, filtered_streams) in subscription_map.into_iter() { - let subscription = Subscription::Messages(MessagesSubscription { - filtered_streams, - public_key: remote_public_key, - }); - node.subscribe(subscription).await?; - } - }; - - // Launch HTTP server in separate thread to not block rhio runtime. - let (http_error_tx, http_error_rx) = oneshot::channel::>(); - std::thread::spawn(move || { - let runtime = Builder::new_current_thread() - .enable_io() - .thread_name("http-server") - .build() - .expect("http server tokio runtime"); - - let result = runtime.block_on(async move { - run_http_server(config.node.http_bind_port) - .await - .context("failed to start http server with health endpoint")?; - Ok(()) - }); - - http_error_tx.send(result).expect("sending http error"); - }); - - tokio::select! { - Ok(Err(err)) = http_error_rx => bail!(err), - _ = tokio::signal::ctrl_c() => {}, - } - - info!(""); info!("shutting down"); - node.shutdown().await?; - Ok(()) + context.shutdown() } -fn hello_rhio() { +fn log_hello_rhio() { r#" ___ ___ ___ /\ \ /\__\ ___ /\ \ /::\ \ /:/ / /\ \ /::\ \ @@ -203,4 +33,5 @@ fn hello_rhio() { "# .split("\n") .for_each(|line| info!("{}", line)); + info!(""); } diff --git a/rhio/src/node/config.rs b/rhio/src/node/config.rs index 30cef0b..d87382e 100644 --- a/rhio/src/node/config.rs +++ b/rhio/src/node/config.rs @@ -14,6 +14,12 @@ pub struct NodeConfig { inner: Arc>, } +impl Default for NodeConfig { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] struct NodeConfigInner { subscriptions: Vec, diff --git a/rhio/src/node/rhio.rs b/rhio/src/node/rhio.rs index c561102..768d37f 100644 --- a/rhio/src/node/rhio.rs +++ b/rhio/src/node/rhio.rs @@ -3,26 +3,33 @@ use std::net::SocketAddr; use anyhow::{anyhow, Result}; use futures_util::future::{MapErr, Shared}; use futures_util::{FutureExt, TryFutureExt}; -use p2panda_blobs::Blobs as BlobsHandler; use p2panda_core::{Hash, PrivateKey, PublicKey}; -use p2panda_net::{ - Config as NetworkConfig, NetworkBuilder, ResyncConfiguration, SyncConfiguration, -}; +use p2panda_net::Config as NetworkConfig; +use s3::error::S3Error; +use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinError; use tokio_util::task::AbortOnDropHandle; use tracing::error; -use crate::blobs::{blobs_config, store_from_config, Blobs}; +use crate::blobs::watcher::S3Event; +use crate::blobs::Blobs; use crate::config::Config; use crate::nats::Nats; -use crate::network::sync::RhioSyncProtocol; use crate::network::Panda; use crate::node::actor::{NodeActor, ToNodeActor}; use crate::node::config::NodeConfig; use crate::topic::{Publication, Subscription}; use crate::JoinErrToStr; +#[derive(Debug, Clone, Default)] +pub struct NodeOptions { + pub public_key: PublicKey, + pub private_key: PrivateKey, + pub direct_addresses: Vec, + pub node_config: NodeConfig, +} + pub struct Node { node_id: PublicKey, direct_addresses: Vec, @@ -31,61 +38,20 @@ pub struct Node { } impl Node { - /// Configure and spawn a node. - pub async fn spawn(config: Config, private_key: PrivateKey) -> Result { - // 1. Connect to NATS server and consume streams filtered by NATS subjects. - let nats = Nats::new(config.clone()).await?; - - // 2. Configure rhio peer-to-peer network. - let (node_config, p2p_network_config) = Node::configure_p2p_network(&config).await?; - - let blob_store = store_from_config(&config).await?; - - let sync_protocol = RhioSyncProtocol::new( - node_config.clone(), - nats.clone(), - blob_store.clone(), - private_key.clone(), - ); - - let sync_config = - SyncConfiguration::new(sync_protocol).resync(ResyncConfiguration::default()); - - let builder = NetworkBuilder::from_config(p2p_network_config) - .private_key(private_key.clone()) - .sync(sync_config); - - // 3. Configure and set up blob store and connection handlers for blob replication. - let (network, blobs, watcher_rx) = if config.s3.is_some() { - let (network, blobs_handler) = - BlobsHandler::from_builder_with_config(builder, blob_store.clone(), blobs_config()) - .await?; - // 3.1. Start a service which watches the S3 buckets for changes. - let (watcher_tx, watcher_rx) = mpsc::channel(512); - - let blobs = Blobs::new(blob_store.clone(), blobs_handler, watcher_tx); - (network, Some(blobs), watcher_rx) - } else { - let network = builder.build().await?; - let (_dummy_watcher_tx, dummy_watcher_rx) = mpsc::channel(512); - (network, None, dummy_watcher_rx) - }; - - // 5. Move all networking logic into dedicated "p2panda" actor, dealing with p2p - // networking, data replication and gossipping. - let node_id = network.node_id(); - let direct_addresses = network - .direct_addresses() - .await - .ok_or_else(|| anyhow!("socket is not bind to any interface"))?; - let panda = Panda::new(network); - + /// Create a new Rhio node. + pub async fn new( + nats: Nats, + blobs: Option, + watcher_rx: Receiver>, + panda: Panda, + options: NodeOptions, + ) -> Result { // 6. Finally spawn actor which orchestrates the "business logic", with the help of the // blob store, p2panda network and NATS JetStream consumers. let (node_actor_tx, node_actor_rx) = mpsc::channel(512); let node_actor = NodeActor::new( - node_config, - private_key, + options.node_config, + options.private_key, nats, panda, blobs, @@ -103,8 +69,8 @@ impl Node { .shared(); let node = Node { - node_id, - direct_addresses, + node_id: options.public_key, + direct_addresses: options.direct_addresses, node_actor_tx, actor_handle: actor_drop_handle, }; @@ -112,7 +78,7 @@ impl Node { Ok(node) } - async fn configure_p2p_network( + pub async fn configure_p2p_network( config: &Config, ) -> Result<(NodeConfig, NetworkConfig), anyhow::Error> { let node_config = NodeConfig::new();