diff --git a/Cargo.lock b/Cargo.lock index f46f1933b3..c5cbc9fcde 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2737,6 +2737,7 @@ dependencies = [ "serde_json", "snafu", "surf-disco", + "thiserror", "tide-disco 0.4.1", "tokio", "toml 0.5.11", diff --git a/crates/hotshot/examples/combined/all.rs b/crates/hotshot/examples/combined/all.rs index 38ae05490f..1842cbf687 100644 --- a/crates/hotshot/examples/combined/all.rs +++ b/crates/hotshot/examples/combined/all.rs @@ -101,7 +101,7 @@ async fn main() { url: "http://localhost".to_string(), port: 4444, public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - config_file: None, + network_config_file: None, }) .await }); diff --git a/crates/hotshot/examples/combined/multi-validator.rs b/crates/hotshot/examples/combined/multi-validator.rs index 674c994fcc..a20bcb8b40 100644 --- a/crates/hotshot/examples/combined/multi-validator.rs +++ b/crates/hotshot/examples/combined/multi-validator.rs @@ -30,7 +30,7 @@ struct MultiValidatorArgs { /// An optional network config file to save to/load from /// Allows for rejoining the network on a complete state loss #[arg(short, long)] - pub config_file: Option, + pub network_config_file: Option, } #[cfg_attr( @@ -66,7 +66,9 @@ async fn main() { url: args.url, port: args.port, public_ip: args.public_ip, - config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)), + network_config_file: args + .network_config_file + .map(|s| format!("{}-{}", s, node_index)), }) .await }); diff --git a/crates/hotshot/examples/infra/mod.rs b/crates/hotshot/examples/infra/mod.rs index 46d8b4ead6..aa17fa578e 100644 --- a/crates/hotshot/examples/infra/mod.rs +++ b/crates/hotshot/examples/infra/mod.rs @@ -16,6 +16,7 @@ use hotshot::{ types::{SignatureKey, SystemContextHandle}, HotShotType, Memberships, Networks, SystemContext, }; +use hotshot_orchestrator::config::NetworkConfigSource; use hotshot_orchestrator::{ self, client::{OrchestratorClient, ValidatorArgs}, @@ -825,27 +826,27 @@ pub async fn 
main_entry_point< error!("Starting validator"); - let orchestrator_client: OrchestratorClient = OrchestratorClient::new(args.clone()).await; - - // Identify with the orchestrator + // see what our public identity will be let public_ip = match args.public_ip { Some(ip) => ip, None => local_ip_address::local_ip().unwrap(), }; - // Save/load config from file or orchestrator - let (node_index, mut run_config) = orchestrator_client - .config_from_file_or_orchestrator::(public_ip.to_string()) - .await; + let orchestrator_client: OrchestratorClient = + OrchestratorClient::new(args.clone(), public_ip.to_string()).await; - error!("Retrieved configuration; our node index is {node_index}"); + // conditionally save/load config from file or orchestrator + let (mut run_config, source) = + NetworkConfig::from_file_or_orchestrator(&orchestrator_client, args.network_config_file) + .await; - run_config.node_index = node_index.into(); + let node_index = run_config.node_index; + error!("Retrieved config; our node index is {node_index}"); run_config.config.my_own_validator_config = ValidatorConfig::<::SignatureKey>::generated_from_seed_indexed( run_config.seed, - node_index.into(), + node_index, 1, ); //run_config.libp2p_config.as_mut().unwrap().public_ip = args.public_ip.unwrap(); @@ -885,12 +886,14 @@ pub async fn main_entry_point< } } - error!("Waiting for start command from orchestrator"); - orchestrator_client - .wait_for_all_nodes_ready(run_config.clone().node_index) - .await; + if let NetworkConfigSource::Orchestrator = source { + error!("Waiting for the start command from orchestrator"); + orchestrator_client + .wait_for_all_nodes_ready(run_config.clone().node_index) + .await; + } - error!("All nodes are ready! 
Starting HotShot"); + error!("Starting HotShot"); run.run_hotshot( hotshot, &mut transactions, diff --git a/crates/hotshot/examples/libp2p/all.rs b/crates/hotshot/examples/libp2p/all.rs index e50f10d38c..897f7657b7 100644 --- a/crates/hotshot/examples/libp2p/all.rs +++ b/crates/hotshot/examples/libp2p/all.rs @@ -45,7 +45,6 @@ async fn main() { NodeImpl, >(OrchestratorArgs { url: Url::parse("http://localhost:4444").unwrap(), - config_file: args.config_file.clone(), })); @@ -69,7 +68,7 @@ async fn main() { url: "http://localhost".to_string(), port: 4444, public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - config_file: None, + network_config_file: None, }) .await }); diff --git a/crates/hotshot/examples/libp2p/multi-validator.rs b/crates/hotshot/examples/libp2p/multi-validator.rs index a6ac23e64e..642e53b90f 100644 --- a/crates/hotshot/examples/libp2p/multi-validator.rs +++ b/crates/hotshot/examples/libp2p/multi-validator.rs @@ -30,7 +30,7 @@ struct MultiValidatorArgs { /// An optional network config file to save to/load from /// Allows for rejoining the network on a complete state loss #[arg(short, long)] - pub config_file: Option, + pub network_config_file: Option, } #[cfg_attr( @@ -65,7 +65,9 @@ async fn main() { url: args.url, port: args.port, public_ip: args.public_ip, - config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)), + network_config_file: args + .network_config_file + .map(|s| format!("{}-{}", s, node_index)), }) .await }); diff --git a/crates/hotshot/examples/webserver/all.rs b/crates/hotshot/examples/webserver/all.rs index 356b050738..5764e0c602 100644 --- a/crates/hotshot/examples/webserver/all.rs +++ b/crates/hotshot/examples/webserver/all.rs @@ -93,7 +93,7 @@ async fn main() { url: "http://localhost".to_string(), port: 4444, public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)), - config_file: None, + network_config_file: None, }) .await }); diff --git a/crates/hotshot/examples/webserver/multi-validator.rs 
b/crates/hotshot/examples/webserver/multi-validator.rs index c0a0f12713..0a726a071d 100644 --- a/crates/hotshot/examples/webserver/multi-validator.rs +++ b/crates/hotshot/examples/webserver/multi-validator.rs @@ -30,7 +30,7 @@ struct MultiValidatorArgs { /// An optional network config file to save to/load from /// Allows for rejoining the network on a complete state loss #[arg(short, long)] - pub config_file: Option, + pub network_config_file: Option, } #[cfg_attr( @@ -64,7 +64,9 @@ async fn main() { url: args.url, port: args.port, public_ip: args.public_ip, - config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)), + network_config_file: args + .network_config_file + .map(|s| format!("{}-{}", s, node_index)), }) .await }); diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index b4f093972d..4555e6cfed 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -25,6 +25,7 @@ serde_json = "1.0.96" snafu = { workspace = true } # TODO upgrade to toml = { workspace = true } https://github.com/EspressoSystems/HotShot/issues/1698 toml = "0.5.9" +thiserror = "1.0.50" [target.'cfg(all(async_executor_impl = "tokio"))'.dependencies] tokio = { workspace = true } diff --git a/crates/orchestrator/src/client.rs b/crates/orchestrator/src/client.rs index 2fb1855871..44208c60a4 100644 --- a/crates/orchestrator/src/client.rs +++ b/crates/orchestrator/src/client.rs @@ -1,69 +1,17 @@ -use std::{fs, net::IpAddr, time::Duration}; +use std::{net::IpAddr, time::Duration}; use crate::config::NetworkConfig; use async_compatibility_layer::art::async_sleep; use clap::Parser; use futures::{Future, FutureExt}; -use hotshot_types::traits::node_implementation::NodeType; +use hotshot_types::traits::{election::ElectionConfig, signature_key::SignatureKey}; use surf_disco::{error::ClientError, Client}; -use tracing::error; - -/// Loads the node's index and config from a file -#[allow(clippy::type_complexity)] -fn 
load_index_and_config_from_file( - file: String, -) -> Option<( - u16, - NetworkConfig, -)> -where - TYPES: NodeType, -{ - let data = match fs::read(file) { - Ok(data) => data, - Err(e) => { - error!("Failed to load index and config from file: {}", e); - None? - } - }; - - match bincode::deserialize(&data) { - Ok(data) => Some(data), - Err(e) => { - error!("Failed to deserialize index and config from file: {}", e); - None - } - } -} - -/// Saves the node's index and config to a file -fn save_index_and_config_to_file( - node_index: u16, - config: NetworkConfig, - file: String, -) where - TYPES: NodeType, -{ - // serialize - let serialized = match bincode::serialize(&(node_index, config)) { - Ok(data) => data, - Err(e) => { - error!("Failed to serialize index and config to file: {}", e); - return; - } - }; - - // write - if let Err(e) = fs::write(file, serialized) { - error!("Failed to write index and config to file: {}", e); - } -} /// Holds the client connection to the orchestrator pub struct OrchestratorClient { client: surf_disco::Client, - file: Option, + pub identity: String, } // VALIDATOR @@ -85,75 +33,26 @@ pub struct ValidatorArgs { /// An optional network config file to save to/load from /// Allows for rejoining the network on a complete state loss #[arg(short, long)] - pub config_file: Option, + pub network_config_file: Option, } impl OrchestratorClient { - /// Creates the client that connects to the orchestrator - pub async fn new(args: ValidatorArgs) -> Self { + /// Creates the client that will connect to the orchestrator + pub async fn new(args: ValidatorArgs, identity: String) -> Self { let base_url = format!("{0}:{1}", args.url, args.port).parse().unwrap(); let client = surf_disco::Client::::new(base_url); // TODO ED: Add healthcheck wait here - OrchestratorClient { - client, - file: args.config_file, - } - } - - /// Gets the node config and index first from a file, or the orchestrator if - /// 1. no file is specified - /// 2. 
there is an issue with the file - /// Saves file to disk if successfully retrieved from the orchestrator - pub async fn config_from_file_or_orchestrator( - &self, - identity: String, - ) -> ( - u16, - NetworkConfig, - ) { - if let Some(file) = &self.file { - error!("Attempting to retrieve node configuration from file"); - // load from file, if fail load from orchestrator and save - match load_index_and_config_from_file::(file.clone()) { - Some(res) => res, - None => { - error!( - "Failed to load node configuration from file; trying to load from orchestrator" - ); - // load from orchestrator - let (node_index, run_config) = self - .config_from_orchestrator::(identity.to_string()) - .await; - - // save to file - save_index_and_config_to_file::( - node_index, - run_config.clone(), - file.to_string(), - ); - - (node_index, run_config) - } - } - } else { - // load from orchestrator - error!("Attempting to load node configuration from orchestrator"); - self.config_from_orchestrator::(identity.to_string()) - .await - } + OrchestratorClient { client, identity } } /// Sends an identify message to the orchestrator and attempts to get its config /// Returns both the node_index and the run configuration from the orchestrator /// Will block until both are returned #[allow(clippy::type_complexity)] - pub async fn config_from_orchestrator( + pub async fn get_config( &self, identity: String, - ) -> ( - u16, - NetworkConfig, - ) { + ) -> NetworkConfig { // get the node index let identity = identity.as_str(); let identity = |client: Client| { @@ -171,10 +70,7 @@ impl OrchestratorClient { // get the corresponding config let f = |client: Client| { async move { - let config: Result< - NetworkConfig, - ClientError, - > = client + let config: Result, ClientError> = client .post(&format!("api/config/{node_index}")) .send() .await; @@ -183,7 +79,11 @@ impl OrchestratorClient { .boxed() }; - (node_index, self.wait_for_fn_from_orchestrator(f).await) + let mut config = 
self.wait_for_fn_from_orchestrator(f).await; + + config.node_index = node_index as u64; + + config } /// Tells the orchestrator this validator is ready to start diff --git a/crates/orchestrator/src/config.rs b/crates/orchestrator/src/config.rs index 9966c8a9fd..8049f4c294 100644 --- a/crates/orchestrator/src/config.rs +++ b/crates/orchestrator/src/config.rs @@ -11,8 +11,12 @@ use std::{ time::Duration, }; use surf_disco::Url; +use thiserror::Error; use toml; use tracing::error; + +use crate::client::OrchestratorClient; + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] pub struct Libp2pConfig { pub bootstrap_nodes: Vec<(SocketAddr, Vec)>, @@ -57,6 +61,18 @@ pub struct WebServerConfig { pub wait_between_polls: Duration, } +#[derive(Error, Debug)] +pub enum NetworkConfigError { + #[error("Failed to read NetworkConfig from file")] + ReadFromFileError(std::io::Error), + #[error("Failed to deserialize loaded NetworkConfig")] + DeserializeError(bincode::Error), + #[error("Failed to write NetworkConfig to file")] + WriteToFileError(std::io::Error), + #[error("Failed to serialize NetworkConfig")] + SerializeError(bincode::Error), +} + #[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] #[serde(bound(deserialize = ""))] pub struct NetworkConfig { @@ -78,6 +94,151 @@ pub struct NetworkConfig { pub da_web_server_config: Option, } +pub enum NetworkConfigSource { + Orchestrator, + File, +} + +impl NetworkConfig { + /// Asynchronously retrieves a `NetworkConfig` either from a file or from an orchestrator. + /// + /// This function takes an `OrchestratorClient` (which carries the identity sent to the orchestrator) and an optional file path. + /// + /// If a file path is provided, the function will first attempt to load the `NetworkConfig` from the file. + /// If the file cannot be read or deserialized, the function will fall back to retrieving the `NetworkConfig` from the orchestrator. + /// The retrieved `NetworkConfig` is then saved back to the file for future use.
+ /// + /// If no file path is provided, the function will directly retrieve the `NetworkConfig` from the orchestrator. + /// + /// # Arguments + /// + /// * `client` - An `OrchestratorClient` used to retrieve the `NetworkConfig` from the orchestrator. + ///   The identity used for the request is read from `client.identity`. + /// * `file` - An optional string representing the path to the file from which to load the `NetworkConfig`. + /// + /// # Returns + /// + /// This function returns a tuple containing a `NetworkConfig` and a `NetworkConfigSource`. The `NetworkConfigSource` indicates whether the `NetworkConfig` was loaded from a file or retrieved from the orchestrator. + /// + /// # Examples + /// + /// ```no_run + /// let identity = "my_identity".to_string(); + /// let client = OrchestratorClient::new(validator_args, identity).await; + /// let file = Some("/path/to/my/config".to_string()); + /// let (config, source) = NetworkConfig::from_file_or_orchestrator(&client, file).await; + /// ``` + pub async fn from_file_or_orchestrator( + client: &OrchestratorClient, + file: Option, + ) -> (NetworkConfig, NetworkConfigSource) { + if let Some(file) = file { + // if we pass in file, try there first + match Self::from_file(file.clone()) { + Ok(config) => (config, NetworkConfigSource::File), + Err(e) => { + // fallback to orchestrator + error!("{e}"); + + let config = client.get_config(client.identity.clone()).await; + + // save to file if we fell back + if let Err(e) = config.to_file(file) { + error!("{e}"); + }; + + (config, NetworkConfigSource::Orchestrator) + } + } + } else { + error!("Retrieving config from the orchestrator"); + + // otherwise just get from orchestrator + ( + client.get_config(client.identity.clone()).await, + NetworkConfigSource::Orchestrator, + ) + } + } + + /// Loads a `NetworkConfig` from a file. + /// + /// This function takes a file path as a string, reads the file, and then deserializes the contents into a `NetworkConfig`.
+ /// + /// # Arguments + /// + /// * `file` - A string representing the path to the file from which to load the `NetworkConfig`. + /// + /// # Returns + /// + /// This function returns a `Result` that contains a `NetworkConfig` if the file was successfully read and deserialized, or a `NetworkConfigError` if an error occurred. + /// + /// # Errors + /// + /// This function will return an error if the file cannot be read or if the contents cannot be deserialized into a `NetworkConfig`. + /// + /// # Examples + /// + /// ```no_run + /// let file = "/path/to/my/config".to_string(); + /// let config = NetworkConfig::from_file(file).unwrap(); + /// ``` + pub fn from_file(file: String) -> Result { + // read from file + let data = match fs::read(file) { + Ok(data) => data, + Err(e) => { + return Err(NetworkConfigError::ReadFromFileError(e)); + } + }; + + // deserialize + match bincode::deserialize(&data) { + Ok(data) => Ok(data), + Err(e) => Err(NetworkConfigError::DeserializeError(e)), + } + } + + /// Serializes the `NetworkConfig` and writes it to a file. + /// + /// This function takes a file path as a string, serializes the `NetworkConfig` into binary format using `bincode`, and then writes the serialized data to the file. + /// + /// # Arguments + /// + /// * `file` - A string representing the path to the file where the `NetworkConfig` should be saved. + /// + /// # Returns + /// + /// This function returns a `Result` that contains `()` if the `NetworkConfig` was successfully serialized and written to the file, or a `NetworkConfigError` if an error occurred. + /// + /// # Errors + /// + /// This function will return an error if the `NetworkConfig` cannot be serialized or if the file cannot be written. 
+ /// + /// # Examples + /// + /// ```no_run + /// let file = "/path/to/my/config".to_string(); + /// let config = NetworkConfig::from_file(file.clone()).unwrap(); + /// config.to_file(file).unwrap(); + /// ``` + pub fn to_file(&self, file: String) -> Result<(), NetworkConfigError> { + // serialize + let serialized = match bincode::serialize(self) { + Ok(data) => data, + Err(e) => { + return Err(NetworkConfigError::SerializeError(e)); + } + }; + + // write to file + match fs::write(file, serialized) { + Ok(()) => Ok(()), + Err(e) => Err(NetworkConfigError::WriteToFileError(e)), + } + } +} + impl Default for NetworkConfig { fn default() -> Self { Self {