Skip to content

Commit

Permalink
improvements and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron committed Dec 8, 2023
1 parent c4da3aa commit 220d45b
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 140 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/hotshot/examples/combined/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ async fn main() {
url: "http://localhost".to_string(),
port: 4444,
public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
config_file: None,
network_config_file: None,
})
.await
});
Expand Down
6 changes: 4 additions & 2 deletions crates/hotshot/examples/combined/multi-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct MultiValidatorArgs {
/// An optional network config file to save to/load from
/// Allows for rejoining the network on a complete state loss
#[arg(short, long)]
pub config_file: Option<String>,
pub network_config_file: Option<String>,
}

#[cfg_attr(
Expand Down Expand Up @@ -66,7 +66,9 @@ async fn main() {
url: args.url,
port: args.port,
public_ip: args.public_ip,
config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)),
network_config_file: args
.network_config_file
.map(|s| format!("{}-{}", s, node_index)),
})
.await
});
Expand Down
33 changes: 18 additions & 15 deletions crates/hotshot/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use hotshot::{
types::{SignatureKey, SystemContextHandle},
HotShotType, Memberships, Networks, SystemContext,
};
use hotshot_orchestrator::config::NetworkConfigSource;
use hotshot_orchestrator::{
self,
client::{OrchestratorClient, ValidatorArgs},
Expand Down Expand Up @@ -825,27 +826,27 @@ pub async fn main_entry_point<

error!("Starting validator");

let orchestrator_client: OrchestratorClient = OrchestratorClient::new(args.clone()).await;

// Identify with the orchestrator
// see what our public identity will be
let public_ip = match args.public_ip {
Some(ip) => ip,
None => local_ip_address::local_ip().unwrap(),
};

// Save/load config from file or orchestrator
let (node_index, mut run_config) = orchestrator_client
.config_from_file_or_orchestrator::<TYPES>(public_ip.to_string())
.await;
let orchestrator_client: OrchestratorClient =
OrchestratorClient::new(args.clone(), public_ip.to_string()).await;

error!("Retrieved configuration; our node index is {node_index}");
// conditionally save/load config from file or orchestrator
let (mut run_config, source) =
NetworkConfig::from_file_or_orchestrator(&orchestrator_client, args.network_config_file)
.await;

run_config.node_index = node_index.into();
let node_index = run_config.node_index;
error!("Retrieved config; our node index is {node_index}");

run_config.config.my_own_validator_config =
ValidatorConfig::<<TYPES as NodeType>::SignatureKey>::generated_from_seed_indexed(
run_config.seed,
node_index.into(),
node_index,
1,
);
//run_config.libp2p_config.as_mut().unwrap().public_ip = args.public_ip.unwrap();
Expand Down Expand Up @@ -885,12 +886,14 @@ pub async fn main_entry_point<
}
}

error!("Waiting for start command from orchestrator");
orchestrator_client
.wait_for_all_nodes_ready(run_config.clone().node_index)
.await;
if let NetworkConfigSource::Orchestrator = source {
error!("Waiting for the start command from orchestrator");
orchestrator_client
.wait_for_all_nodes_ready(run_config.clone().node_index)
.await;
}

error!("All nodes are ready! Starting HotShot");
error!("Starting HotShot");
run.run_hotshot(
hotshot,
&mut transactions,
Expand Down
3 changes: 1 addition & 2 deletions crates/hotshot/examples/libp2p/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ async fn main() {
NodeImpl,
>(OrchestratorArgs {
url: Url::parse("http://localhost:4444").unwrap(),

config_file: args.config_file.clone(),
}));

Expand All @@ -69,7 +68,7 @@ async fn main() {
url: "http://localhost".to_string(),
port: 4444,
public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
config_file: None,
network_config_file: None,
})
.await
});
Expand Down
6 changes: 4 additions & 2 deletions crates/hotshot/examples/libp2p/multi-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct MultiValidatorArgs {
/// An optional network config file to save to/load from
/// Allows for rejoining the network on a complete state loss
#[arg(short, long)]
pub config_file: Option<String>,
pub network_config_file: Option<String>,
}

#[cfg_attr(
Expand Down Expand Up @@ -65,7 +65,9 @@ async fn main() {
url: args.url,
port: args.port,
public_ip: args.public_ip,
config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)),
network_config_file: args
.network_config_file
.map(|s| format!("{}-{}", s, node_index)),
})
.await
});
Expand Down
2 changes: 1 addition & 1 deletion crates/hotshot/examples/webserver/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn main() {
url: "http://localhost".to_string(),
port: 4444,
public_ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
config_file: None,
network_config_file: None,
})
.await
});
Expand Down
6 changes: 4 additions & 2 deletions crates/hotshot/examples/webserver/multi-validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct MultiValidatorArgs {
/// An optional network config file to save to/load from
/// Allows for rejoining the network on a complete state loss
#[arg(short, long)]
pub config_file: Option<String>,
pub network_config_file: Option<String>,
}

#[cfg_attr(
Expand Down Expand Up @@ -64,7 +64,9 @@ async fn main() {
url: args.url,
port: args.port,
public_ip: args.public_ip,
config_file: args.config_file.map(|s| format!("{}-{}", s, node_index)),
network_config_file: args
.network_config_file
.map(|s| format!("{}-{}", s, node_index)),
})
.await
});
Expand Down
1 change: 1 addition & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde_json = "1.0.96"
snafu = { workspace = true }
# TODO upgrade to toml = { workspace = true } https://github.com/EspressoSystems/HotShot/issues/1698
toml = "0.5.9"
thiserror = "1.0.50"

