From e95a3be071e73615f11620c3e39deee81beff7ae Mon Sep 17 00:00:00 2001 From: Jake Date: Thu, 23 Jan 2020 02:16:40 -0800 Subject: [PATCH] tokio 0.2 + std::future support (#2) --- Cargo.toml | 32 +- docker-compose.yml | 2 +- rustfmt.toml | 2 - src/auth.rs | 483 ++++++++------------ src/client.rs | 175 ++----- src/error.rs | 13 +- src/first_ok.rs | 79 +--- src/http.rs | 26 +- src/kv.rs | 366 +++++++-------- src/lib.rs | 32 +- src/members.rs | 217 ++++----- src/stats.rs | 63 ++- tests/auth_test.rs | 182 +++----- tests/client_test.rs | 41 +- tests/kv_test.rs | 1009 ++++++++++++++++------------------------- tests/members_test.rs | 20 +- tests/stats_test.rs | 39 +- tests/test.rs | 48 +- 18 files changed, 1060 insertions(+), 1769 deletions(-) delete mode 100644 rustfmt.toml diff --git a/Cargo.toml b/Cargo.toml index af5e9a7..9bbf6a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,31 +9,35 @@ license = "MIT" name = "etcd" readme = "README.md" repository = "https://github.com/jimmycuadra/rust-etcd" -version = "0.9.0" +version = "0.10.0" [lib] test = false [dependencies] -futures = "0.1.25" -hyper = "0.12.13" -http = "0.1.13" -serde = "1.0.80" -serde_derive = "1.0.80" -serde_json = "1.0.32" -url = "1.7.1" -base64 = "0.10.0" -log = "0.4.6" -tokio = "0.1.13" +futures = "0.3" +hyper = "0.13" +http = "0.2" +serde = "1.0" +serde_derive = "1.0" +serde_json = "1.0" +url = "2.1" +base64 = "0.11" +log = "0.4" +tokio = { version = "0.2", features = ["rt-core", "test-util", "macros", "time"] } [dependencies.hyper-tls] optional = true -version = "0.3.1" +version = "0.4" + +[dependencies.tokio-tls] +optional = true +version = "0.3" [dependencies.native-tls] optional = true -version = "0.2.2" +version = "0.2" [features] default = ["tls"] -tls = ["hyper-tls", "native-tls"] +tls = ["hyper-tls", "tokio-tls", "native-tls"] diff --git a/docker-compose.yml b/docker-compose.yml index 2a289c1..07b9dbf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: volumes: - ./tests/ssl:/ssl rust: - image: rust:1.31.0 + image: rust:1.40.0 environment: RUST_BACKTRACE: 1 RUST_TEST_THREADS: 1 diff --git a/rustfmt.toml b/rustfmt.toml deleted file mode 100644 index 84896d4..0000000 --- a/rustfmt.toml +++ /dev/null @@ -1,2 +0,0 @@ -imports_indent = "Block" -imports_layout = "HorizontalVertical" diff --git a/src/auth.rs b/src/auth.rs index 0136fe1..5408d4f 100644 --- a/src/auth.rs +++ b/src/auth.rs @@ -2,17 +2,15 @@ //! //! These API endpoints are used to manage users and roles. -use std::str::FromStr; - -use futures::{Future, IntoFuture, Stream}; use hyper::client::connect::Connect; use hyper::{StatusCode, Uri}; use serde_derive::{Deserialize, Serialize}; use serde_json; +use std::future::Future; use crate::client::{Client, ClusterInfo, Response}; use crate::error::{ApiError, Error}; -use crate::first_ok::first_ok; +use crate::first_ok::{first_ok, Result}; /// The structure returned by the `GET /v2/auth/enable` endpoint. #[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] @@ -414,191 +412,146 @@ impl Permission { } /// Creates a new role. -pub fn create_role( - client: &Client, - role: Role, -) -> impl Future, Error = Vec> + Send +pub fn create_role(client: &Client, role: Role) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let body = serde_json::to_string(&role) - .map_err(Error::from) - .into_future(); - - let url = build_url(member, &format!("/roles/{}", role.name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - - let params = uri.join(body); - let http_client = http_client.clone(); + let role = role.clone(); - let response = - params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from)); + async move { + let body = serde_json::to_string(&role)?; + let uri = build_uri(&member, &format!("/roles/{}", role.name))?; + let response = http_client.put(uri, body).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| match status { + match status { StatusCode::OK | StatusCode::CREATED => { - match serde_json::from_slice::(body) { + match serde_json::from_slice::(&body) { Ok(data) => Ok(Response { data, cluster_info }), Err(error) => Err(Error::Serialization(error)), } } status => Err(Error::UnexpectedStatus(status)), - }) - }) + } + } }) } /// Creates a new user. -pub fn create_user( - client: &Client, - user: NewUser, -) -> impl Future, Error = Vec> + Send +pub fn create_user(client: &Client, user: NewUser) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Send + Sync + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let body = serde_json::to_string(&user) - .map_err(Error::from) - .into_future(); - - let url = build_url(member, &format!("/users/{}", user.name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - - let params = uri.join(body); - let http_client = http_client.clone(); + let user = user.clone(); - let response = - params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from)); + async move { + let body = serde_json::to_string(&user)?; + let uri = build_uri(&member, &format!("/users/{}", user.name))?; + let response = http_client.put(uri, body).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| match status { + match status { StatusCode::OK | StatusCode::CREATED => { - match serde_json::from_slice::(body) { + match serde_json::from_slice::(&body) { Ok(data) => Ok(Response { data, cluster_info }), Err(error) => Err(Error::Serialization(error)), } } status => Err(Error::UnexpectedStatus(status)), - }) - }) + } + } }) } /// Deletes a role. -pub fn delete_role( - client: &Client, - name: N, -) -> impl Future, Error = Vec> + Send +pub fn delete_role(client: &Client, name: N) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, N: Into, { let http_client = client.http_client().clone(); let name = name.into(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/roles/{}", name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + let name = name.clone(); - let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from)); + async move { + let uri = build_uri(&member, &format!("/roles/{}", name))?; + let response = http_client.delete(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - if status == StatusCode::OK { - Ok(Response { + match status { + StatusCode::OK => Ok(Response { data: (), cluster_info, - }) - } else { - Err(Error::UnexpectedStatus(status)) + }), + status => Err(Error::UnexpectedStatus(status)), } - }) + } }) } /// Deletes a user. -pub fn delete_user( - client: &Client, - name: N, -) -> impl Future, Error = Vec> + Send +pub fn delete_user(client: &Client, name: N) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, N: Into, { let http_client = client.http_client().clone(); let name = name.into(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/users/{}", name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + let name = name.clone(); - let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from)); + async move { + let uri = build_uri(&member, &format!("/users/{}", name))?; + let response = http_client.delete(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - if status == StatusCode::OK { - Ok(Response { + match status { + StatusCode::OK => Ok(Response { data: (), cluster_info, - }) - } else { - Err(Error::UnexpectedStatus(status)) + }), + status => Err(Error::UnexpectedStatus(status)), } - }) + } }) } /// Attempts to disable the auth system. -pub fn disable( - client: &Client, -) -> impl Future, Error = Vec> + Send +pub fn disable(client: &Client) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Send + Sync + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, "/enable"); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); - let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from)); - - response.and_then(|response| { + async move { + let uri = build_uri(&member, "/enable")?; + let response = http_client.delete(uri).await?; let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); @@ -613,31 +566,23 @@ where }), _ => Err(Error::UnexpectedStatus(status)), } - }) + } }) } /// Attempts to enable the auth system. -pub fn enable( - client: &Client, -) -> impl Future, Error = Vec> + Send +pub fn enable(client: &Client) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Send + Sync + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, "/enable"); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); - let response = - uri.and_then(move |uri| http_client.put(uri, "".to_owned()).map_err(Error::from)); - - response.and_then(|response| { + async move { + let uri = build_uri(&member, "/enable")?; + let response = http_client.put(uri, "".to_owned()).await?; let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); @@ -650,315 +595,241 @@ where data: AuthChange::Unchanged, cluster_info, }), - _ => return Err(Error::UnexpectedStatus(status)), + _ => Err(Error::UnexpectedStatus(status)), } - }) + } }) } /// Get a role. -pub fn get_role( - client: &Client, - name: N, -) -> impl Future, Error = Vec> + Send +pub fn get_role(client: &Client, name: N) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, N: Into, { let http_client = client.http_client().clone(); let name = name.into(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/roles/{}", name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + let name = name.clone(); - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); + async move { + let uri = build_uri(&member, &format!("/roles/{}", name))?; + let response = http_client.get(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - Err(Error::UnexpectedStatus(status)) + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } /// Gets all roles. -pub fn get_roles( - client: &Client, -) -> impl Future>, Error = Vec> + Send +pub fn get_roles(client: &Client) -> impl Future>> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, "/roles"); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); - - response.and_then(|response| { + async move { + let uri = build_uri(&member, "/roles")?; + let response = http_client.get(uri).await?; let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(roles) => { - let data = roles.roles.unwrap_or_else(|| Vec::with_capacity(0)); + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(roles) => { + let data = roles.roles.unwrap_or_default(); - Ok(Response { data, cluster_info }) - } - Err(error) => Err(Error::Serialization(error)), + Ok(Response { data, cluster_info }) } - } else { - Err(Error::UnexpectedStatus(status)) + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } /// Get a user. -pub fn get_user( - client: &Client, - name: N, -) -> impl Future, Error = Vec> + Send +pub fn get_user(client: &Client, name: N) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, N: Into, { let http_client = client.http_client().clone(); let name = name.into(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/users/{}", name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + let name = name.clone(); + async move { + let uri = build_uri(&member, &format!("/users/{}", name))?; + let response = http_client.get(uri).await?; - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); - - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - Err(Error::UnexpectedStatus(status)) + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } /// Gets all users. -pub fn get_users( - client: &Client, -) -> impl Future>, Error = Vec> + Send +pub fn get_users(client: &Client) -> impl Future>> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, "/users"); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + async move { + let uri = build_uri(&member, "/users")?; + let response = http_client.get(uri).await?; - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); - - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(users) => { - let data = users.users.unwrap_or_else(|| Vec::with_capacity(0)); + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(users) => { + let data = users.users.unwrap_or_default(); - Ok(Response { data, cluster_info }) - } - Err(error) => Err(Error::Serialization(error)), + Ok(Response { data, cluster_info }) } - } else { - Err(Error::UnexpectedStatus(status)) + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } /// Determines whether or not the auth system is enabled. -pub fn status( - client: &Client, -) -> impl Future, Error = Vec> + Send +pub fn status(client: &Client) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, "/enable"); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + async move { + let uri = build_uri(&member, "/enable")?; + let response = http_client.get(uri).await?; - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); - - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { - data: data.enabled, - cluster_info, - }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { + data: data.enabled, + cluster_info, + }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), + } + } + } }) } /// Updates an existing role. -pub fn update_role( - client: &Client, - role: RoleUpdate, -) -> impl Future, Error = Vec> + Send +pub fn update_role(client: &Client, role: RoleUpdate) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let body = serde_json::to_string(&role) - .map_err(Error::from) - .into_future(); - - let url = build_url(member, &format!("/roles/{}", role.name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - - let params = uri.join(body); - let http_client = http_client.clone(); + let role = role.clone(); - let response = - params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from)); + async move { + let body = serde_json::to_string(&role)?; + let uri = build_uri(&member, &format!("/roles/{}", role.name))?; + let response = http_client.put(uri, body).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - Err(Error::UnexpectedStatus(status)) + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } - /// Updates an existing user. -pub fn update_user( - client: &Client, - user: UserUpdate, -) -> impl Future, Error = Vec> + Send +pub fn update_user(client: &Client, user: UserUpdate) -> impl Future> where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let body = serde_json::to_string(&user) - .map_err(Error::from) - .into_future(); - - let url = build_url(member, &format!("/users/{}", user.name)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - - let params = uri.join(body); - let http_client = http_client.clone(); + let user = user.clone(); - let response = - params.and_then(move |(uri, body)| http_client.put(uri, body).map_err(Error::from)); + async move { + let body = serde_json::to_string(&user)?; + let uri = build_uri(&member, &format!("/users/{}", user.name))?; + let response = http_client.put(uri, body).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - Err(Error::UnexpectedStatus(status)) + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + Err(Error::UnexpectedStatus(status)) + } + } }) } /// Constructs the full URL for an API call. -fn build_url(endpoint: &Uri, path: &str) -> String { - format!("{}v2/auth{}", endpoint, path) +fn build_uri(endpoint: &Uri, path: &str) -> std::result::Result { + format!("{}v2/auth{}", endpoint, path).parse() } diff --git a/src/client.rs b/src/client.rs index 775f26a..80a1330 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,8 @@ //! Contains the etcd client. All API calls are made via the client. -use futures::stream::futures_unordered; -use futures::{Future, IntoFuture, Stream}; +use std::future::Future; + +use futures::stream::{self, Stream, StreamExt}; use http::header::{HeaderMap, HeaderValue}; use hyper::client::connect::{Connect, HttpConnector}; use hyper::{Client as Hyper, StatusCode, Uri}; @@ -46,7 +47,7 @@ const XRAFT_TERM: &str = "X-Raft-Term"; #[derive(Clone, Debug)] pub struct Client where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { endpoints: Vec, http_client: HttpClient, @@ -109,7 +110,7 @@ impl Client> { endpoints: &[&str], basic_auth: Option, ) -> Result>, Error> { - let connector = HttpsConnector::new(4)?; + let connector = HttpsConnector::new(); let hyper = Hyper::builder().keep_alive(true).build(connector); Client::custom(hyper, endpoints, basic_auth) @@ -118,7 +119,7 @@ impl Client> { impl Client where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { /// Constructs a new client using the provided `hyper::Client`. /// @@ -135,62 +136,6 @@ where /// # Errors /// /// Fails if no endpoints are provided or if any of the endpoints is an invalid URL. - /// - /// # Examples - /// - /// Configuring the client to authenticate with both HTTP basic auth and an X.509 client - /// certificate: - /// - /// ```no_run - /// use std::fs::File; - /// use std::io::Read; - /// - /// use futures::Future; - /// use hyper::client::HttpConnector; - /// use hyper_tls::HttpsConnector; - /// use native_tls::{Certificate, TlsConnector, Identity}; - /// use tokio::runtime::Runtime; - /// - /// use etcd::{Client, kv}; - /// - /// fn main() { - /// let mut ca_cert_file = File::open("ca.der").unwrap(); - /// let mut ca_cert_buffer = Vec::new(); - /// ca_cert_file.read_to_end(&mut ca_cert_buffer).unwrap(); - /// - /// let mut pkcs12_file = File::open("/source/tests/ssl/client.p12").unwrap(); - /// let mut pkcs12_buffer = Vec::new(); - /// pkcs12_file.read_to_end(&mut pkcs12_buffer).unwrap(); - /// - /// let mut builder = TlsConnector::builder(); - /// builder.add_root_certificate(Certificate::from_der(&ca_cert_buffer).unwrap()); - /// builder.identity(Identity::from_pkcs12(&pkcs12_buffer, "secret").unwrap()); - /// - /// let tls_connector = builder.build().unwrap(); - /// - /// let mut http_connector = HttpConnector::new(4); - /// http_connector.enforce_http(false); - /// let https_connector = HttpsConnector::from((http_connector, tls_connector)); - /// - /// let hyper = hyper::Client::builder().build(https_connector); - /// - /// let client = Client::custom(hyper, &["https://etcd.example.com:2379"], None).unwrap(); - /// - /// let work = kv::set(&client, "/foo", "bar", None).and_then(move |_| { - /// let get_request = kv::get(&client, "/foo", kv::GetOptions::default()); - /// - /// get_request.and_then(|response| { - /// let value = response.data.node.value.unwrap(); - /// - /// assert_eq!(value, "bar".to_string()); - /// - /// Ok(()) - /// }) - /// }); - /// - /// assert!(Runtime::new().unwrap().block_on(work).is_ok()); - /// } - /// ``` pub fn custom( hyper: Hyper, endpoints: &[&str], @@ -223,97 +168,49 @@ where } /// Runs a basic health check against each etcd member. - pub fn health(&self) -> impl Stream, Error = Error> + Send { - let futures = self.endpoints.iter().map(|endpoint| { - let url = build_url(&endpoint, "health"); - let uri = url.parse().map_err(Error::from).into_future(); - let cloned_client = self.http_client.clone(); - let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from)); - response.and_then(|response| { - let status = response.status(); - let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } - } - }) + pub fn health<'a>(&'a self) -> impl Stream, Error>> + 'a { + stream::iter(self.endpoints.clone()) + .map(move |endpoint| async move { + let uri = build_url(&endpoint, "health")?; + self.request(uri).await }) - }); - - futures_unordered(futures) + .buffer_unordered(self.endpoints.len()) } /// Returns version information from each etcd cluster member the client was initialized with. - pub fn versions(&self) -> impl Stream, Error = Error> + Send { - let futures = self.endpoints.iter().map(|endpoint| { - let url = build_url(&endpoint, "version"); - let uri = url.parse().map_err(Error::from).into_future(); - let cloned_client = self.http_client.clone(); - let response = uri.and_then(move |uri| cloned_client.get(uri).map_err(Error::from)); - response.and_then(|response| { - let status = response.status(); - let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } - } - }) + pub fn versions<'a>(&'a self) -> impl Stream, Error>> + 'a { + stream::iter(self.endpoints.clone()) + .map(move |endpoint| async move { + let uri = build_url(&endpoint, "version")?; + self.request(uri).await }) - }); - - futures_unordered(futures) + .buffer_unordered(self.endpoints.len()) } /// Lets other internal code make basic HTTP requests. - pub(crate) fn request( - &self, - uri: U, - ) -> impl Future, Error = Error> + Send + pub(crate) fn request(&self, uri: Uri) -> impl Future, Error>> where - U: Future + Send, T: DeserializeOwned + Send + 'static, { let http_client = self.http_client.clone(); - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); - response.and_then(|response| { + + async move { + let response = http_client.get(uri).await?; let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |body| { - if status == StatusCode::OK { - match serde_json::from_slice::(&body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(&body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), + } + } + } } } @@ -403,6 +300,6 @@ impl<'a> From<&'a HeaderMap> for ClusterInfo { } /// Constructs the full URL for the versions API call. -fn build_url(endpoint: &Uri, path: &str) -> String { - format!("{}{}", endpoint, path) +fn build_url(endpoint: &Uri, path: &str) -> Result { + format!("{}{}", endpoint, path).parse() } diff --git a/src/error.rs b/src/error.rs index 44f37bb..256c28b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,7 +10,6 @@ use hyper::{Error as HttpError, StatusCode}; use native_tls::Error as TlsError; use serde_derive::{Deserialize, Serialize}; use serde_json::Error as SerializationError; -use tokio::timer::timeout::Error as TokioTimeoutError; use url::ParseError as UrlError; /// An error returned by an etcd API endpoint. @@ -72,10 +71,10 @@ impl Display for Error { match *self { Error::Api(ref error) => write!(f, "{}", error), Error::Http(ref error) => write!(f, "{}", error), - ref error @ Error::InvalidConditions => write!(f, "{}", error.description()), + ref error @ Error::InvalidConditions => write!(f, "{}", error.to_string()), Error::InvalidUri(ref error) => write!(f, "{}", error), Error::InvalidUrl(ref error) => write!(f, "{}", error), - ref error @ Error::NoEndpoints => write!(f, "{}", error.description()), + ref error @ Error::NoEndpoints => write!(f, "{}", error.to_string()), #[cfg(feature = "tls")] Error::Tls(ref error) => write!(f, "{}", error), Error::Serialization(ref error) => write!(f, "{}", error), @@ -145,16 +144,10 @@ pub enum WatchError { Timeout, } -impl From> for WatchError { - fn from(_: TokioTimeoutError) -> Self { - WatchError::Timeout - } -} - impl Display for WatchError { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> { match *self { - WatchError::Timeout => write!(f, "{}", self.description()), + WatchError::Timeout => write!(f, "{}", self.to_string()), ref other => other.fmt(f), } } diff --git a/src/first_ok.rs b/src/first_ok.rs index 4a068c6..757eb39 100644 --- a/src/first_ok.rs +++ b/src/first_ok.rs @@ -1,76 +1,29 @@ -use std::mem::replace; -use std::vec::IntoIter; +use crate::{Error, Response}; +use std::future::Future; -use futures::{Async, Future, Poll}; use hyper::Uri; /// Executes the given closure with each cluster member and short-circuit returns the first /// successful result. If all members are exhausted without success, the final error is /// returned. -pub fn first_ok(endpoints: Vec, callback: F) -> FirstOk -where - F: Fn(&Uri) -> T, - T: Future, -{ - let max_errors = endpoints.len(); - - FirstOk { - callback, - current_future: None, - endpoints: endpoints.into_iter(), - errors: Vec::with_capacity(max_errors), - } -} - -#[derive(Debug)] -#[must_use = "futures do nothing unless polled"] -pub struct FirstOk -where - F: Fn(&Uri) -> T, - T: Future, -{ +pub async fn first_ok( + endpoints: Vec, callback: F, - current_future: Option, - endpoints: IntoIter, - errors: Vec, -} - -impl Future for FirstOk +) -> std::result::Result> where - F: Fn(&Uri) -> T, - T: Future, + F: Fn(Uri) -> U, + U: Future>, { - type Item = T::Item; - type Error = Vec; - - fn poll(&mut self) -> Poll { - if let Some(mut current_future) = self.current_future.take() { - match current_future.poll() { - Ok(Async::NotReady) => { - self.current_future = Some(current_future); - - Ok(Async::NotReady) - } - Ok(Async::Ready(item)) => Ok(Async::Ready(item)), - Err(error) => { - self.errors.push(error); + let mut errors = Vec::with_capacity(endpoints.len()); - self.poll() - } - } - } else { - match self.endpoints.next() { - Some(endpoint) => { - self.current_future = Some((self.callback)(&endpoint)); - - self.poll() - } - None => { - let errors = replace(&mut self.errors, vec![]); - - Err(errors) - } - } + for endpoint in endpoints { + match (callback)(endpoint).await { + Ok(result) => return Ok(result), + Err(err) => errors.push(err), } } + + Err(errors) } + +pub type Result = std::result::Result, Vec>; diff --git a/src/http.rs b/src/http.rs index 3cdab5e..f96989e 100644 --- a/src/http.rs +++ b/src/http.rs @@ -10,7 +10,7 @@ use crate::client::BasicAuth; #[derive(Clone, Debug)] pub struct HttpClient where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { basic_auth: Option, hyper: Hyper, @@ -18,7 +18,7 @@ where impl HttpClient where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Send + Sync + 'static, { /// Constructs a new `HttpClient`. pub fn new(hyper: Hyper, basic_auth: Option) -> Self { @@ -48,32 +48,32 @@ where // private /// Adds the Authorization HTTP header to a request if a credentials were supplied. - fn add_auth_header<'a>(&self, request: &mut Builder) { + fn add_auth_header(&self, request: Builder) -> Builder { if let Some(ref basic_auth) = self.basic_auth { let auth = format!("{}:{}", basic_auth.username, basic_auth.password); let header_value = format!("Basic {}", encode(&auth)); - request.header(AUTHORIZATION, header_value); + request.header(AUTHORIZATION, header_value) + } else { + request } } /// Makes a request to etcd. fn request(&self, method: Method, uri: Uri) -> ResponseFuture { - let mut request = Request::builder(); - request.method(method).uri(uri); - - self.add_auth_header(&mut request); - + let request = Request::builder().method(method).uri(uri); + let request = self.add_auth_header(request); self.hyper.request(request.body(Body::empty()).unwrap()) } /// Makes a request with an HTTP body to etcd. fn request_with_body(&self, method: Method, uri: Uri, body: String) -> ResponseFuture { - let mut request = Request::builder(); - request.method(method).uri(uri); - request.header(CONTENT_TYPE, "application/x-www-form-urlencoded"); + let request = Request::builder() + .method(method) + .uri(uri) + .header(CONTENT_TYPE, "application/x-www-form-urlencoded"); - self.add_auth_header(&mut request); + let request = self.add_auth_header(request); self.hyper.request(request.body(Body::from(body)).unwrap()) } diff --git a/src/kv.rs b/src/kv.rs index 49997b2..785ff54 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -5,28 +5,23 @@ //! there other other key-value pairs "underneath" it, such as "/foo/bar". use std::collections::HashMap; -use std::str::FromStr; use std::time::Duration; -use futures::future::{Future, IntoFuture}; -use futures::stream::Stream; use hyper::client::connect::Connect; use hyper::{StatusCode, Uri}; use serde_derive::{Deserialize, Serialize}; use serde_json; -use tokio::timer::Timeout; +use std::future::Future; +use tokio::time::timeout; use url::Url; pub use crate::error::WatchError; use crate::client::{Client, ClusterInfo, Response}; use crate::error::{ApiError, Error}; -use crate::first_ok::first_ok; +use crate::first_ok::{first_ok, Result}; use crate::options::{ - ComparisonConditions, - DeleteOptions, - GetOptions as InternalGetOptions, - SetOptions, + ComparisonConditions, DeleteOptions, GetOptions as InternalGetOptions, SetOptions, }; use url::form_urlencoded::Serializer; @@ -137,14 +132,14 @@ pub struct WatchOptions { /// # Errors /// /// Fails if the conditions didn't match or if no conditions were given. -pub fn compare_and_delete( - client: &Client, - key: &str, - current_value: Option<&str>, +pub fn compare_and_delete<'a, C>( + client: &'a Client, + key: &'a str, + current_value: Option<&'a str>, current_modified_index: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_delete( client, @@ -176,16 +171,16 @@ where /// # Errors /// /// Fails if the conditions didn't match or if no conditions were given. -pub fn compare_and_swap( - client: &Client, - key: &str, - value: &str, +pub fn compare_and_swap<'a, C>( + client: &'a Client, + key: &'a str, + value: &'a str, ttl: Option, - current_value: Option<&str>, + current_value: Option<&'a str>, current_modified_index: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -214,14 +209,14 @@ where /// # Errors /// /// Fails if the key already exists. -pub fn create( - client: &Client, - key: &str, - value: &str, +pub fn create<'a, C>( + client: &'a Client, + key: &'a str, + value: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -246,13 +241,13 @@ where /// # Errors /// /// Fails if the key already exists. -pub fn create_dir( - client: &Client, - key: &str, +pub fn create_dir<'a, C>( + client: &'a Client, + key: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -285,14 +280,14 @@ where /// # Errors /// /// Fails if the key already exists and is not a directory. -pub fn create_in_order( - client: &Client, - key: &str, - value: &str, +pub fn create_in_order<'a, C>( + client: &'a Client, + key: &'a str, + value: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -318,13 +313,13 @@ where /// # Errors /// /// Fails if the key is a directory and `recursive` is `false`. -pub fn delete( - client: &Client, - key: &str, +pub fn delete<'a, C>( + client: &'a Client, + key: &'a str, recursive: bool, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_delete( client, @@ -346,12 +341,12 @@ where /// # Errors /// /// Fails if the directory is not empty. -pub fn delete_dir( - client: &Client, - key: &str, -) -> impl Future, Error = Vec> + Send +pub fn delete_dir<'a, C>( + client: &'a Client, + key: &'a str, +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_delete( client, @@ -374,13 +369,13 @@ where /// # Errors /// /// Fails if the key doesn't exist. -pub fn get( - client: &Client, - key: &str, +pub fn get<'a, C>( + client: &'a Client, + key: &'a str, options: GetOptions, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_get( client, @@ -408,14 +403,14 @@ where /// # Errors /// /// Fails if the node is a directory. -pub fn set( - client: &Client, - key: &str, - value: &str, +pub fn set<'a, C>( + client: &'a Client, + key: &'a str, + value: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -439,13 +434,13 @@ where /// # Errors /// /// Fails if the node does not exist. -pub fn refresh( - client: &Client, - key: &str, +pub fn refresh<'a, C>( + client: &'a Client, + key: &'a str, ttl: u64, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -472,13 +467,13 @@ where /// # Errors /// /// Fails if the node is an existing directory. -pub fn set_dir( - client: &Client, - key: &str, +pub fn set_dir<'a, C>( + client: &'a Client, + key: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -503,14 +498,14 @@ where /// # Errors /// /// Fails if the key does not exist. -pub fn update( - client: &Client, - key: &str, - value: &str, +pub fn update<'a, C>( + client: &'a Client, + key: &'a str, + value: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -538,13 +533,13 @@ where /// # Errors /// /// Fails if the node does not exist. -pub fn update_dir( - client: &Client, - key: &str, +pub fn update_dir<'a, C>( + client: &'a Client, + key: &'a str, ttl: Option, -) -> impl Future, Error = Vec> + Send +) -> impl Future> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { raw_set( client, @@ -575,13 +570,13 @@ where /// /// Fails if a timeout is specified and the duration lapses without a response from the etcd /// cluster. -pub fn watch( +pub async fn watch( client: &Client, key: &str, options: WatchOptions, -) -> Box, Error = WatchError> + Send> +) -> std::result::Result, WatchError> where - C: Clone + Connect, + C: Clone + Connect + Sync + Send + 'static, { let work = raw_get( client, @@ -592,34 +587,31 @@ where wait: true, ..Default::default() }, - ) - .map_err(|errors| WatchError::Other(errors)); + ); if let Some(duration) = options.timeout { - Box::new( - Timeout::new(work, duration).map_err(|e| match e.into_inner() { - Some(we) => we, - None => WatchError::Timeout, - }), - ) + match timeout(duration.into(), work).await { + Ok(res) => res.map_err(WatchError::Other), + Err(_) => Err(WatchError::Timeout), + } } else { - Box::new(work) + work.await.map_err(WatchError::Other) } } /// Constructs the full URL for an API call. -fn build_url(endpoint: &Uri, path: &str) -> String { - format!("{}v2/keys{}", endpoint, path) +fn build_uri(endpoint: &Uri, path: &str) -> std::result::Result { + format!("{}v2/keys{}", endpoint, path).parse() } /// Handles all delete operations. -fn raw_delete( +async fn raw_delete( client: &Client, key: &str, options: DeleteOptions<'_>, -) -> Box, Error = Vec> + Send> +) -> Result where - C: Clone + Connect, + C: Clone + Connect + Sync + Send + 'static, { let mut query_pairs = HashMap::new(); @@ -635,7 +627,7 @@ where let conditions = options.conditions.unwrap(); if conditions.is_empty() { - return Box::new(Err(vec![Error::InvalidConditions]).into_future()); + return Err(vec![Error::InvalidConditions]); } if conditions.modified_index.is_some() { @@ -653,53 +645,43 @@ where let http_client = client.http_client().clone(); let key = key.to_string(); - let result = first_ok(client.endpoints().to_vec(), move |endpoint| { - let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone()) - .map_err(Error::from) - .into_future(); - - let uri = url.and_then(|url| { - Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future() - }); - + first_ok(client.endpoints().to_vec(), move |endpoint| { let http_client = http_client.clone(); + let query_pairs = query_pairs.clone(); + let key = key.clone(); + async move { + let url = + Url::parse_with_params(&build_uri(&endpoint, &key)?.to_string(), query_pairs)?; + let uri = url.to_string().parse()?; + let response = http_client.delete(uri).await?; - let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from)); - - response.and_then(move |response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), } - }) - }) - }); - - Box::new(result) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), + } + } + } + }) + .await } /// Handles all get operations. -fn raw_get( +async fn raw_get( client: &Client, key: &str, options: InternalGetOptions, -) -> impl Future, Error = Vec> + Send +) -> Result where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { let mut query_pairs = HashMap::new(); @@ -721,50 +703,40 @@ where let key = key.to_string(); first_ok(client.endpoints().to_vec(), move |endpoint| { - let url = Url::parse_with_params(&build_url(endpoint, &key), query_pairs.clone()) - .map_err(Error::from) - .into_future(); - - let uri = url.and_then(|url| { - Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future() - }); - let http_client = http_client.clone(); + let key = key.clone(); + let query_pairs = query_pairs.clone(); - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); + async move { + let url = + Url::parse_with_params(&build_uri(&endpoint, &key)?.to_string(), query_pairs)?; + let uri = url.to_string().parse()?; + let response = http_client.get(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { data, cluster_info }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { data, cluster_info }), + Err(error) => Err(Error::Serialization(error)), + } + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } + } }) + .await } /// Handles all set operations. -fn raw_set( - client: &Client, - key: &str, - options: SetOptions<'_>, -) -> Box, Error = Vec> + Send> +async fn raw_set(client: &Client, key: &str, options: SetOptions<'_>) -> Result where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { let mut http_options = vec![]; @@ -781,13 +753,18 @@ where } let prev_exist = match options.prev_exist { - Some(prev_exist) => prev_exist, - None => false, + Some(prev_exist) => Some(prev_exist), + None => { + if options.refresh { + Some(true) + } else { + None + } + } }; // If we are calling refresh, we should also ensure we are setting prevExist. - if prev_exist || options.refresh { - let prev_exist = prev_exist || options.refresh; + if let Some(prev_exist) = prev_exist { http_options.push(("prevExist".to_owned(), prev_exist.to_string())); } @@ -797,7 +774,7 @@ where if let Some(ref conditions) = options.conditions { if conditions.is_empty() { - return Box::new(Err(vec![Error::InvalidConditions]).into_future()); + return Err(vec![Error::InvalidConditions]); } if let Some(ref modified_index) = conditions.modified_index { @@ -813,45 +790,38 @@ where let key = key.to_string(); let create_in_order = options.create_in_order; - let result = first_ok(client.endpoints().to_vec(), move |endpoint| { - let mut serializer = Serializer::new(String::new()); - serializer.extend_pairs(http_options.clone()); - let body = serializer.finish(); - - let url = build_url(endpoint, &key); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - + first_ok(client.endpoints().to_vec(), move |endpoint| { let http_client = http_client.clone(); - - let response = uri.and_then(move |uri| { - if create_in_order { - http_client.post(uri, body).map_err(Error::from) + let key = key.clone(); + let mut ser = Serializer::new(String::new()); + ser.extend_pairs(http_options.clone()); + let body = ser.finish(); + + async move { + let uri = build_uri(&endpoint, &key)?; + let response = if create_in_order { + http_client.post(uri, body).await? } else { - http_client.put(uri, body).map_err(Error::from) - } - }); + http_client.put(uri, body).await? + }; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| match status { + match status { StatusCode::CREATED | StatusCode::OK => { - match serde_json::from_slice::(body) { + match serde_json::from_slice::(&body) { Ok(data) => Ok(Response { data, cluster_info }), Err(error) => Err(Error::Serialization(error)), } } - _ => match serde_json::from_slice::(body) { + _ => match serde_json::from_slice::(&body) { Ok(error) => Err(Error::Api(error)), Err(error) => Err(Error::Serialization(error)), }, - }) - }) - }); - - Box::new(result) + } + } + }) + .await } diff --git a/src/lib.rs b/src/lib.rs index 68aedb6..e5cbe48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,37 +30,27 @@ //! ```no_run //! use etcd::Client; //! use etcd::kv::{self, Action}; -//! use futures::Future; -//! use tokio::runtime::Runtime; //! -//! fn main() { +//! async fn usage() { //! // Create a client to access a single cluster member. Addresses of multiple cluster //! // members can be provided and the client will try each one in sequence until it //! // receives a successful response. //! let client = Client::new(&["http://etcd.example.com:2379"], None).unwrap(); //! //! // Set the key "/foo" to the value "bar" with no expiration. -//! let work = kv::set(&client, "/foo", "bar", None).and_then(move |_| { -//! // Once the key has been set, ask for details about it. -//! let get_request = kv::get(&client, "/foo", kv::GetOptions::default()); +//! assert!(kv::set(&client, "/foo", "bar", None).await.is_ok()); +//! // Once the key has been set, ask for details about it. +//! let response = kv::get(&client, "/foo", kv::GetOptions::default()).await.unwrap(); +//! // The information returned tells you what kind of operation was performed. +//! assert_eq!(response.data.action, Action::Get); //! -//! get_request.and_then(|response| { -//! // The information returned tells you what kind of operation was performed. -//! assert_eq!(response.data.action, Action::Get); +//! // The value of the key is what we set it to previously. +//! assert_eq!(response.data.node.value, Some("bar".to_string())); //! -//! // The value of the key is what we set it to previously. -//! assert_eq!(response.data.node.value, Some("bar".to_string())); +//! // Each API call also returns information about the etcd cluster extracted from +//! // HTTP response headers. +//! assert!(response.cluster_info.etcd_index.is_some()); //! -//! // Each API call also returns information about the etcd cluster extracted from -//! // HTTP response headers. -//! assert!(response.cluster_info.etcd_index.is_some()); -//! -//! Ok(()) -//! }) -//! }); -//! -//! // Start the event loop, driving the asynchronous code to completion. -//! assert!(Runtime::new().unwrap().block_on(work).is_ok()); //! } //! ``` //! diff --git a/src/members.rs b/src/members.rs index d9ea5ba..a8d9137 100644 --- a/src/members.rs +++ b/src/members.rs @@ -2,17 +2,15 @@ //! //! These API endpoints are used to manage cluster membership. -use std::str::FromStr; - -use futures::{Future, IntoFuture, Stream}; use hyper::client::connect::Connect; use hyper::{StatusCode, Uri}; use serde_derive::{Deserialize, Serialize}; use serde_json; +use std::future::Future; use crate::client::{Client, ClusterInfo, Response}; use crate::error::{ApiError, Error}; -use crate::first_ok::first_ok; +use crate::first_ok::{first_ok, Result}; /// An etcd server that is a member of a cluster. #[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)] @@ -50,55 +48,44 @@ struct ListResponse { /// /// * client: A `Client` to use to make the API call. /// * peer_urls: URLs exposing this cluster member's peer API. -pub fn add( - client: &Client, - peer_urls: Vec, -) -> Box, Error = Vec>> +pub async fn add(client: &Client, peer_urls: Vec) -> Result<()> where - C: Clone + Connect, + C: Clone + Send + Sync + Connect + 'static, { let peer_urls = PeerUrls { peer_urls }; let body = match serde_json::to_string(&peer_urls) { Ok(body) => body, - Err(error) => return Box::new(Err(vec![Error::Serialization(error)]).into_future()), + Err(error) => return Err(vec![Error::Serialization(error)]), }; let http_client = client.http_client().clone(); - let result = first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, ""); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - - let body = body.clone(); + first_ok(client.endpoints().to_vec(), move |member| { let http_client = http_client.clone(); + let body = body.clone(); - let response = uri.and_then(move |uri| http_client.post(uri, body).map_err(Error::from)); - - response.and_then(|response| { + async move { + let uri = build_uri(&member, "")?; + let response = http_client.post(uri, body).await?; let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::CREATED { - Ok(Response { - data: (), - cluster_info, - }) - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + + if status == StatusCode::CREATED { + Ok(Response { + data: (), + cluster_info, + }) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), } - }) - }) - }); - - Box::new(result) + } + } + }) + .await } /// Deletes a member from the cluster. @@ -107,44 +94,36 @@ where /// /// * client: A `Client` to use to make the API call. /// * id: The unique identifier of the member to delete. -pub fn delete( - client: &Client, - id: String, -) -> impl Future, Error = Vec> + Send +pub fn delete(client: &Client, id: String) -> impl Future> where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/{}", id)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); + let id = id.clone(); - let response = uri.and_then(move |uri| http_client.delete(uri).map_err(Error::from)); + async move { + let uri = build_uri(&member, &format!("/{}", id))?; + let response = http_client.delete(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::NO_CONTENT { - Ok(Response { - data: (), - cluster_info, - }) - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + + if status == StatusCode::NO_CONTENT { + Ok(Response { + data: (), + cluster_info, + }) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } + } }) } @@ -153,46 +132,38 @@ where /// # Parameters /// /// * client: A `Client` to use to make the API call. -pub fn list( - client: &Client, -) -> impl Future>, Error = Vec> + Send +pub fn list(client: &Client) -> impl Future>> where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { let http_client = client.http_client().clone(); first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, ""); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - let http_client = http_client.clone(); - let response = uri.and_then(move |uri| http_client.get(uri).map_err(Error::from)); + async move { + let uri = build_uri(&member, "")?; + let response = http_client.get(uri).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); + let body = hyper::body::to_bytes(response).await?; - body.and_then(move |ref body| { - if status == StatusCode::OK { - match serde_json::from_slice::(body) { - Ok(data) => Ok(Response { - data: data.members, - cluster_info, - }), - Err(error) => Err(Error::Serialization(error)), - } - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + if status == StatusCode::OK { + match serde_json::from_slice::(&body) { + Ok(data) => Ok(Response { + data: data.members, + cluster_info, + }), + Err(error) => Err(Error::Serialization(error)), + } + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), } - }) - }) + } + } }) } @@ -203,59 +174,49 @@ where /// * client: A `Client` to use to make the API call. /// * id: The unique identifier of the member to update. /// * peer_urls: URLs exposing this cluster member's peer API. -pub fn update( - client: &Client, - id: String, - peer_urls: Vec, -) -> Box, Error = Vec>> +pub async fn update(client: &Client, id: String, peer_urls: Vec) -> Result<()> where - C: Clone + Connect, + C: Clone + Send + Sync + Connect + 'static, { let peer_urls = PeerUrls { peer_urls }; let body = match serde_json::to_string(&peer_urls) { Ok(body) => body, - Err(error) => return Box::new(Err(vec![Error::Serialization(error)]).into_future()), + Err(error) => return Err(vec![Error::Serialization(error)]), }; let http_client = client.http_client().clone(); - let result = first_ok(client.endpoints().to_vec(), move |member| { - let url = build_url(member, &format!("/{}", id)); - let uri = Uri::from_str(url.as_str()) - .map_err(Error::from) - .into_future(); - + first_ok(client.endpoints().to_vec(), move |member| { let body = body.clone(); let http_client = http_client.clone(); + let id = id.clone(); - let response = uri.and_then(move |uri| http_client.put(uri, body).map_err(Error::from)); + async move { + let uri = build_uri(&member, &format!("/{}", id))?; + let response = http_client.put(uri, body).await?; - response.and_then(|response| { let status = response.status(); let cluster_info = ClusterInfo::from(response.headers()); - let body = response.into_body().concat2().map_err(Error::from); - - body.and_then(move |ref body| { - if status == StatusCode::NO_CONTENT { - Ok(Response { - data: (), - cluster_info, - }) - } else { - match serde_json::from_slice::(body) { - Ok(error) => Err(Error::Api(error)), - Err(error) => Err(Error::Serialization(error)), - } + let body = hyper::body::to_bytes(response).await?; + + if status == StatusCode::NO_CONTENT { + Ok(Response { + data: (), + cluster_info, + }) + } else { + match serde_json::from_slice::(&body) { + Ok(error) => Err(Error::Api(error)), + Err(error) => Err(Error::Serialization(error)), } - }) - }) - }); - - Box::new(result) + } + } + }) + .await } /// Constructs the full URL for an API call. -fn build_url(endpoint: &Uri, path: &str) -> String { - format!("{}v2/members{}", endpoint, path) +fn build_uri(endpoint: &Uri, path: &str) -> std::result::Result { + format!("{}v2/members{}", endpoint, path).parse() } diff --git a/src/stats.rs b/src/stats.rs index b739e93..bba02ee 100644 --- a/src/stats.rs +++ b/src/stats.rs @@ -2,8 +2,7 @@ use std::collections::HashMap; -use futures::stream::futures_unordered; -use futures::{Future, IntoFuture, Stream}; +use futures::stream::{self, Stream, StreamExt}; use hyper::client::connect::Connect; use hyper::Uri; use serde_derive::{Deserialize, Serialize}; @@ -157,58 +156,50 @@ pub struct StoreStats { /// Returns statistics about the leader member of a cluster. /// /// Fails if JSON decoding fails, which suggests a bug in our schema. -pub fn leader_stats( - client: &Client, -) -> impl Future, Error = Error> + Send +pub async fn leader_stats(client: &Client) -> Result, Error> where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { - let url = build_url(&client.endpoints()[0], "v2/stats/leader"); - let uri = url.parse().map_err(Error::from).into_future(); - - client.request(uri) + let uri = build_uri(&client.endpoints()[0], "v2/stats/leader")?; + client.request(uri).await } /// Returns statistics about each cluster member the client was initialized with. /// /// Fails if JSON decoding fails, which suggests a bug in our schema. -pub fn self_stats( - client: &Client, -) -> impl Stream, Error = Error> + Send +pub fn self_stats<'a, C>( + client: &'a Client, +) -> impl Stream, Error>> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { - let futures = client.endpoints().iter().map(|endpoint| { - let url = build_url(&endpoint, "v2/stats/self"); - let uri = url.parse().map_err(Error::from).into_future(); - - client.request(uri) - }); - - futures_unordered(futures) + stream::iter(client.endpoints().clone()) + .map(move |endpoint| async move { + let uri = build_uri(&endpoint, "v2/stats/self")?; + client.request(uri).await + }) + .buffer_unordered(client.endpoints().len()) } /// Returns statistics about operations handled by each etcd member the client was initialized /// with. /// /// Fails if JSON decoding fails, which suggests a bug in our schema. -pub fn store_stats( - client: &Client, -) -> impl Stream, Error = Error> + Send +pub fn store_stats<'a, C>( + client: &'a Client, +) -> impl Stream, Error>> + 'a where - C: Clone + Connect, + C: Clone + Connect + Send + Sync + 'static, { - let futures = client.endpoints().iter().map(|endpoint| { - let url = build_url(&endpoint, "v2/stats/store"); - let uri = url.parse().map_err(Error::from).into_future(); - - client.request(uri) - }); - - futures_unordered(futures) + stream::iter(client.endpoints().clone()) + .map(move |endpoint| async move { + let uri = build_uri(&endpoint, "v2/stats/store")?; + client.request(uri).await + }) + .buffer_unordered(client.endpoints().len()) } /// Constructs the full URL for an API call. -fn build_url(endpoint: &Uri, path: &str) -> String { - format!("{}{}", endpoint, path) +fn build_uri(endpoint: &Uri, path: &str) -> std::result::Result { + format!("{}{}", endpoint, path).parse() } diff --git a/tests/auth_test.rs b/tests/auth_test.rs index b191f83..57e7d6a 100644 --- a/tests/auth_test.rs +++ b/tests/auth_test.rs @@ -1,13 +1,9 @@ use etcd::auth::{self, AuthChange, NewUser, Role, RoleUpdate, UserUpdate}; use etcd::{BasicAuth, Client}; -use futures::future::Future; -use tokio::runtime::Runtime; -#[test] -fn auth() { +#[tokio::test] +async fn auth() { let client = Client::new(&["http://etcd:2379"], None).unwrap(); - let client_2 = client.clone(); - let client_3 = client.clone(); let basic_auth = BasicAuth { username: "root".into(), @@ -15,125 +11,63 @@ fn auth() { }; let authed_client = Client::new(&["http://etcd:2379"], Some(basic_auth)).unwrap(); - let authed_client_2 = authed_client.clone(); - let authed_client_3 = authed_client.clone(); - let authed_client_4 = authed_client.clone(); - let authed_client_5 = authed_client.clone(); - let authed_client_6 = authed_client.clone(); - let authed_client_7 = authed_client.clone(); - let authed_client_8 = authed_client.clone(); - let authed_client_9 = authed_client.clone(); let root_user = NewUser::new("root", "secret"); - let work: Box + Send> = Box::new( - auth::status(&client) - .then(move |res| { - let response = res.unwrap(); - - assert_eq!(response.data, false); - - auth::create_user(&client_2, root_user) - }) - .then(move |res| { - let response = res.unwrap(); - - assert_eq!(response.data.name(), "root"); - - auth::enable(&client_3) - }) - .then(move |res| { - let response = res.unwrap(); - - assert_eq!(response.data, AuthChange::Changed); - - let mut update_guest = RoleUpdate::new("guest"); - - update_guest.revoke_kv_write_permission("/*"); - - auth::update_role(&authed_client, update_guest) - }) - .then(move |res| { - res.unwrap(); - - let mut rkt_role = Role::new("rkt"); - - rkt_role.grant_kv_read_permission("/rkt/*"); - rkt_role.grant_kv_write_permission("/rkt/*"); - - auth::create_role(&authed_client_2, rkt_role) - }) - .then(move |res| { - res.unwrap(); - - let mut rkt_user = NewUser::new("rkt", "secret"); - - rkt_user.add_role("rkt"); - - auth::create_user(&authed_client_3, rkt_user) - }) - .then(move |res| { - let response = res.unwrap(); - - let rkt_user = response.data; - - assert_eq!(rkt_user.name(), "rkt"); - - let role_name = &rkt_user.role_names()[0]; - - assert_eq!(role_name, "rkt"); - - let mut update_rkt_user = UserUpdate::new("rkt"); - - update_rkt_user.update_password("secret2"); - update_rkt_user.grant_role("root"); - - auth::update_user(&authed_client_4, update_rkt_user) - }) - .then(move |res| { - res.unwrap(); - - auth::get_role(&authed_client_5, "rkt") - }) - .then(move |res| { - let response = res.unwrap(); - - let role = response.data; - - assert!(role.kv_read_permissions().contains(&"/rkt/*".to_owned())); - assert!(role.kv_write_permissions().contains(&"/rkt/*".to_owned())); - - auth::delete_user(&authed_client_6, "rkt") - }) - .then(move |res| { - res.unwrap(); - - auth::delete_role(&authed_client_7, "rkt") - }) - .then(move |res| { - res.unwrap(); - - let mut update_guest = RoleUpdate::new("guest"); - - update_guest.grant_kv_write_permission("/*"); - - auth::update_role(&authed_client_8, update_guest) - }) - .then(move |res| { - res.unwrap(); - - auth::disable(&authed_client_9) - }) - .then(|res| { - let response = res.unwrap(); - - assert_eq!(response.data, AuthChange::Changed); - - Ok(()) - }), - ); - - let _ = Runtime::new() - .expect("failed to create Tokio runtime") - .block_on(work); + let response = auth::status(&client).await.unwrap(); + assert_eq!(response.data, false); + + let response = auth::create_user(&client, root_user).await.unwrap(); + assert_eq!(response.data.name(), "root"); + + let response = auth::enable(&client).await.unwrap(); + assert_eq!(response.data, AuthChange::Changed); + + let mut update_guest = RoleUpdate::new("guest"); + update_guest.revoke_kv_write_permission("/*"); + auth::update_role(&authed_client, update_guest) + .await + .unwrap(); + + let mut rkt_role = Role::new("rkt"); + rkt_role.grant_kv_read_permission("/rkt/*"); + rkt_role.grant_kv_write_permission("/rkt/*"); + auth::create_role(&authed_client, rkt_role).await.unwrap(); + + let mut rkt_user = NewUser::new("rkt", "secret"); + rkt_user.add_role("rkt"); + let response = auth::create_user(&authed_client, rkt_user).await.unwrap(); + assert_eq!(response.data.name(), "rkt"); + let role_name = &response.data.role_names()[0]; + assert_eq!(role_name, "rkt"); + + let mut update_rkt_user = UserUpdate::new("rkt"); + update_rkt_user.update_password("secret2"); + update_rkt_user.grant_role("root"); + auth::update_user(&authed_client, update_rkt_user) + .await + .unwrap(); + + let response = auth::get_role(&authed_client, "rkt").await.unwrap(); + assert!(response + .data + .kv_read_permissions() + .contains(&"/rkt/*".to_owned())); + assert!(response + .data + .kv_write_permissions() + .contains(&"/rkt/*".to_owned())); + + auth::delete_user(&authed_client, "rkt").await.unwrap(); + + auth::delete_role(&authed_client, "rkt").await.unwrap(); + + let mut update_guest = RoleUpdate::new("guest"); + update_guest.grant_kv_write_permission("/*"); + auth::update_role(&authed_client, update_guest) + .await + .unwrap(); + + let response = auth::disable(&authed_client).await.unwrap(); + assert_eq!(response.data, AuthChange::Changed); } diff --git a/tests/client_test.rs b/tests/client_test.rs index e6d6d0d..679632d 100644 --- a/tests/client_test.rs +++ b/tests/client_test.rs @@ -1,35 +1,28 @@ -use futures::{Future, Stream}; +use futures::stream::StreamExt; use crate::test::TestClient; mod test; -#[test] -fn health() { - let mut client = TestClient::no_destructor(); +#[tokio::test] +async fn health() { + let client = TestClient::no_destructor(); + let mut health = client.health(); - let work = client.health().collect().and_then(|responses| { - for response in responses { - assert_eq!(response.data.health, "true"); - } - - Ok(()) - }); - - client.run(work); + while let Some(response) = health.next().await { + assert_eq!(response.unwrap().data.health, "true"); + } } -#[test] -fn versions() { - let mut client = TestClient::no_destructor(); - let work = client.versions().collect().and_then(|responses| { - for response in responses { - assert_eq!(response.data.cluster_version, "2.3.0"); - assert_eq!(response.data.server_version, "2.3.8"); - } +#[tokio::test] +async fn versions() { + let client = TestClient::no_destructor(); + let mut versions = client.versions(); - Ok(()) - }); + while let Some(response) = versions.next().await { + let response = response.unwrap(); - client.run(work); + assert_eq!(response.data.cluster_version, "2.3.0"); + assert_eq!(response.data.server_version, "2.3.8"); + } } diff --git a/tests/kv_test.rs b/tests/kv_test.rs index 05528ad..a0f01c1 100644 --- a/tests/kv_test.rs +++ b/tests/kv_test.rs @@ -1,721 +1,495 @@ -use std::thread::{sleep, spawn}; use std::time::Duration; use etcd::kv::{self, Action, GetOptions, KeyValueInfo, WatchError, WatchOptions}; use etcd::{Error, Response}; -use futures::future::{join_all, Future}; -use futures::sync::oneshot::channel; +use futures::future::try_join_all; +use tokio::task::spawn; +use tokio::time::delay_for; use crate::test::TestClient; mod test; -#[test] -fn create() { - let mut client = TestClient::new(); +#[tokio::test] +async fn create() { + let client = TestClient::new().await; - let work = kv::create(&client, "/test/foo", "bar", Some(60)).and_then(|res| { - let node = res.data.node; + let res = kv::create(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap(); + let node = res.data.node; - assert_eq!(res.data.action, Action::Create); - assert_eq!(node.value.unwrap(), "bar"); - assert_eq!(node.ttl.unwrap(), 60); - - Ok(()) - }); - - client.run(work); -} - -#[test] -fn create_does_not_replace_existing_key() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", Some(60)).and_then(move |_| { - kv::create(&inner_client, "/test/foo", "bar", Some(60)).then(|result| { - match result { - Ok(_) => panic!("expected EtcdError due to pre-existing key"), - Err(errors) => { - for error in errors { - match error { - Error::Api(ref error) => { - assert_eq!(error.message, "Key already exists") - } - _ => panic!("expected EtcdError due to pre-existing key"), - } - } - } - } - - Ok(()) - }) - }); - - client.run(work); + assert_eq!(res.data.action, Action::Create); + assert_eq!(node.value.unwrap(), "bar"); + assert_eq!(node.ttl.unwrap(), 60); } -#[test] -fn create_in_order() { - let mut client = TestClient::new(); - let inner_client = client.clone(); +#[tokio::test] +async fn create_does_not_replace_existing_key() { + let client = TestClient::new().await; - let requests = - (1..4).map(move |_| Box::new(kv::create_in_order(&inner_client, "/test/foo", "bar", None))); + kv::create(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap(); - let work = join_all(requests).and_then(|res: Vec>| { - let mut kvis: Vec = res.into_iter().map(|response| response.data).collect(); - kvis.sort_by_key(|ref kvi| kvi.node.modified_index); - - let keys: Vec = kvis.into_iter().map(|kvi| kvi.node.key.unwrap()).collect(); - - assert!(keys[0] < keys[1]); - assert!(keys[1] < keys[2]); - - Ok(()) - }); + let errors = kv::create(&client, "/test/foo", "bar", Some(60)) + .await + .expect_err("expected EtcdError due to pre-existing key"); - client.run(work); -} - -#[test] -fn create_in_order_must_operate_on_a_directory() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::create_in_order(&inner_client, "/test/foo", "baz", None).then(|result| { - assert!(result.is_err()); - - Ok(()) - }) - }); - - client.run(work); + for error in errors { + match error { + Error::Api(ref error) => assert_eq!(error.message, "Key already exists"), + _ => panic!("expected EtcdError due to pre-existing key"), + } + } } -#[test] -fn compare_and_delete() { - let mut client = TestClient::new(); - let inner_client = client.clone(); +#[tokio::test] +async fn create_in_order() { + let client = TestClient::new().await; - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |res| { - let index = res.data.node.modified_index; + let requests = (1..4).map(|_| kv::create_in_order(&client, "/test/foo", "bar", None)); + let results: Vec> = try_join_all(requests).await.unwrap(); + let mut kvis: Vec = results.into_iter().map(|response| response.data).collect(); - kv::compare_and_delete(&inner_client, "/test/foo", Some("bar"), index).and_then(|res| { - assert_eq!(res.data.action, Action::CompareAndDelete); + kvis.sort_by_key(|ref kvi| kvi.node.modified_index); - Ok(()) - }) - }); + let keys: Vec = kvis.into_iter().map(|kvi| kvi.node.key.unwrap()).collect(); - client.run(work); + assert!(keys[0] < keys[1]); + assert!(keys[1] < keys[2]); } -#[test] -fn compare_and_delete_only_index() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |res| { - let index = res.data.node.modified_index; - - kv::compare_and_delete(&inner_client, "/test/foo", None, index).and_then(|res| { - assert_eq!(res.data.action, Action::CompareAndDelete); - - Ok(()) - }) - }); - - client.run(work); +#[tokio::test] +async fn create_in_order_must_operate_on_a_directory() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + kv::create_in_order(&client, "/test/foo", "baz", None) + .await + .unwrap_err(); } -#[test] -fn compare_and_delete_only_value() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::compare_and_delete(&inner_client, "/test/foo", Some("bar"), None).and_then(|res| { - assert_eq!(res.data.action, Action::CompareAndDelete); - - Ok(()) - }) - }); - - client.run(work); -} - -#[test] -fn compare_and_delete_requires_conditions() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::compare_and_delete(&inner_client, "/test/foo", None, None).then(|result| match result { - Ok(_) => panic!("expected Error::InvalidConditions"), - Err(errors) => { - if errors.len() == 1 { - match errors[0] { - Error::InvalidConditions => Ok(()), - _ => panic!("expected Error::InvalidConditions"), - } - } else { - panic!("expected a single error: Error::InvalidConditions"); - } - } - }) - }); - - client.run(work); +#[tokio::test] +async fn compare_and_delete() { + let client = TestClient::new().await; + let res = kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let index = res.data.node.modified_index; + let res = kv::compare_and_delete(&client, "/test/foo", Some("bar"), index) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndDelete); } -#[test] -fn test_compare_and_swap() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |res| { - let index = res.data.node.modified_index; - - kv::compare_and_swap( - &inner_client, - "/test/foo", - "baz", - Some(100), - Some("bar"), - index, - ) - .and_then(|res| { - assert_eq!(res.data.action, Action::CompareAndSwap); - - Ok(()) - }) - }); +#[tokio::test] +async fn compare_and_delete_only_index() { + let client = TestClient::new().await; - client.run(work); + let res = kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let index = res.data.node.modified_index; + let res = kv::compare_and_delete(&client, "/test/foo", None, index) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndDelete); } -#[test] -fn compare_and_swap_only_index() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |res| { - let index = res.data.node.modified_index; +#[tokio::test] +async fn compare_and_delete_only_value() { + let client = TestClient::new().await; - kv::compare_and_swap(&inner_client, "/test/foo", "baz", None, None, index).and_then(|res| { - assert_eq!(res.data.action, Action::CompareAndSwap); - - Ok(()) - }) - }); - - client.run(work); + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let res = kv::compare_and_delete(&client, "/test/foo", Some("bar"), None) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndDelete); } -#[test] -fn compare_and_swap() { - let mut client = TestClient::new(); - let inner_client = client.clone(); +#[tokio::test] +async fn compare_and_delete_requires_conditions() { + let client = TestClient::new().await; - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::compare_and_swap(&inner_client, "/test/foo", "baz", None, Some("bar"), None).and_then( - |res| { - assert_eq!(res.data.action, Action::CompareAndSwap); - - Ok(()) - }, - ) - }); + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let errors = kv::compare_and_delete(&client, "/test/foo", None, None) + .await + .expect_err("expected Error::InvalidConditions"); - client.run(work); -} - -#[test] -fn compare_and_swap_requires_conditions() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::compare_and_swap(&inner_client, "/test/foo", "baz", None, None, None).then(|result| { - match result { - Ok(_) => panic!("expected Error::InvalidConditions"), - Err(errors) => { - if errors.len() == 1 { - match errors[0] { - Error::InvalidConditions => Ok(()), - _ => panic!("expected Error::InvalidConditions"), - } - } else { - panic!("expected a single error: Error::InvalidConditions"); - } - } - } - }) - }); - - client.run(work); -} + if errors.len() == 1 { + match errors[0] { + Error::InvalidConditions => {} + _ => panic!("expected Error::InvalidConditions"), + } + } else { + panic!("expected a single error: Error::InvalidConditions"); + } +} + +#[tokio::test] +async fn test_compare_and_swap() { + let client = TestClient::new().await; + let res = kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let index = res.data.node.modified_index; + + let res = kv::compare_and_swap(&client, "/test/foo", "baz", Some(100), Some("bar"), index) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndSwap); +} + +#[tokio::test] +async fn test_compare_and_swap_only_index() { + let client = TestClient::new().await; + let res = kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let index = res.data.node.modified_index; + + let res = kv::compare_and_swap(&client, "/test/foo", "baz", None, None, index) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndSwap); +} + +#[tokio::test] +async fn test_compare_and_swap_only_value() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let res = kv::compare_and_swap(&client, "/test/foo", "baz", None, Some("bar"), None) + .await + .unwrap(); + assert_eq!(res.data.action, Action::CompareAndSwap); +} + +#[tokio::test] +async fn compare_and_swap_requires_conditions() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let errors = kv::compare_and_swap(&client, "/test/foo", "baz", None, None, None) + .await + .expect_err("expected Error::InvalidConditions"); + + if errors.len() == 1 { + match errors[0] { + Error::InvalidConditions => {} + _ => panic!("expected Error::InvalidConditions"), + } + } else { + panic!("expected a single error: Error::InvalidConditions"); + } +} + +#[tokio::test] +async fn get() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap(); + let res = kv::get(&client, "/test/foo", GetOptions::default()) + .await + .unwrap(); + assert_eq!(res.data.action, Action::Get); + + let node = res.data.node; + + assert_eq!(node.value.unwrap(), "bar"); + assert_eq!(node.ttl.unwrap(), 60); +} + +#[tokio::test] +async fn get_non_recursive() { + let client = TestClient::new().await; + kv::set(&client, "/test/dir/baz", "blah", None) + .await + .unwrap(); + kv::set(&client, "/test/foo", "bar", None).await.unwrap(); + let res = kv::get( + &client, + "/test", + GetOptions { + sort: true, + ..Default::default() + }, + ) + .await + .unwrap(); + let node = res.data.node; -#[test] -fn get() { - let mut client = TestClient::new(); - let inner_client = client.clone(); + assert_eq!(node.dir.unwrap(), true); - let work = kv::create(&client, "/test/foo", "bar", Some(60)).and_then(move |_| { - kv::get(&inner_client, "/test/foo", GetOptions::default()).and_then(|res| { - assert_eq!(res.data.action, Action::Get); + let nodes = node.nodes.unwrap(); - let node = res.data.node; + assert_eq!(nodes[0].clone().key.unwrap(), "/test/dir"); + assert_eq!(nodes[0].clone().dir.unwrap(), true); + assert_eq!(nodes[1].clone().key.unwrap(), "/test/foo"); + assert_eq!(nodes[1].clone().value.unwrap(), "bar"); +} - assert_eq!(node.value.unwrap(), "bar"); - assert_eq!(node.ttl.unwrap(), 60); +#[tokio::test] +async fn get_recursive() { + let client = TestClient::new().await; - Ok(()) - }) - }); + kv::set(&client, "/test/dir/baz", "blah", None) + .await + .unwrap(); - client.run(work); -} - -#[test] -fn get_non_recursive() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = join_all(vec![ - kv::set(&client, "/test/dir/baz", "blah", None), - kv::set(&client, "/test/foo", "bar", None), - ]) - .and_then(move |_| { - kv::get( - &inner_client, - "/test", - GetOptions { - sort: true, - ..Default::default() - }, - ) - .and_then(|res| { - let node = res.data.node; - - assert_eq!(node.dir.unwrap(), true); - - let nodes = node.nodes.unwrap(); - - assert_eq!(nodes[0].clone().key.unwrap(), "/test/dir"); - assert_eq!(nodes[0].clone().dir.unwrap(), true); - assert_eq!(nodes[1].clone().key.unwrap(), "/test/foo"); - assert_eq!(nodes[1].clone().value.unwrap(), "bar"); - - Ok(()) - }) - }); - - client.run(work); -} - -#[test] -fn get_recursive() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::set(&client, "/test/dir/baz", "blah", None).and_then(move |_| { - kv::get( - &inner_client, - "/test", - GetOptions { - recursive: true, - sort: true, - ..Default::default() - }, - ) - .and_then(|res| { - let nodes = res.data.node.nodes.unwrap(); - - assert_eq!( - nodes[0].clone().nodes.unwrap()[0].clone().value.unwrap(), - "blah" - ); - - Ok(()) - }) - }); + let res = kv::get( + &client, + "/test", + GetOptions { + recursive: true, + sort: true, + ..Default::default() + }, + ) + .await + .unwrap(); + let nodes = res.data.node.nodes.unwrap(); - client.run(work); + assert_eq!( + nodes[0].clone().nodes.unwrap()[0].clone().value.unwrap(), + "blah" + ); } -#[test] -fn get_root() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", Some(60)).and_then(move |_| { - kv::get(&inner_client, "/", GetOptions::default()).and_then(|res| { - assert_eq!(res.data.action, Action::Get); - - let node = res.data.node; +#[tokio::test] +async fn get_root() { + let client = TestClient::new().await; - assert!(node.created_index.is_none()); - assert!(node.modified_index.is_none()); - assert_eq!(node.nodes.unwrap().len(), 1); - assert_eq!(node.dir.unwrap(), true); + kv::create(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap(); + let res = kv::get(&client, "/", GetOptions::default()).await.unwrap(); + assert_eq!(res.data.action, Action::Get); - Ok(()) - }) - }); + let node = res.data.node; - client.run(work); + assert!(node.created_index.is_none()); + assert!(node.modified_index.is_none()); + assert_eq!(node.nodes.unwrap().len(), 1); + assert_eq!(node.dir.unwrap(), true); } -#[test] -fn https() { - let mut client = TestClient::https(true); - - let work = kv::set(&client, "/test/foo", "bar", Some(60)); - - client.run(work); +#[tokio::test] +async fn https() { + let client = TestClient::https(true); + kv::set(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap(); } -#[test] -fn https_without_valid_client_certificate() { - let mut client = TestClient::https(false); - - let work: Box + Send> = - Box::new(kv::set(&client, "/test/foo", "bar", Some(60)).then(|res| { - assert!(res.is_err()); +#[tokio::test] +async fn https_without_valid_client_certificate() { + let client = TestClient::https(false); - Ok(()) - })); - - client.run(work); + kv::set(&client, "/test/foo", "bar", Some(60)) + .await + .unwrap_err(); } -#[test] -fn set() { - let mut client = TestClient::new(); - - let work = kv::set(&client, "/test/foo", "baz", None).and_then(|res| { - assert_eq!(res.data.action, Action::Set); +#[tokio::test] +async fn set() { + let client = TestClient::new().await; - let node = res.data.node; + let res = kv::set(&client, "/test/foo", "baz", None).await.unwrap(); + assert_eq!(res.data.action, Action::Set); - assert_eq!(node.value.unwrap(), "baz"); - assert!(node.ttl.is_none()); + let node = res.data.node; - Ok(()) - }); - - client.run(work); + assert_eq!(node.value.unwrap(), "baz"); + assert!(node.ttl.is_none()); } -#[test] -fn set_and_refresh() { - let mut client = TestClient::new(); +#[tokio::test] +async fn set_and_refresh() { + let client = TestClient::new().await; - let work = kv::set(&client, "/test/foo", "baz", Some(30)).and_then(|res| { - assert_eq!(res.data.action, Action::Set); + let res = kv::set(&client, "/test/foo", "baz", Some(30)) + .await + .unwrap(); + assert_eq!(res.data.action, Action::Set); - let node = res.data.node; + let node = res.data.node; - assert_eq!(node.value.unwrap(), "baz"); - assert!(node.ttl.is_some()); + assert_eq!(node.value.unwrap(), "baz"); + assert!(node.ttl.is_some()); - Ok(()) - }); + let res = kv::refresh(&client, "/test/foo", 30).await.unwrap(); + assert_eq!(res.data.action, Action::Update); - client.run(work); + let node = res.data.node; - let work = kv::refresh(&client, "/test/foo", 30).and_then(|res| { - assert_eq!(res.data.action, Action::Update); - - let node = res.data.node; - - assert_eq!(node.value.unwrap(), "baz"); - assert!(node.ttl.is_some()); - - Ok(()) - }); - - client.run(work); + assert_eq!(node.value.unwrap(), "baz"); + assert!(node.ttl.is_some()); } -#[test] -fn set_dir() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::set_dir(&client, "/test", None).and_then(move |_| { - kv::set_dir(&inner_client, "/test", None) - .then(|result| match result { - Ok(_) => panic!("set_dir should fail on an existing dir"), - Err(_) => Ok(()), - }) - .and_then(move |_| { - kv::set(&inner_client, "/test/foo", "bar", None) - .and_then(move |_| kv::set_dir(&inner_client, "/test/foo", None)) - }) - }); - - client.run(work); +#[tokio::test] +async fn set_dir() { + let client = TestClient::new().await; + kv::set_dir(&client, "/test", None).await.unwrap(); + kv::set_dir(&client, "/test", None) + .await + .expect_err("set_dir should fail on an existing dir"); + kv::set(&client, "/test/foo", "bar", None).await.unwrap(); + kv::set_dir(&client, "/test/foo", None).await.unwrap(); } -#[test] -fn update() { - let mut client = TestClient::new(); - let inner_client = client.clone(); +#[tokio::test] +async fn update() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::update(&inner_client, "/test/foo", "blah", Some(30)).and_then(|res| { - assert_eq!(res.data.action, Action::Update); + let res = kv::update(&client, "/test/foo", "blah", Some(30)) + .await + .unwrap(); + assert_eq!(res.data.action, Action::Update); - let node = res.data.node; + let node = res.data.node; - assert_eq!(node.value.unwrap(), "blah"); - assert_eq!(node.ttl.unwrap(), 30); - - Ok(()) - }) - }); - - client.run(work); + assert_eq!(node.value.unwrap(), "blah"); + assert_eq!(node.ttl.unwrap(), 30); } -#[test] -fn update_requires_existing_key() { - let mut client = TestClient::no_destructor(); +#[tokio::test] +async fn update_requires_existing_key() { + let client = TestClient::new().await; - let work = kv::update(&client, "/test/foo", "bar", None).then(|result| { - match result { - Err(ref errors) => match errors[0] { - Error::Api(ref error) => assert_eq!(error.message, "Key not found"), - _ => panic!("expected EtcdError due to missing key"), - }, - _ => panic!("expected EtcdError due to missing key"), - } + let errors = kv::update(&client, "/test/foo", "bar", None) + .await + .expect_err("expected EtcdError due to missing key"); - let result: Result<(), ()> = Ok(()); - - result - }); - - client.run(work); + match errors[0] { + Error::Api(ref error) => assert_eq!(error.message, "Key not found"), + _ => panic!("expected EtcdError due to missing key"), + } } -#[test] -fn update_dir() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create_dir(&client, "/test", None).and_then(move |_| { - kv::update_dir(&inner_client, "/test", Some(60)).and_then(|res| { - assert_eq!(res.data.node.ttl.unwrap(), 60); +#[tokio::test] +async fn update_dir() { + let client = TestClient::new().await; - Ok(()) - }) - }); - - client.run(work); + kv::create_dir(&client, "/test", None).await.unwrap(); + let res = kv::update_dir(&client, "/test", Some(60)).await.unwrap(); + assert_eq!(res.data.node.ttl.unwrap(), 60); } -#[test] -fn update_dir_replaces_key() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::set(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::update_dir(&inner_client, "/test/foo", Some(60)).and_then(|res| { - let node = res.data.node; - - assert_eq!(node.value.unwrap(), ""); - assert_eq!(node.ttl.unwrap(), 60); +#[tokio::test] +async fn update_dir_replaces_key() { + let client = TestClient::new().await; - Ok(()) - }) - }); + kv::set(&client, "/test/foo", "bar", None).await.unwrap(); + let res = kv::update_dir(&client, "/test/foo", Some(60)) + .await + .unwrap(); + let node = res.data.node; - client.run(work); + assert_eq!(node.value.unwrap(), ""); + assert_eq!(node.ttl.unwrap(), 60); } -#[test] -fn update_dir_requires_existing_dir() { - let mut client = TestClient::no_destructor(); - - let work: Box + Send> = - Box::new(kv::update_dir(&client, "/test", None).then(|res| { - assert!(res.is_err()); - - Ok(()) - })); - - client.run(work); +#[tokio::test] +async fn update_dir_requires_existing_dir() { + let client = TestClient::new().await; + kv::update_dir(&client, "/test", None).await.unwrap_err(); } -#[test] -fn delete() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None).and_then(move |_| { - kv::delete(&inner_client, "/test/foo", false).and_then(|res| { - assert_eq!(res.data.action, Action::Delete); - - Ok(()) - }) - }); - - client.run(work); +#[tokio::test] +async fn delete() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let res = kv::delete(&client, "/test/foo", false).await.unwrap(); + assert_eq!(res.data.action, Action::Delete); } -#[test] -fn create_dir() { - let mut client = TestClient::new(); +#[tokio::test] +async fn create_dir() { + let client = TestClient::new().await; - let work = kv::create_dir(&client, "/test/dir", None).and_then(|res| { - assert_eq!(res.data.action, Action::Create); + let res = kv::create_dir(&client, "/test/dir", None).await.unwrap(); + assert_eq!(res.data.action, Action::Create); - let node = res.data.node; + let node = res.data.node; - assert!(node.dir.is_some()); - assert!(node.value.is_none()); - - Ok(()) - }); - - client.run(work); + assert!(node.dir.is_some()); + assert!(node.value.is_none()); } -#[test] -fn delete_dir() { - let mut client = TestClient::new(); - let inner_client = client.clone(); +#[tokio::test] +async fn delete_dir() { + let client = TestClient::new().await; - let work = kv::create_dir(&client, "/test/dir", None).and_then(move |_| { - kv::delete_dir(&inner_client, "/test/dir").and_then(|res| { - assert_eq!(res.data.action, Action::Delete); - - Ok(()) - }) - }); - - client.run(work); + kv::create_dir(&client, "/test/dir", None).await.unwrap(); + let res = kv::delete_dir(&client, "/test/dir").await.unwrap(); + assert_eq!(res.data.action, Action::Delete); } -#[test] -fn watch() { - let (tx, rx) = channel(); - - let child = spawn(move || { - let mut client = TestClient::no_destructor(); - let inner_client = client.clone(); - - let work = rx.then(move |_| kv::set(&inner_client, "/test/foo", "baz", None)); - - client.run(work); +#[tokio::test] +async fn watch() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); + let child = spawn(async { + let client = TestClient::no_destructor(); + kv::set(&client, "/test/foo", "baz", None).await.unwrap(); }); - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::create(&client, "/test/foo", "bar", None) - .map_err(|errors| WatchError::Other(errors)) - .and_then(move |_| { - tx.send(()).unwrap(); - - kv::watch(&inner_client, "/test/foo", WatchOptions::default()).and_then(|res| { - assert_eq!(res.data.node.value.unwrap(), "baz"); - - Ok(()) - }) - }); - - client.run(work); - - child.join().ok().unwrap(); -} - -#[test] -fn watch_cancel() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work: Box + Send> = Box::new( - kv::create(&client, "/test/foo", "bar", None) - .map_err(|errors| WatchError::Other(errors)) - .and_then(move |_| { - kv::watch( - &inner_client, - "/test/foo", - WatchOptions { - timeout: Some(Duration::from_millis(1)), - ..Default::default() - }, - ) - }) - .then(|res| match res { - Ok(_) => panic!("expected WatchError::Timeout"), - Err(WatchError::Timeout) => Ok(()), - Err(_) => panic!("expected WatchError::Timeout"), - }), - ); - - client.run(work); + let res = kv::watch(&client, "/test/foo", WatchOptions::default()) + .await + .unwrap(); + assert_eq!(res.data.node.value.unwrap(), "baz"); + child.await.unwrap() } -#[test] -fn watch_index() { - let mut client = TestClient::new(); - let inner_client = client.clone(); - - let work = kv::set(&client, "/test/foo", "bar", None) - .map_err(|errors| WatchError::Other(errors)) - .and_then(move |res| { - let index = res.data.node.modified_index; - - kv::watch( - &inner_client, - "/test/foo", - WatchOptions { - index: index, - ..Default::default() - }, - ) - .and_then(move |res| { - let node = res.data.node; +#[tokio::test] +async fn watch_cancel() { + let client = TestClient::new().await; + kv::create(&client, "/test/foo", "bar", None).await.unwrap(); - assert_eq!(node.modified_index, index); - assert_eq!(node.value.unwrap(), "bar"); - - Ok(()) - }) - }); - - client.run(work); + let err = kv::watch( + &client, + "/test/foo", + WatchOptions { + timeout: Some(Duration::from_millis(1)), + ..Default::default() + }, + ) + .await + .unwrap_err(); + match err { + WatchError::Timeout => {} + _ => panic!("unexpected error"), + } } -#[test] -fn watch_recursive() { - let (tx, rx) = channel(); - - let child = spawn(move || { - let mut client = TestClient::no_destructor(); - let inner_client = client.clone(); +#[tokio::test] +async fn watch_index() { + let client = TestClient::new().await; + let res = kv::set(&client, "/test/foo", "bar", None).await.unwrap(); - let work = rx.then(move |_| { - let duration = Duration::from_millis(100); - sleep(duration); - kv::set(&inner_client, "/test/foo/bar", "baz", None) - }); + let index = res.data.node.modified_index; - client.run(work); + let res = kv::watch( + &client, + "/test/foo", + WatchOptions { + index, + ..Default::default() + }, + ) + .await + .unwrap(); + let node = res.data.node; + + assert_eq!(node.modified_index, index); + assert_eq!(node.value.unwrap(), "bar"); +} + +#[tokio::test] +async fn watch_recursive() { + let child = spawn(async { + let client = TestClient::no_destructor(); + delay_for(Duration::from_millis(100)).await; + kv::set(&client, "/test/foo/bar", "baz", None) + .await + .unwrap() }); - let mut client = TestClient::new(); - - tx.send(()).unwrap(); - - let work = kv::watch( + let client = TestClient::new().await; + let res = kv::watch( &client, "/test", WatchOptions { @@ -724,16 +498,11 @@ fn watch_recursive() { ..Default::default() }, ) - .and_then(|res| { - let node = res.data.node; - - assert_eq!(node.key.unwrap(), "/test/foo/bar"); - assert_eq!(node.value.unwrap(), "baz"); - - Ok(()) - }); - - client.run(work); + .await + .unwrap(); + let node = res.data.node; - child.join().ok().unwrap(); + assert_eq!(node.key.unwrap(), "/test/foo/bar"); + assert_eq!(node.value.unwrap(), "baz"); + child.await.unwrap(); } diff --git a/tests/members_test.rs b/tests/members_test.rs index 61f36c4..8012f38 100644 --- a/tests/members_test.rs +++ b/tests/members_test.rs @@ -1,22 +1,14 @@ use etcd::members; -use futures::future::Future; use crate::test::TestClient; mod test; -#[test] -fn list() { - let mut client = TestClient::no_destructor(); +#[tokio::test] +async fn list() { + let client = TestClient::no_destructor(); + let members = members::list(&client).await.unwrap().data; + let member = &members[0]; - let work = members::list(&client).and_then(|res| { - let members = res.data; - let member = &members[0]; - - assert_eq!(member.name, "default"); - - Ok(()) - }); - - client.run(work); + assert_eq!(member.name, "default"); } diff --git a/tests/stats_test.rs b/tests/stats_test.rs index df898bf..4f35cd9 100644 --- a/tests/stats_test.rs +++ b/tests/stats_test.rs @@ -1,33 +1,32 @@ use etcd::stats; -use futures::{Future, Stream}; use crate::test::TestClient; +use futures::stream::StreamExt; mod test; -#[test] -fn leader_stats() { - let mut client = TestClient::no_destructor(); - - let work = stats::leader_stats(&client); - - client.run(work); +#[tokio::test] +async fn leader_stats() { + let client = TestClient::no_destructor(); + stats::leader_stats(&client).await.unwrap(); } -#[test] -fn self_stats() { - let mut client = TestClient::no_destructor(); +#[tokio::test] +async fn self_stats() { + let client = TestClient::no_destructor(); + let mut stats = stats::self_stats(&client); - let work = stats::self_stats(&client).collect().and_then(|_| Ok(())); - - client.run(work); + while let Some(s) = stats.next().await { + s.unwrap(); + } } -#[test] -fn store_stats() { - let mut client = TestClient::no_destructor(); - - let work = stats::store_stats(&client).collect().and_then(|_| Ok(())); +#[tokio::test] +async fn store_stats() { + let client = TestClient::no_destructor(); + let mut stats = stats::store_stats(&client); - client.run(work); + while let Some(s) = stats.next().await { + s.unwrap(); + } } diff --git a/tests/test.rs b/tests/test.rs index 30b350f..42fd554 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -3,32 +3,31 @@ use std::io::Read; use std::ops::Deref; use etcd::{kv, Client}; -use futures::Future; use hyper::client::connect::Connect; use hyper::client::{Client as Hyper, HttpConnector}; use hyper_tls::HttpsConnector; use native_tls::{Certificate, Identity, TlsConnector}; -use tokio::runtime::Runtime; /// Wrapper around Client that automatically cleans up etcd after each test. pub struct TestClient where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { c: Client, run_destructor: bool, - runtime: Runtime, } impl TestClient { /// Creates a new client for a test. #[allow(dead_code)] - pub fn new() -> TestClient { - TestClient { + pub async fn new() -> TestClient { + let tc = TestClient { c: Client::new(&["http://etcd:2379"], None).unwrap(), run_destructor: true, - runtime: Runtime::new().expect("failed to create Tokio runtime"), - } + }; + + kv::delete(&tc.c, "/test", true).await.ok(); + tc } /// Creates a new client for a test that will not clean up the key space afterwards. @@ -37,7 +36,6 @@ impl TestClient { TestClient { c: Client::new(&["http://etcd:2379"], None).unwrap(), run_destructor: false, - runtime: Runtime::new().expect("failed to create Tokio runtime"), } } @@ -61,53 +59,31 @@ impl TestClient { let tls_connector = builder.build().unwrap(); - let mut http_connector = HttpConnector::new(1); + let mut http_connector = HttpConnector::new(); http_connector.enforce_http(false); - let https_connector = HttpsConnector::from((http_connector, tls_connector)); + let https_connector = HttpsConnector::from((http_connector, tls_connector.into())); let hyper = Hyper::builder().build(https_connector); TestClient { c: Client::custom(hyper, &["https://etcdsecure:2379"], None).unwrap(), run_destructor: true, - runtime: Runtime::new().expect("failed to create Tokio runtime"), } } } -impl TestClient -where - C: Clone + Connect + Sync + 'static, -{ - #[allow(dead_code)] - pub fn run(&mut self, future: F) - where - F: Future + Send + 'static, - O: Send + 'static, - E: Send + 'static, - { - let _ = self.runtime.block_on(future.map(|_| ()).map_err(|_| ())); - } -} - impl Drop for TestClient where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { fn drop(&mut self) { - if self.run_destructor { - let future = kv::delete(&self.c, "/test", true) - .map(|_| ()) - .map_err(|_| ()); - - let _ = self.runtime.block_on(future); - } + if self.run_destructor {} } } impl Deref for TestClient where - C: Clone + Connect + Sync + 'static, + C: Clone + Connect + Sync + Send + 'static, { type Target = Client;