Skip to content
This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Commit

Permalink
upgrade to tokio 1
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert-Steiner committed Jan 30, 2021
1 parent c1a186e commit 9aed5ea
Show file tree
Hide file tree
Showing 24 changed files with 1,561 additions and 9 deletions.
274 changes: 274 additions & 0 deletions rust/Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ members = [
# internals
"benches",
"examples",
"e2e",
]

[workspace.metadata]
Expand Down
2 changes: 2 additions & 0 deletions rust/e2e/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
**/*.log
**/*.pem
35 changes: 35 additions & 0 deletions rust/e2e/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "e2e"
version = "0.0.0"
authors = ["Xayn Engineering <engineering@xaynet.dev>"]
edition = "2018"
description = "End-to-end tests for Xaynet"
readme = "../../README.md"
homepage = "https://xaynet.dev/"
repository = "https://github.com/xaynetwork/xaynet/"
keywords = ["xaynet", "e2e"]
license-file = "../../LICENSE"
publish = false

[dependencies]
anyhow = "1.0.38"
async-trait = "0.1.42"
chrono = { version = "0.4.19" }
config = "0.10.1"
console = "0.14"
futures = "0.3.12"
indicatif = "0.15.0"
influxdb = { version = "0.3.0", default-features = false, features = ["h1-client", "use-serde"] }
kube = { version = "0.48.0" }
k8s-openapi = { version = "0.11.0", default-features = false, features = ["v1_19"] }
reqwest = { version = "0.11.0", default-features = false, features = ["json", "gzip", "stream"] }
serde = { version = "1.0.123", features = ["derive"] }
serde_json = { version = "1.0.61" }
serde_yaml = "0.8.15"
tokio = { version = "1.1.0", features = ["full"] }
toml = "0.5"
tracing = "0.1.22"
tracing-subscriber = "0.2.15"
xaynet-core = { path = "../xaynet-core" }
xaynet-sdk = { path = "../xaynet-sdk", features = ["reqwest-client"] }
xaynet-server = { path = "../xaynet-server", features = ["model-persistence"] }
78 changes: 78 additions & 0 deletions rust/e2e/src/bin/test_case_1.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use e2e::{
test_client::builder::{TestClientBuilder, TestClientBuilderSettings},
test_env::{utils, TestEnvironment, TestEnvironmentSettings},
};
use tokio::{
signal,
time::{timeout, Duration},
};
use tracing::info;
use xaynet_server::state_machine::phases::PhaseName;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let env_settings = TestEnvironmentSettings::from_file("src/bin/test_case_1")?;
let env = TestEnvironment::new(env_settings.clone()).await?;

tokio::select! {
res = timeout(Duration::from_secs(6000), run(env)) => {
res?
}
_ = signal::ctrl_c() => { Ok(()) }
}
}

async fn run(mut env: TestEnvironment) -> anyhow::Result<()> {
let k8s = env.get_k8s_client().await?;
k8s.deploy_with_image_and_config(env.get_env_settings().coordinator.config)
.await?;
let handle = k8s
.save_coordinator_logs("src/bin/test_case_1/coordinator.log")
.await?;

let _pfi_guard = k8s.port_forward_influxdb()?;
let _pfc_guard = if env.get_env_settings().api_client.certificates.is_none() {
Some(k8s.port_forward_coordinator().await?)
} else {
None
};

let mut api_client = env.get_api_client()?;
let mut influx_client = env.get_influx_client();

info!("wait until clients are ready");
let _ = tokio::join!(
utils::wait_until_client_is_ready(&mut api_client),
utils::wait_until_client_is_ready(&mut influx_client),
);
utils::wait_until_phase(&influx_client, PhaseName::Sum).await;

////////////////////////////////////////////////////////////////////////////////////////////////

let coordinator_settings = env.get_coordinator_settings()?;
let test_client_builder_settings = TestClientBuilderSettings::from(coordinator_settings);

let mut test_client_builder = TestClientBuilder::new(test_client_builder_settings, api_client);

////////////////////////////////////////////////////////////////////////////////////////////////

for round in 0..10 {
info!("Round: {}", round);

let mut runner = test_client_builder.build_clients().await?;
info!("run sum clients...");
runner.run_sum_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Update).await;
info!("run update clients...");
runner.run_update_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum2).await;
info!("run sum2 clients...");
runner.run_sum2_clients().await?;
utils::wait_until_phase(&influx_client, PhaseName::Sum).await;
}

////////////////////////////////////////////////////////////////////////////////////////////////

timeout(Duration::from_secs(10), handle).await???;
Ok(())
}
31 changes: 31 additions & 0 deletions rust/e2e/src/bin/test_case_1/Env.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
filter = "test_case=debug,e2e=debug,xaynet=info"

[k8s]
namespace = "xaynet"
coordinator_pod_label = "app=coordinator"
coordinator_image = "xaynetwork/xaynet:development"
influxdb_pod_name = "influxdb-0"
redis_pod_name = "redis-master-0"
s3_pod_label = "minio"