[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
tokio = { workspace = true }
Expand Down
130 changes: 15 additions & 115 deletions crates/orchestrator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,17 @@
use std::{fs, net::IpAddr, time::Duration};
use std::{net::IpAddr, time::Duration};

use crate::config::NetworkConfig;
use async_compatibility_layer::art::async_sleep;
use clap::Parser;
use futures::{Future, FutureExt};

use hotshot_types::traits::node_implementation::NodeType;
use hotshot_types::traits::{election::ElectionConfig, signature_key::SignatureKey};
use surf_disco::{error::ClientError, Client};
use tracing::error;

/// Loads the node's index and config from a file
#[allow(clippy::type_complexity)]
fn load_index_and_config_from_file<TYPES>(
file: String,
) -> Option<(
u16,
NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
)>
where
TYPES: NodeType,
{
let data = match fs::read(file) {
Ok(data) => data,
Err(e) => {
error!("Failed to load index and config from file: {}", e);
None?
}
};

match bincode::deserialize(&data) {
Ok(data) => Some(data),
Err(e) => {
error!("Failed to deserialize index and config from file: {}", e);
None
}
}
}

/// Saves the node's index and config to a file
fn save_index_and_config_to_file<TYPES>(
node_index: u16,
config: NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
file: String,
) where
TYPES: NodeType,
{
// serialize
let serialized = match bincode::serialize(&(node_index, config)) {
Ok(data) => data,
Err(e) => {
error!("Failed to serialize index and config to file: {}", e);
return;
}
};

// write
if let Err(e) = fs::write(file, serialized) {
error!("Failed to write index and config to file: {}", e);
}
}

/// Holds the client connection to the orchestrator
pub struct OrchestratorClient {
client: surf_disco::Client<ClientError>,
file: Option<String>,
pub identity: String,
}

// VALIDATOR
Expand All @@ -85,75 +33,26 @@ pub struct ValidatorArgs {
/// An optional network config file to save to/load from
/// Allows for rejoining the network on a complete state loss
#[arg(short, long)]
pub config_file: Option<String>,
pub network_config_file: Option<String>,
}

impl OrchestratorClient {
/// Creates the client that connects to the orchestrator
pub async fn new(args: ValidatorArgs) -> Self {
/// Creates the client that will connect to the orchestrator
pub async fn new(args: ValidatorArgs, identity: String) -> Self {
let base_url = format!("{0}:{1}", args.url, args.port).parse().unwrap();
let client = surf_disco::Client::<ClientError>::new(base_url);
// TODO ED: Add healthcheck wait here
OrchestratorClient {
client,
file: args.config_file,
}
}

/// Gets the node config and index first from a file, or the orchestrator if
/// 1. no file is specified
/// 2. there is an issue with the file
/// Saves file to disk if successfully retrieved from the orchestrator
pub async fn config_from_file_or_orchestrator<TYPES: NodeType>(
&self,
identity: String,
) -> (
u16,
NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
) {
if let Some(file) = &self.file {
error!("Attempting to retrieve node configuration from file");
// load from file, if fail load from orchestrator and save
match load_index_and_config_from_file::<TYPES>(file.clone()) {
Some(res) => res,
None => {
error!(
"Failed to load node configuration from file; trying to load from orchestrator"
);
// load from orchestrator
let (node_index, run_config) = self
.config_from_orchestrator::<TYPES>(identity.to_string())
.await;

// save to file
save_index_and_config_to_file::<TYPES>(
node_index,
run_config.clone(),
file.to_string(),
);

(node_index, run_config)
}
}
} else {
// load from orchestrator
error!("Attempting to load node configuration from orchestrator");
self.config_from_orchestrator::<TYPES>(identity.to_string())
.await
}
OrchestratorClient { client, identity }
}

/// Sends an identify message to the orchestrator and attempts to get its config
/// Returns both the node_index and the run configuration from the orchestrator
/// Will block until both are returned
#[allow(clippy::type_complexity)]
pub async fn config_from_orchestrator<TYPES: NodeType>(
pub async fn get_config<K: SignatureKey, E: ElectionConfig>(
&self,
identity: String,
) -> (
u16,
NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
) {
) -> NetworkConfig<K, E> {
// get the node index
let identity = identity.as_str();
let identity = |client: Client<ClientError>| {
Expand All @@ -171,10 +70,7 @@ impl OrchestratorClient {
// get the corresponding config
let f = |client: Client<ClientError>| {
async move {
let config: Result<
NetworkConfig<TYPES::SignatureKey, TYPES::ElectionConfigType>,
ClientError,
> = client
let config: Result<NetworkConfig<K, E>, ClientError> = client
.post(&format!("api/config/{node_index}"))
.send()
.await;
Expand All @@ -183,7 +79,11 @@ impl OrchestratorClient {
.boxed()
};

(node_index, self.wait_for_fn_from_orchestrator(f).await)
let mut config = self.wait_for_fn_from_orchestrator(f).await;

config.node_index = node_index as u64;

config
}

/// Tells the orchestrator this validator is ready to start
Expand Down
Loading

0 comments on commit 220d45b

Please sign in to comment.