[coordinator]
config = "src/bin/test_case_1/config.toml"

[influx]
url = "http://localhost:8086"
db = "metrics"

[redis]
url = "redis://localhost/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://localhost:9000"]

[api_client]
address = "http://localhost:8081"
# tls
# address = "https://dev-coordinator.xayn.com"
# certificates = [ "src/bin/test_case_1/dev-coordinator-xayn-com.pem" ]
# identity =
45 changes: 45 additions & 0 deletions rust/e2e/src/bin/test_case_1/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[log]
filter = "xaynet=debug,http=warn,info"

[api]
bind_address = "0.0.0.0:8081"
tls_certificate = "/app/ssl/tls.pem"
tls_key = "/app/ssl/tls.key"

[pet.sum]
prob = 0.5
count = { min = 10, max = 100 }
time = { min = 5, max = 3600 }

[pet.update]
prob = 0.9
count = { min = 3, max = 10000 }
time = { min = 10, max = 3600 }

[pet.sum2]
count = { min = 5, max = 100 }
time = { min = 5, max = 3600 }

[mask]
group_type = "Prime"
data_type = "F32"
bound_type = "B0"
model_type = "M3"

[model]
length = 1

[metrics.influxdb]
url = "http://influxdb:8086"
db = "metrics"

[redis]
url = "redis://127.0.0.1/"

[s3]
access_key = "minio"
secret_access_key = "minio123"
region = ["minio", "http://minio:9000"]

[restore]
enable = false
3 changes: 3 additions & 0 deletions rust/e2e/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod test_client;
pub mod test_env;
pub mod utils;
118 changes: 118 additions & 0 deletions rust/e2e/src/test_client/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::sync::Arc;

use anyhow::bail;
use xaynet_core::{
crypto::SigningKeyPair,
mask::{FromPrimitives, Model},
};
use xaynet_sdk::{client::Client as ApiClient, XaynetClient};
use xaynet_server::settings::Settings as CoordinatorSettings;

use super::{
runner::ClientRunner,
utils::{default_sum_client, default_update_client, generate_client, ClientType, LocalModel},
};
use crate::utils::concurrent_futures::ConcurrentFutures;

pub struct TestClientBuilderSettings {
number_of_sum: u64,
number_of_update: u64,
number_of_sum2: u64,
model_length: usize,
}

impl TestClientBuilderSettings {
pub fn new(
number_of_sum: u64,
number_of_update: u64,
number_of_sum2: u64,
model_length: usize,
) -> Self {
Self {
number_of_sum,
number_of_update,
number_of_sum2,
model_length,
}
}
}

impl From<CoordinatorSettings> for TestClientBuilderSettings {
fn from(settings: CoordinatorSettings) -> Self {
Self {
number_of_sum: settings.pet.sum.count.min,
number_of_update: settings.pet.update.count.min,
number_of_sum2: settings.pet.sum2.count.min,
model_length: settings.model.length,
}
}
}

pub struct TestClientBuilder {
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
model: Arc<Model>,
}

impl TestClientBuilder {
pub fn new(
settings: TestClientBuilderSettings,
api_client: ApiClient<reqwest::Client>,
) -> Self {
let model = Model::from_primitives(vec![1; settings.model_length].into_iter()).unwrap();
Self {
api_client,
settings,
model: Arc::new(model),
}
}

pub async fn build_client<F, R>(
&mut self,
r#type: &ClientType,
func: F,
) -> anyhow::Result<ConcurrentFutures<R>>
where
F: Fn(SigningKeyPair, ApiClient<reqwest::Client>, LocalModel) -> R,
R: Send + 'static + futures::Future,
<R as futures::Future>::Output: Send + 'static,
{
let round_params = self.api_client.get_round_params().await?;
let mut clients = ConcurrentFutures::<R>::new(100);

let number_of_clients = match r#type {
ClientType::Sum => self.settings.number_of_sum,
ClientType::Update => self.settings.number_of_update,
_ => bail!("client type is not supported"),
};

for _ in 0..number_of_clients {
let key_pair = generate_client(r#type, &round_params);
let client = func(
key_pair,
self.api_client.clone(),
LocalModel(self.model.clone()),
);

clients.push(client);
}

Ok(clients)
}

pub async fn build_clients(&mut self) -> anyhow::Result<ClientRunner> {
let sum_clients = self
.build_client(&ClientType::Sum, default_sum_client)
.await?;

let update_clients = self
.build_client(&ClientType::Update, default_update_client)
.await?;

Ok(ClientRunner::new(
sum_clients,
update_clients,
self.settings.number_of_sum2,
))
}
}
3 changes: 3 additions & 0 deletions rust/e2e/src/test_client/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod builder;
pub mod runner;
pub mod utils;
Loading

0 comments on commit 9aed5ea

Please sign in to comment.