From 3222b2c228388cc9766f5d1abba9879c4a74fcec Mon Sep 17 00:00:00 2001 From: EAimTY Date: Mon, 2 Oct 2023 21:05:01 +0900 Subject: [PATCH 01/10] Correct descriptions in `MemberAddResponse` (#66) --- src/rpc/cluster.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rpc/cluster.rs b/src/rpc/cluster.rs index a77cc1b..2cd2c09 100644 --- a/src/rpc/cluster.rs +++ b/src/rpc/cluster.rs @@ -159,7 +159,7 @@ impl MemberAddResponse { self.0.header.as_ref().map(From::from) } - /// Get Member List + /// Get the member information of the added member. #[inline] pub fn member(&self) -> Option<&Member> { self.0.member.as_ref().map(From::from) @@ -171,7 +171,7 @@ impl MemberAddResponse { self.0.header.take().map(ResponseHeader::new) } - /// Get Member List + /// Get the member list after adding the new member. #[inline] pub fn member_list(&self) -> &[Member] { unsafe { &*(self.0.members.as_slice() as *const _ as *const [Member]) } From 290dff8471725f46f75842af48687c5b87ac1f99 Mon Sep 17 00:00:00 2001 From: harscoet Date: Sun, 19 Nov 2023 05:08:49 +0100 Subject: [PATCH 02/10] tonic options max_decoding_message_size/max_encoding_message_size (#69) --- src/rpc/kv.rs | 16 ++++++++++++++++ src/rpc/watch.rs | 8 ++++++++ 2 files changed, 24 insertions(+) diff --git a/src/rpc/kv.rs b/src/rpc/kv.rs index 1ac8f08..56f488c 100644 --- a/src/rpc/kv.rs +++ b/src/rpc/kv.rs @@ -38,6 +38,22 @@ impl KvClient { Self { inner } } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Puts the given key into the key-value store. /// A put request increments the revision of the key-value store /// and generates one event in the event history. diff --git a/src/rpc/watch.rs b/src/rpc/watch.rs index 4900c33..c90a789 100644 --- a/src/rpc/watch.rs +++ b/src/rpc/watch.rs @@ -36,6 +36,14 @@ impl WatchClient { Self { inner } } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Watches for events happening or that have happened. Both input and output /// are streams; the input stream is for creating and canceling watchers and the output /// stream sends events. One watch RPC can watch on multiple key ranges, streaming events From 6291b79268eaf559e90d0313ac669ea0d0e1e595 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 20 Nov 2023 09:07:46 +0800 Subject: [PATCH 03/10] Implement take_prev_kvs for DeleteResponse (#68) * Implement take_prev_kvs for DeleteResponse Signed-off-by: tison --- src/rpc/kv.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/rpc/kv.rs b/src/rpc/kv.rs index 56f488c..a1a2913 100644 --- a/src/rpc/kv.rs +++ b/src/rpc/kv.rs @@ -584,6 +584,12 @@ impl DeleteResponse { pub fn prev_kvs(&self) -> &[KeyValue] { unsafe { &*(self.0.prev_kvs.as_slice() as *const _ as *const [KeyValue]) } } + + /// If `prev_kvs` is set in the request, take the previous key-value pairs, leaving an empty vector in its place. + #[inline] + pub fn take_prev_kvs(&mut self) -> Vec { + unsafe { std::mem::transmute(std::mem::take(&mut self.0.prev_kvs)) } + } } /// Options for `Compact` operation. From 97156aaa2fb026bd64aea2bbb3f7db00b443d588 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 20 Nov 2023 21:37:20 +0800 Subject: [PATCH 04/10] Release v0.12.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 95589b8..089d2f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "etcd-client" -version = "0.12.1" +version = "0.12.2" authors = ["The etcd-client Authors "] edition = "2021" readme = "README.md" From 5c66a903df61cc13b293a5490ac12ff89aaab2df Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 21 Nov 2023 21:07:17 +0800 Subject: [PATCH 05/10] Add GetResponse::take_kvs() & KeyValue::into_key_value() (#70) Signed-off-by: tison --- src/rpc/kv.rs | 6 ++++++ src/rpc/mod.rs | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/src/rpc/kv.rs b/src/rpc/kv.rs index a1a2913..1298b16 100644 --- a/src/rpc/kv.rs +++ b/src/rpc/kv.rs @@ -453,6 +453,12 @@ impl GetResponse { unsafe { &*(self.0.kvs.as_slice() as *const _ as *const [KeyValue]) } } + /// If `kvs` is set in the request, take the key-value pairs, leaving an empty vector in its place. + #[inline] + pub fn take_kvs(&mut self) -> Vec { + unsafe { std::mem::transmute(std::mem::take(&mut self.0.kvs)) } + } + /// Indicates if there are more keys to return in the requested range. #[inline] pub const fn more(&self) -> bool { diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 2f801f5..92ecbe0 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -126,6 +126,11 @@ impl KeyValue { std::str::from_utf8_unchecked(self.value()) } + /// Convert to key-value pair. + pub fn into_key_value(self) -> (Vec, Vec) { + (self.0.key, self.0.value) + } + /// The revision of last creation on this key. #[inline] pub const fn create_revision(&self) -> i64 { From f5d427b0e0653fac0a8a3dd5d3d6fafd59a3d366 Mon Sep 17 00:00:00 2001 From: David Li Date: Wed, 22 Nov 2023 00:45:53 +0800 Subject: [PATCH 06/10] Release v0.12.3 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 089d2f8..a835aa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "etcd-client" -version = "0.12.2" +version = "0.12.3" authors = ["The etcd-client Authors "] edition = "2021" readme = "README.md" From c8c40bc68b0d4f1f57ab4f4f84262ea5c1496ef5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=E2=80=86=20Li?= Date: Fri, 15 Dec 2023 19:37:38 +0800 Subject: [PATCH 07/10] Split test cases to tests directory (#72) --- src/client.rs | 777 ----------------------------------------------- tests/client.rs | 773 ++++++++++++++++++++++++++++++++++++++++++++++ tests/testing.rs | 10 + 3 files changed, 783 insertions(+), 777 deletions(-) create mode 100644 tests/client.rs create mode 100644 tests/testing.rs diff --git a/src/client.rs b/src/client.rs index 885fdd9..5b31b4b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -827,780 +827,3 @@ impl ConnectOptions { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::{Compare, CompareOp, EventType, PermissionType, TxnOp, TxnOpResponse}; - - const DEFAULT_TEST_ENDPOINT: &str = "localhost:2379"; - - /// Get client for testing. - async fn get_client() -> Result { - Client::connect([DEFAULT_TEST_ENDPOINT], None).await - } - - #[tokio::test] - async fn test_put() -> Result<()> { - let mut client = get_client().await?; - client.put("put", "123", None).await?; - - // overwrite with prev key - { - let resp = client - .put("put", "456", Some(PutOptions::new().with_prev_key())) - .await?; - let prev_key = resp.prev_key(); - assert!(prev_key.is_some()); - let prev_key = prev_key.unwrap(); - assert_eq!(prev_key.key(), b"put"); - assert_eq!(prev_key.value(), b"123"); - } - - // overwrite again with prev key - { - let resp = client - .put("put", "789", Some(PutOptions::new().with_prev_key())) - .await?; - let prev_key = resp.prev_key(); - assert!(prev_key.is_some()); - let prev_key = prev_key.unwrap(); - assert_eq!(prev_key.key(), b"put"); - assert_eq!(prev_key.value(), b"456"); - } - - Ok(()) - } - - #[tokio::test] - async fn test_get() -> Result<()> { - let mut client = get_client().await?; - client.put("get10", "10", None).await?; - client.put("get11", "11", None).await?; - client.put("get20", "20", None).await?; - client.put("get21", "21", None).await?; - - // get key - { - let resp = client.get("get11", None).await?; - assert_eq!(resp.count(), 1); - assert!(!resp.more()); - assert_eq!(resp.kvs().len(), 1); - assert_eq!(resp.kvs()[0].key(), b"get11"); - assert_eq!(resp.kvs()[0].value(), b"11"); - } - - // get from key - { - let resp = client - .get( - "get11", - Some(GetOptions::new().with_from_key().with_limit(2)), - ) - .await?; - assert!(resp.more()); - assert_eq!(resp.kvs().len(), 2); - assert_eq!(resp.kvs()[0].key(), b"get11"); - assert_eq!(resp.kvs()[0].value(), b"11"); - assert_eq!(resp.kvs()[1].key(), b"get20"); - assert_eq!(resp.kvs()[1].value(), b"20"); - } - - // get prefix keys - { - let resp = client - .get("get1", Some(GetOptions::new().with_prefix())) - .await?; - assert_eq!(resp.count(), 2); - assert!(!resp.more()); - assert_eq!(resp.kvs().len(), 2); - assert_eq!(resp.kvs()[0].key(), b"get10"); - assert_eq!(resp.kvs()[0].value(), b"10"); - assert_eq!(resp.kvs()[1].key(), b"get11"); - assert_eq!(resp.kvs()[1].value(), b"11"); - } - - Ok(()) - } - - #[tokio::test] - async fn test_delete() -> Result<()> { - let mut client = get_client().await?; - client.put("del10", "10", None).await?; - client.put("del11", "11", None).await?; - client.put("del20", "20", None).await?; - client.put("del21", "21", None).await?; - client.put("del31", "31", None).await?; - client.put("del32", "32", None).await?; - - // delete key - { - let resp = client.delete("del11", None).await?; - assert_eq!(resp.deleted(), 1); - let resp = client - .get("del11", Some(GetOptions::new().with_count_only())) - .await?; - assert_eq!(resp.count(), 0); - } - - // delete a range of keys - { - let resp = client - .delete("del11", Some(DeleteOptions::new().with_range("del22"))) - .await?; - assert_eq!(resp.deleted(), 2); - let resp = client - .get( - "del11", - Some(GetOptions::new().with_range("del22").with_count_only()), - ) - .await?; - assert_eq!(resp.count(), 0); - } - - // delete key with prefix - { - let resp = client - .delete("del3", Some(DeleteOptions::new().with_prefix())) - .await?; - assert_eq!(resp.deleted(), 2); - let resp = client - .get("del32", Some(GetOptions::new().with_count_only())) - .await?; - assert_eq!(resp.count(), 0); - } - - Ok(()) - } - - #[tokio::test] - async fn test_compact() -> Result<()> { - let mut client = get_client().await?; - let rev0 = client - .put("compact", "0", None) - .await? - .header() - .unwrap() - .revision(); - let rev1 = client - .put("compact", "1", None) - .await? - .header() - .unwrap() - .revision(); - - // before compacting - let rev0_resp = client - .get("compact", Some(GetOptions::new().with_revision(rev0))) - .await?; - assert_eq!(rev0_resp.kvs()[0].value(), b"0"); - let rev1_resp = client - .get("compact", Some(GetOptions::new().with_revision(rev1))) - .await?; - assert_eq!(rev1_resp.kvs()[0].value(), b"1"); - - client.compact(rev1, None).await?; - - // after compacting - let result = client - .get("compact", Some(GetOptions::new().with_revision(rev0))) - .await; - assert!(result.is_err()); - let rev1_resp = client - .get("compact", Some(GetOptions::new().with_revision(rev1))) - .await?; - assert_eq!(rev1_resp.kvs()[0].value(), b"1"); - - Ok(()) - } - - #[tokio::test] - async fn test_txn() -> Result<()> { - let mut client = get_client().await?; - client.put("txn01", "01", None).await?; - - // transaction 1 - { - let resp = client - .txn( - Txn::new() - .when(&[Compare::value("txn01", CompareOp::Equal, "01")][..]) - .and_then( - &[TxnOp::put( - "txn01", - "02", - Some(PutOptions::new().with_prev_key()), - )][..], - ) - .or_else(&[TxnOp::get("txn01", None)][..]), - ) - .await?; - - assert!(resp.succeeded()); - let op_responses = resp.op_responses(); - assert_eq!(op_responses.len(), 1); - - match op_responses[0] { - TxnOpResponse::Put(ref resp) => assert_eq!(resp.prev_key().unwrap().value(), b"01"), - _ => panic!("unexpected response"), - } - - let resp = client.get("txn01", None).await?; - assert_eq!(resp.kvs()[0].key(), b"txn01"); - assert_eq!(resp.kvs()[0].value(), b"02"); - } - - // transaction 2 - { - let resp = client - .txn( - Txn::new() - .when(&[Compare::value("txn01", CompareOp::Equal, "01")][..]) - .and_then(&[TxnOp::put("txn01", "02", None)][..]) - .or_else(&[TxnOp::get("txn01", None)][..]), - ) - .await?; - - assert!(!resp.succeeded()); - let op_responses = resp.op_responses(); - assert_eq!(op_responses.len(), 1); - - match op_responses[0] { - TxnOpResponse::Get(ref resp) => assert_eq!(resp.kvs()[0].value(), b"02"), - _ => panic!("unexpected response"), - } - } - - Ok(()) - } - - #[tokio::test] - async fn test_watch() -> Result<()> { - let mut client = get_client().await?; - - let (mut watcher, mut stream) = client.watch("watch01", None).await?; - - client.put("watch01", "01", None).await?; - - let resp = stream.message().await?.unwrap(); - assert_eq!(resp.watch_id(), watcher.watch_id()); - assert_eq!(resp.events().len(), 1); - - let kv = resp.events()[0].kv().unwrap(); - assert_eq!(kv.key(), b"watch01"); - assert_eq!(kv.value(), b"01"); - assert_eq!(resp.events()[0].event_type(), EventType::Put); - - watcher.cancel().await?; - - let resp = stream.message().await?.unwrap(); - assert_eq!(resp.watch_id(), watcher.watch_id()); - assert!(resp.canceled()); - - Ok(()) - } - - #[tokio::test] - async fn test_grant_revoke() -> Result<()> { - let mut client = get_client().await?; - let resp = client.lease_grant(123, None).await?; - assert_eq!(resp.ttl(), 123); - let id = resp.id(); - client.lease_revoke(id).await?; - Ok(()) - } - - #[tokio::test] - async fn test_keep_alive() -> Result<()> { - let mut client = get_client().await?; - - let resp = client.lease_grant(60, None).await?; - assert_eq!(resp.ttl(), 60); - let id = resp.id(); - - let (mut keeper, mut stream) = client.lease_keep_alive(id).await?; - keeper.keep_alive().await?; - - let resp = stream.message().await?.unwrap(); - assert_eq!(resp.id(), keeper.id()); - assert_eq!(resp.ttl(), 60); - - client.lease_revoke(id).await?; - Ok(()) - } - - #[tokio::test] - async fn test_time_to_live() -> Result<()> { - let mut client = get_client().await?; - let leaseid = 200; - let resp = client - .lease_grant(60, Some(LeaseGrantOptions::new().with_id(leaseid))) - .await?; - assert_eq!(resp.ttl(), 60); - assert_eq!(resp.id(), leaseid); - - let resp = client.lease_time_to_live(leaseid, None).await?; - assert_eq!(resp.id(), leaseid); - assert_eq!(resp.granted_ttl(), 60); - - client.lease_revoke(leaseid).await?; - Ok(()) - } - - #[tokio::test] - async fn test_leases() -> Result<()> { - let lease1 = 100; - let lease2 = 101; - let lease3 = 102; - - let mut client = get_client().await?; - let resp = client - .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease1))) - .await?; - assert_eq!(resp.ttl(), 60); - assert_eq!(resp.id(), lease1); - - let resp = client - .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease2))) - .await?; - assert_eq!(resp.ttl(), 60); - assert_eq!(resp.id(), lease2); - - let resp = client - .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease3))) - .await?; - assert_eq!(resp.ttl(), 60); - assert_eq!(resp.id(), lease3); - - let resp = client.leases().await?; - let leases: Vec<_> = resp.leases().iter().map(|status| status.id()).collect(); - assert!(leases.contains(&lease1)); - assert!(leases.contains(&lease2)); - assert!(leases.contains(&lease3)); - - client.lease_revoke(lease1).await?; - client.lease_revoke(lease2).await?; - client.lease_revoke(lease3).await?; - Ok(()) - } - - #[tokio::test] - async fn test_lock() -> Result<()> { - let mut client = get_client().await?; - let resp = client.lock("lock-test", None).await?; - let key = resp.key(); - let key_str = std::str::from_utf8(key)?; - assert!(key_str.starts_with("lock-test/")); - - client.unlock(key).await?; - Ok(()) - } - - #[ignore] - #[tokio::test] - async fn test_auth() -> Result<()> { - let mut client = get_client().await?; - client.auth_enable().await?; - - // after enable auth, must operate by authenticated client - client.put("auth-test", "value", None).await.unwrap_err(); - - // connect with authenticate, the user must already exists - let options = Some(ConnectOptions::new().with_user( - "root", // user name - "rootpwd", // password - )); - let mut client_auth = Client::connect(["localhost:2379"], options).await?; - client_auth.put("auth-test", "value", None).await?; - - client_auth.auth_disable().await?; - - // after disable auth, operate ok - let mut client = get_client().await?; - client.put("auth-test", "value", None).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_role() -> Result<()> { - let mut client = get_client().await?; - - let role1 = "role1"; - let role2 = "role2"; - - let _ = client.role_delete(role1).await; - let _ = client.role_delete(role2).await; - - client.role_add(role1).await?; - - client.role_get(role1).await?; - - client.role_delete(role1).await?; - client.role_get(role1).await.unwrap_err(); - - client.role_add(role2).await?; - client.role_get(role2).await?; - - { - let resp = client.role_list().await?; - assert!(resp.roles().contains(&role2.to_string())); - } - - client - .role_grant_permission(role2, Permission::read("123")) - .await?; - client - .role_grant_permission(role2, Permission::write("abc").with_from_key()) - .await?; - client - .role_grant_permission(role2, Permission::read_write("hi").with_range_end("hjj")) - .await?; - client - .role_grant_permission( - role2, - Permission::new(PermissionType::Write, "pp").with_prefix(), - ) - .await?; - client - .role_grant_permission( - role2, - Permission::new(PermissionType::Read, "xyz").with_all_keys(), - ) - .await?; - - { - let resp = client.role_get(role2).await?; - let permissions = resp.permissions(); - assert!(permissions.contains(&Permission::read("123"))); - assert!(permissions.contains(&Permission::write("abc").with_from_key())); - assert!(permissions.contains(&Permission::read_write("hi").with_range_end("hjj"))); - assert!(permissions.contains(&Permission::write("pp").with_prefix())); - assert!(permissions.contains(&Permission::read("xyz").with_all_keys())); - } - - //revoke all permission - client.role_revoke_permission(role2, "123", None).await?; - client - .role_revoke_permission( - role2, - "abc", - Some(RoleRevokePermissionOptions::new().with_from_key()), - ) - .await?; - client - .role_revoke_permission( - role2, - "hi", - Some(RoleRevokePermissionOptions::new().with_range_end("hjj")), - ) - .await?; - client - .role_revoke_permission( - role2, - "pp", - Some(RoleRevokePermissionOptions::new().with_prefix()), - ) - .await?; - client - .role_revoke_permission( - role2, - "xyz", - Some(RoleRevokePermissionOptions::new().with_all_keys()), - ) - .await?; - - let resp = client.role_get(role2).await?; - assert!(resp.permissions().is_empty()); - - client.role_delete(role2).await?; - - Ok(()) - } - - #[tokio::test] - async fn test_user() -> Result<()> { - let name1 = "usr1"; - let password1 = "pwd1"; - let name2 = "usr2"; - let password2 = "pwd2"; - let name3 = "usr3"; - let password3 = "pwd3"; - let role1 = "role1"; - - let mut client = get_client().await?; - - // ignore result - let _resp = client.user_delete(name1).await; - let _resp = client.user_delete(name2).await; - let _resp = client.user_delete(name3).await; - let _resp = client.role_delete(role1).await; - - client - .user_add(name1, password1, Some(UserAddOptions::new())) - .await?; - - client - .user_add(name2, password2, Some(UserAddOptions::new().with_no_pwd())) - .await?; - - client.user_add(name3, password3, None).await?; - - client.user_get(name1).await?; - - { - let resp = client.user_list().await?; - assert!(resp.users().contains(&name1.to_string())); - } - - client.user_delete(name2).await?; - client.user_get(name2).await.unwrap_err(); - - client.user_change_password(name1, password2).await?; - client.user_get(name1).await?; - - client.role_add(role1).await?; - client.user_grant_role(name1, role1).await?; - client.user_get(name1).await?; - - client.user_revoke_role(name1, role1).await?; - client.user_get(name1).await?; - - let _ = client.user_delete(name1).await; - let _ = client.user_delete(name2).await; - let _ = client.user_delete(name3).await; - let _ = client.role_delete(role1).await; - - Ok(()) - } - - #[tokio::test] - async fn test_alarm() -> Result<()> { - let mut client = get_client().await?; - - // Test deactivate alarm. - { - let options = AlarmOptions::new(); - let _resp = client - .alarm(AlarmAction::Deactivate, AlarmType::None, Some(options)) - .await?; - } - - // Test get None alarm. - let member_id = { - let resp = client - .alarm(AlarmAction::Get, AlarmType::None, None) - .await?; - let mems = resp.alarms(); - assert_eq!(mems.len(), 0); - 0 - }; - - let mut options = AlarmOptions::new(); - options.with_member(member_id); - - // Test get no space alarm. - { - let resp = client - .alarm(AlarmAction::Get, AlarmType::Nospace, Some(options.clone())) - .await?; - let mems = resp.alarms(); - assert_eq!(mems.len(), 0); - } - - Ok(()) - } - - #[tokio::test] - async fn test_status() -> Result<()> { - let mut client = get_client().await?; - let resp = client.status().await?; - - let db_size = resp.db_size(); - assert_ne!(db_size, 0); - Ok(()) - } - - #[tokio::test] - async fn test_defragment() -> Result<()> { - let mut client = get_client().await?; - let resp = client.defragment().await?; - let hd = resp.header(); - assert!(hd.is_none()); - Ok(()) - } - - #[tokio::test] - async fn test_hash() -> Result<()> { - let mut client = get_client().await?; - let resp = client.hash().await?; - let hd = resp.header(); - assert!(hd.is_some()); - assert_ne!(resp.hash(), 0); - Ok(()) - } - - #[tokio::test] - async fn test_hash_kv() -> Result<()> { - let mut client = get_client().await?; - let resp = client.hash_kv(0).await?; - let hd = resp.header(); - assert!(hd.is_some()); - assert_ne!(resp.hash(), 0); - assert_ne!(resp.compact_version(), 0); - Ok(()) - } - - #[tokio::test] - async fn test_snapshot() -> Result<()> { - let mut client = get_client().await?; - let mut msg = client.snapshot().await?; - loop { - if let Some(resp) = msg.message().await? { - assert!(!resp.blob().is_empty()); - if resp.remaining_bytes() == 0 { - break; - } - } - } - Ok(()) - } - - #[ignore] - #[tokio::test] - async fn test_cluster() -> Result<()> { - let node1 = "localhost:2520"; - let node2 = "localhost:2530"; - let node3 = "localhost:2540"; - let mut client = get_client().await?; - let resp = client - .member_add([node1], Some(MemberAddOptions::new().with_is_learner())) - .await?; - let id1 = resp.member().unwrap().id(); - - let resp = client.member_add([node2], None).await?; - let id2 = resp.member().unwrap().id(); - let resp = client.member_add([node3], None).await?; - let id3 = resp.member().unwrap().id(); - - let resp = client.member_list().await?; - let members: Vec<_> = resp.members().iter().map(|member| member.id()).collect(); - assert!(members.contains(&id1)); - assert!(members.contains(&id2)); - assert!(members.contains(&id3)); - Ok(()) - } - - #[tokio::test] - async fn test_move_leader() -> Result<()> { - let mut client = get_client().await?; - let resp = client.member_list().await?; - let member_list = resp.members(); - - let resp = client.status().await?; - let leader_id = resp.leader(); - println!("status {:?}, leader_id {:?}", resp, resp.leader()); - - let mut member_id = leader_id; - for member in member_list { - println!("member_id {:?}, name is {:?}", member.id(), member.name()); - if member.id() != leader_id { - member_id = member.id(); - break; - } - } - - let resp = client.move_leader(member_id).await?; - let header = resp.header(); - if member_id == leader_id { - assert!(header.is_none()); - } else { - assert!(header.is_some()); - } - - Ok(()) - } - - #[tokio::test] - async fn test_election() -> Result<()> { - let mut client = get_client().await?; - let resp = client.lease_grant(10, None).await?; - let lease_id = resp.id(); - assert_eq!(resp.ttl(), 10); - - let resp = client.campaign("myElection", "123", lease_id).await?; - let leader = resp.leader().unwrap(); - assert_eq!(leader.name(), b"myElection"); - assert_eq!(leader.lease(), lease_id); - - let resp = client - .proclaim( - "123", - Some(ProclaimOptions::new().with_leader(leader.clone())), - ) - .await?; - let header = resp.header(); - println!("proclaim header {:?}", header.unwrap()); - assert!(header.is_some()); - - let mut msg = client.observe(leader.name()).await?; - loop { - if let Some(resp) = msg.message().await? { - assert!(resp.kv().is_some()); - println!("observe key {:?}", resp.kv().unwrap().key_str()); - if resp.kv().is_some() { - break; - } - } - } - - let resp = client.leader("myElection").await?; - let kv = resp.kv().unwrap(); - assert_eq!(kv.value(), b"123"); - assert_eq!(kv.key(), leader.key()); - println!("key is {:?}", kv.key_str()); - println!("value is {:?}", kv.value_str()); - - let resign_option = ResignOptions::new().with_leader(leader.clone()); - - let resp = client.resign(Some(resign_option)).await?; - let header = resp.header(); - println!("resign header {:?}", header.unwrap()); - assert!(header.is_some()); - - Ok(()) - } - - #[tokio::test] - async fn test_remove_and_add_endpoint() -> Result<()> { - let mut client = get_client().await?; - client.put("endpoint", "add_remove", None).await?; - - // get key - { - let resp = client.get("endpoint", None).await?; - assert_eq!(resp.count(), 1); - assert!(!resp.more()); - assert_eq!(resp.kvs().len(), 1); - assert_eq!(resp.kvs()[0].key(), b"endpoint"); - assert_eq!(resp.kvs()[0].value(), b"add_remove"); - } - - // remove endpoint - client.remove_endpoint(DEFAULT_TEST_ENDPOINT).await?; - // `Client::get` will hang before adding the endpoint back - client.add_endpoint(DEFAULT_TEST_ENDPOINT).await?; - - // get key after remove and add endpoint - { - let resp = client.get("endpoint", None).await?; - assert_eq!(resp.count(), 1); - assert!(!resp.more()); - assert_eq!(resp.kvs().len(), 1); - assert_eq!(resp.kvs()[0].key(), b"endpoint"); - assert_eq!(resp.kvs()[0].value(), b"add_remove"); - } - - Ok(()) - } -} diff --git a/tests/client.rs b/tests/client.rs new file mode 100644 index 0000000..46b2e46 --- /dev/null +++ b/tests/client.rs @@ -0,0 +1,773 @@ +mod testing; + +use crate::testing::{get_client, Result, DEFAULT_TEST_ENDPOINT}; +use etcd_client::{ + AlarmAction, AlarmOptions, AlarmType, Client, Compare, CompareOp, ConnectOptions, + DeleteOptions, EventType, GetOptions, LeaseGrantOptions, MemberAddOptions, Permission, + PermissionType, ProclaimOptions, PutOptions, ResignOptions, RoleRevokePermissionOptions, Txn, + TxnOp, TxnOpResponse, UserAddOptions, +}; + +#[tokio::test] +async fn test_put() -> Result<()> { + let mut client = get_client().await?; + client.put("put", "123", None).await?; + + // overwrite with prev key + { + let resp = client + .put("put", "456", Some(PutOptions::new().with_prev_key())) + .await?; + let prev_key = resp.prev_key(); + assert!(prev_key.is_some()); + let prev_key = prev_key.unwrap(); + assert_eq!(prev_key.key(), b"put"); + assert_eq!(prev_key.value(), b"123"); + } + + // overwrite again with prev key + { + let resp = client + .put("put", "789", Some(PutOptions::new().with_prev_key())) + .await?; + let prev_key = resp.prev_key(); + assert!(prev_key.is_some()); + let prev_key = prev_key.unwrap(); + assert_eq!(prev_key.key(), b"put"); + assert_eq!(prev_key.value(), b"456"); + } + + Ok(()) +} + +#[tokio::test] +async fn test_get() -> Result<()> { + let mut client = get_client().await?; + client.put("get10", "10", None).await?; + client.put("get11", "11", None).await?; + client.put("get20", "20", None).await?; + client.put("get21", "21", None).await?; + + // get key + { + let resp = client.get("get11", None).await?; + assert_eq!(resp.count(), 1); + assert!(!resp.more()); + assert_eq!(resp.kvs().len(), 1); + assert_eq!(resp.kvs()[0].key(), b"get11"); + assert_eq!(resp.kvs()[0].value(), b"11"); + } + + // get from key + { + let resp = client + .get( + "get11", + Some(GetOptions::new().with_from_key().with_limit(2)), + ) + .await?; + assert!(resp.more()); + assert_eq!(resp.kvs().len(), 2); + assert_eq!(resp.kvs()[0].key(), b"get11"); + assert_eq!(resp.kvs()[0].value(), b"11"); + assert_eq!(resp.kvs()[1].key(), b"get20"); + assert_eq!(resp.kvs()[1].value(), b"20"); + } + + // get prefix keys + { + let resp = client + .get("get1", Some(GetOptions::new().with_prefix())) + .await?; + assert_eq!(resp.count(), 2); + assert!(!resp.more()); + assert_eq!(resp.kvs().len(), 2); + assert_eq!(resp.kvs()[0].key(), b"get10"); + assert_eq!(resp.kvs()[0].value(), b"10"); + assert_eq!(resp.kvs()[1].key(), b"get11"); + assert_eq!(resp.kvs()[1].value(), b"11"); + } + + Ok(()) +} + +#[tokio::test] +async fn test_delete() -> Result<()> { + let mut client = get_client().await?; + client.put("del10", "10", None).await?; + client.put("del11", "11", None).await?; + client.put("del20", "20", None).await?; + client.put("del21", "21", None).await?; + client.put("del31", "31", None).await?; + client.put("del32", "32", None).await?; + + // delete key + { + let resp = client.delete("del11", None).await?; + assert_eq!(resp.deleted(), 1); + let resp = client + .get("del11", Some(GetOptions::new().with_count_only())) + .await?; + assert_eq!(resp.count(), 0); + } + + // delete a range of keys + { + let resp = client + .delete("del11", Some(DeleteOptions::new().with_range("del22"))) + .await?; + assert_eq!(resp.deleted(), 2); + let resp = client + .get( + "del11", + Some(GetOptions::new().with_range("del22").with_count_only()), + ) + .await?; + assert_eq!(resp.count(), 0); + } + + // delete key with prefix + { + let resp = client + .delete("del3", Some(DeleteOptions::new().with_prefix())) + .await?; + assert_eq!(resp.deleted(), 2); + let resp = client + .get("del32", Some(GetOptions::new().with_count_only())) + .await?; + assert_eq!(resp.count(), 0); + } + + Ok(()) +} + +#[tokio::test] +async fn test_compact() -> Result<()> { + let mut client = get_client().await?; + let rev0 = client + .put("compact", "0", None) + .await? + .header() + .unwrap() + .revision(); + let rev1 = client + .put("compact", "1", None) + .await? + .header() + .unwrap() + .revision(); + + // before compacting + let rev0_resp = client + .get("compact", Some(GetOptions::new().with_revision(rev0))) + .await?; + assert_eq!(rev0_resp.kvs()[0].value(), b"0"); + let rev1_resp = client + .get("compact", Some(GetOptions::new().with_revision(rev1))) + .await?; + assert_eq!(rev1_resp.kvs()[0].value(), b"1"); + + client.compact(rev1, None).await?; + + // after compacting + let result = client + .get("compact", Some(GetOptions::new().with_revision(rev0))) + .await; + assert!(result.is_err()); + let rev1_resp = client + .get("compact", Some(GetOptions::new().with_revision(rev1))) + .await?; + assert_eq!(rev1_resp.kvs()[0].value(), b"1"); + + Ok(()) +} + +#[tokio::test] +async fn test_txn() -> Result<()> { + let mut client = get_client().await?; + client.put("txn01", "01", None).await?; + + // transaction 1 + { + let resp = client + .txn( + Txn::new() + .when(&[Compare::value("txn01", CompareOp::Equal, "01")][..]) + .and_then( + &[TxnOp::put( + "txn01", + "02", + Some(PutOptions::new().with_prev_key()), + )][..], + ) + .or_else(&[TxnOp::get("txn01", None)][..]), + ) + .await?; + + assert!(resp.succeeded()); + let op_responses = resp.op_responses(); + assert_eq!(op_responses.len(), 1); + + match op_responses[0] { + TxnOpResponse::Put(ref resp) => assert_eq!(resp.prev_key().unwrap().value(), b"01"), + _ => panic!("unexpected response"), + } + + let resp = client.get("txn01", None).await?; + assert_eq!(resp.kvs()[0].key(), b"txn01"); + assert_eq!(resp.kvs()[0].value(), b"02"); + } + + // transaction 2 + { + let resp = client + .txn( + Txn::new() + .when(&[Compare::value("txn01", CompareOp::Equal, "01")][..]) + .and_then(&[TxnOp::put("txn01", "02", None)][..]) + .or_else(&[TxnOp::get("txn01", None)][..]), + ) + .await?; + + assert!(!resp.succeeded()); + let op_responses = resp.op_responses(); + assert_eq!(op_responses.len(), 1); + + match op_responses[0] { + TxnOpResponse::Get(ref resp) => assert_eq!(resp.kvs()[0].value(), b"02"), + _ => panic!("unexpected response"), + } + } + + Ok(()) +} + +#[tokio::test] +async fn test_watch() -> Result<()> { + let mut client = get_client().await?; + + let (mut watcher, mut stream) = client.watch("watch01", None).await?; + + client.put("watch01", "01", None).await?; + + let resp = stream.message().await?.unwrap(); + assert_eq!(resp.watch_id(), watcher.watch_id()); + assert_eq!(resp.events().len(), 1); + + let kv = resp.events()[0].kv().unwrap(); + assert_eq!(kv.key(), b"watch01"); + assert_eq!(kv.value(), b"01"); + assert_eq!(resp.events()[0].event_type(), EventType::Put); + + watcher.cancel().await?; + + let resp = stream.message().await?.unwrap(); + assert_eq!(resp.watch_id(), watcher.watch_id()); + assert!(resp.canceled()); + + Ok(()) +} + +#[tokio::test] +async fn test_grant_revoke() -> Result<()> { + let mut client = get_client().await?; + let resp = client.lease_grant(123, None).await?; + assert_eq!(resp.ttl(), 123); + let id = resp.id(); + client.lease_revoke(id).await?; + Ok(()) +} + +#[tokio::test] +async fn test_keep_alive() -> Result<()> { + let mut client = get_client().await?; + + let resp = client.lease_grant(60, None).await?; + assert_eq!(resp.ttl(), 60); + let id = resp.id(); + + let (mut keeper, mut stream) = client.lease_keep_alive(id).await?; + keeper.keep_alive().await?; + + let resp = stream.message().await?.unwrap(); + assert_eq!(resp.id(), keeper.id()); + assert_eq!(resp.ttl(), 60); + + client.lease_revoke(id).await?; + Ok(()) +} + +#[tokio::test] +async fn test_time_to_live() -> Result<()> { + let mut client = get_client().await?; + let lease_id = 200; + let resp = client + .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease_id))) + .await?; + assert_eq!(resp.ttl(), 60); + assert_eq!(resp.id(), lease_id); + + let resp = client.lease_time_to_live(lease_id, None).await?; + assert_eq!(resp.id(), lease_id); + assert_eq!(resp.granted_ttl(), 60); + + client.lease_revoke(lease_id).await?; + Ok(()) +} + +#[tokio::test] +async fn test_leases() -> Result<()> { + let lease1 = 100; + let lease2 = 101; + let lease3 = 102; + + let mut client = get_client().await?; + let resp = client + .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease1))) + .await?; + assert_eq!(resp.ttl(), 60); + assert_eq!(resp.id(), lease1); + + let resp = client + .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease2))) + .await?; + assert_eq!(resp.ttl(), 60); + assert_eq!(resp.id(), lease2); + + let resp = client + .lease_grant(60, Some(LeaseGrantOptions::new().with_id(lease3))) + .await?; + assert_eq!(resp.ttl(), 60); + assert_eq!(resp.id(), lease3); + + let resp = client.leases().await?; + let leases: Vec<_> = resp.leases().iter().map(|status| status.id()).collect(); + assert!(leases.contains(&lease1)); + assert!(leases.contains(&lease2)); + assert!(leases.contains(&lease3)); + + client.lease_revoke(lease1).await?; + client.lease_revoke(lease2).await?; + client.lease_revoke(lease3).await?; + Ok(()) +} + +#[tokio::test] +async fn test_lock() -> Result<()> { + let mut client = get_client().await?; + let resp = client.lock("lock-test", None).await?; + let key = resp.key(); + let key_str = std::str::from_utf8(key)?; + assert!(key_str.starts_with("lock-test/")); + + client.unlock(key).await?; + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn test_auth() -> Result<()> { + let mut client = get_client().await?; + client.auth_enable().await?; + + // after enable auth, must operate by authenticated client + client.put("auth-test", "value", None).await.unwrap_err(); + + // connect with authenticate, the user must already exists + let options = Some(ConnectOptions::new().with_user( + "root", // user name + "rootpwd", // password + )); + let mut client_auth = Client::connect(["localhost:2379"], options).await?; + client_auth.put("auth-test", "value", None).await?; + + client_auth.auth_disable().await?; + + // after disable auth, operate ok + let mut client = get_client().await?; + client.put("auth-test", "value", None).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_role() -> Result<()> { + let mut client = get_client().await?; + + let role1 = "role1"; + let role2 = "role2"; + + let _ = client.role_delete(role1).await; + let _ = client.role_delete(role2).await; + + client.role_add(role1).await?; + + client.role_get(role1).await?; + + client.role_delete(role1).await?; + client.role_get(role1).await.unwrap_err(); + + client.role_add(role2).await?; + client.role_get(role2).await?; + + { + let resp = client.role_list().await?; + assert!(resp.roles().contains(&role2.to_string())); + } + + client + .role_grant_permission(role2, Permission::read("123")) + .await?; + client + .role_grant_permission(role2, Permission::write("abc").with_from_key()) + .await?; + client + .role_grant_permission(role2, Permission::read_write("hi").with_range_end("hjj")) + .await?; + client + .role_grant_permission( + role2, + Permission::new(PermissionType::Write, "pp").with_prefix(), + ) + .await?; + client + .role_grant_permission( + role2, + Permission::new(PermissionType::Read, "xyz").with_all_keys(), + ) + .await?; + + { + let resp = client.role_get(role2).await?; + let permissions = resp.permissions(); + assert!(permissions.contains(&Permission::read("123"))); + assert!(permissions.contains(&Permission::write("abc").with_from_key())); + assert!(permissions.contains(&Permission::read_write("hi").with_range_end("hjj"))); + assert!(permissions.contains(&Permission::write("pp").with_prefix())); + assert!(permissions.contains(&Permission::read("xyz").with_all_keys())); + } + + //revoke all permission + client.role_revoke_permission(role2, "123", None).await?; + client + .role_revoke_permission( + role2, + "abc", + Some(RoleRevokePermissionOptions::new().with_from_key()), + ) + .await?; + client + .role_revoke_permission( + role2, + "hi", + Some(RoleRevokePermissionOptions::new().with_range_end("hjj")), + ) + .await?; + client + .role_revoke_permission( + role2, + "pp", + Some(RoleRevokePermissionOptions::new().with_prefix()), + ) + .await?; + client + .role_revoke_permission( + role2, + "xyz", + Some(RoleRevokePermissionOptions::new().with_all_keys()), + ) + .await?; + + let resp = client.role_get(role2).await?; + assert!(resp.permissions().is_empty()); + + client.role_delete(role2).await?; + + Ok(()) +} + +#[tokio::test] +async fn test_user() -> Result<()> { + let name1 = "usr1"; + let password1 = "pwd1"; + let name2 = "usr2"; + let password2 = "pwd2"; + let name3 = "usr3"; + let password3 = "pwd3"; + let role1 = "role1"; + + let mut client = get_client().await?; + + // ignore result + let _resp = client.user_delete(name1).await; + let _resp = client.user_delete(name2).await; + let _resp = client.user_delete(name3).await; + let _resp = client.role_delete(role1).await; + + client + .user_add(name1, password1, Some(UserAddOptions::new())) + .await?; + + client + .user_add(name2, password2, Some(UserAddOptions::new().with_no_pwd())) + .await?; + + client.user_add(name3, password3, None).await?; + + client.user_get(name1).await?; + + { + let resp = client.user_list().await?; + assert!(resp.users().contains(&name1.to_string())); + } + + client.user_delete(name2).await?; + client.user_get(name2).await.unwrap_err(); + + client.user_change_password(name1, password2).await?; + client.user_get(name1).await?; + + client.role_add(role1).await?; + client.user_grant_role(name1, role1).await?; + client.user_get(name1).await?; + + client.user_revoke_role(name1, role1).await?; + client.user_get(name1).await?; + + let _ = client.user_delete(name1).await; + let _ = client.user_delete(name2).await; + let _ = client.user_delete(name3).await; + let _ = client.role_delete(role1).await; + + Ok(()) +} + +#[tokio::test] +async fn test_alarm() -> Result<()> { + let mut client = get_client().await?; + + // Test deactivate alarm. + { + let options = AlarmOptions::new(); + let _resp = client + .alarm(AlarmAction::Deactivate, AlarmType::None, Some(options)) + .await?; + } + + // Test get None alarm. + let member_id = { + let resp = client + .alarm(AlarmAction::Get, AlarmType::None, None) + .await?; + let mems = resp.alarms(); + assert_eq!(mems.len(), 0); + 0 + }; + + let mut options = AlarmOptions::new(); + options.with_member(member_id); + + // Test get no space alarm. + { + let resp = client + .alarm(AlarmAction::Get, AlarmType::Nospace, Some(options.clone())) + .await?; + let mems = resp.alarms(); + assert_eq!(mems.len(), 0); + } + + Ok(()) +} + +#[tokio::test] +async fn test_status() -> Result<()> { + let mut client = get_client().await?; + let resp = client.status().await?; + + let db_size = resp.db_size(); + assert_ne!(db_size, 0); + Ok(()) +} + +#[tokio::test] +async fn test_defragment() -> Result<()> { + let mut client = get_client().await?; + let resp = client.defragment().await?; + let hd = resp.header(); + assert!(hd.is_none()); + Ok(()) +} + +#[tokio::test] +async fn test_hash() -> Result<()> { + let mut client = get_client().await?; + let resp = client.hash().await?; + let hd = resp.header(); + assert!(hd.is_some()); + assert_ne!(resp.hash(), 0); + Ok(()) +} + +#[tokio::test] +async fn test_hash_kv() -> Result<()> { + let mut client = get_client().await?; + let resp = client.hash_kv(0).await?; + let hd = resp.header(); + assert!(hd.is_some()); + assert_ne!(resp.hash(), 0); + assert_ne!(resp.compact_version(), 0); + Ok(()) +} + +#[tokio::test] +async fn test_snapshot() -> Result<()> { + let mut client = get_client().await?; + let mut msg = client.snapshot().await?; + loop { + if let Some(resp) = msg.message().await? { + assert!(!resp.blob().is_empty()); + if resp.remaining_bytes() == 0 { + break; + } + } + } + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn test_cluster() -> Result<()> { + let node1 = "localhost:2520"; + let node2 = "localhost:2530"; + let node3 = "localhost:2540"; + let mut client = get_client().await?; + let resp = client + .member_add([node1], Some(MemberAddOptions::new().with_is_learner())) + .await?; + let id1 = resp.member().unwrap().id(); + + let resp = client.member_add([node2], None).await?; + let id2 = resp.member().unwrap().id(); + let resp = client.member_add([node3], None).await?; + let id3 = resp.member().unwrap().id(); + + let resp = client.member_list().await?; + let members: Vec<_> = resp.members().iter().map(|member| member.id()).collect(); + assert!(members.contains(&id1)); + assert!(members.contains(&id2)); + assert!(members.contains(&id3)); + Ok(()) +} + +#[tokio::test] +async fn test_move_leader() -> Result<()> { + let mut client = get_client().await?; + let resp = client.member_list().await?; + let member_list = resp.members(); + + let resp = client.status().await?; + let leader_id = resp.leader(); + println!("status {:?}, leader_id {:?}", resp, resp.leader()); + + let mut member_id = leader_id; + for member in member_list { + println!("member_id {:?}, name is {:?}", member.id(), member.name()); + if member.id() != leader_id { + member_id = member.id(); + break; + } + } + + let resp = client.move_leader(member_id).await?; + let header = resp.header(); + if member_id == leader_id { + assert!(header.is_none()); + } else { + assert!(header.is_some()); + } + + Ok(()) +} + +#[tokio::test] +async fn test_election() -> Result<()> { + let mut client = get_client().await?; + let resp = client.lease_grant(10, None).await?; + let lease_id = resp.id(); + assert_eq!(resp.ttl(), 10); + + let resp = client.campaign("myElection", "123", lease_id).await?; + let leader = resp.leader().unwrap(); + assert_eq!(leader.name(), b"myElection"); + assert_eq!(leader.lease(), lease_id); + + let resp = client + .proclaim( + "123", + Some(ProclaimOptions::new().with_leader(leader.clone())), + ) + .await?; + let header = resp.header(); + println!("proclaim header {:?}", header.unwrap()); + assert!(header.is_some()); + + let mut msg = client.observe(leader.name()).await?; + loop { + if let Some(resp) = msg.message().await? { + assert!(resp.kv().is_some()); + println!("observe key {:?}", resp.kv().unwrap().key_str()); + if resp.kv().is_some() { + break; + } + } + } + + let resp = client.leader("myElection").await?; + let kv = resp.kv().unwrap(); + assert_eq!(kv.value(), b"123"); + assert_eq!(kv.key(), leader.key()); + println!("key is {:?}", kv.key_str()); + println!("value is {:?}", kv.value_str()); + + let resign_option = ResignOptions::new().with_leader(leader.clone()); + + let resp = client.resign(Some(resign_option)).await?; + let header = resp.header(); + println!("resign header {:?}", header.unwrap()); + assert!(header.is_some()); + + Ok(()) +} + +#[tokio::test] +async fn test_remove_and_add_endpoint() -> Result<()> { + let mut client = get_client().await?; + client.put("endpoint", "add_remove", None).await?; + + // get key + { + let resp = client.get("endpoint", None).await?; + assert_eq!(resp.count(), 1); + assert!(!resp.more()); + assert_eq!(resp.kvs().len(), 1); + assert_eq!(resp.kvs()[0].key(), b"endpoint"); + assert_eq!(resp.kvs()[0].value(), b"add_remove"); + } + + // remove endpoint + client.remove_endpoint(DEFAULT_TEST_ENDPOINT).await?; + // `Client::get` will hang before adding the endpoint back + client.add_endpoint(DEFAULT_TEST_ENDPOINT).await?; + + // get key after remove and add endpoint + { + let resp = client.get("endpoint", None).await?; + assert_eq!(resp.count(), 1); + assert!(!resp.more()); + assert_eq!(resp.kvs().len(), 1); + assert_eq!(resp.kvs()[0].key(), b"endpoint"); + assert_eq!(resp.kvs()[0].value(), b"add_remove"); + } + + Ok(()) +} diff --git a/tests/testing.rs b/tests/testing.rs new file mode 100644 index 0000000..4131ed5 --- /dev/null +++ b/tests/testing.rs @@ -0,0 +1,10 @@ +use etcd_client::{Client, Error}; + +pub const DEFAULT_TEST_ENDPOINT: &str = "localhost:2379"; + +pub type Result = std::result::Result; + +/// Get client for testing. +pub async fn get_client() -> Result { + Client::connect([DEFAULT_TEST_ENDPOINT], None).await +} From 75e96f81f60c9facd0b24bebceb04b1d2928c852 Mon Sep 17 00:00:00 2001 From: Liu Yihua Date: Wed, 20 Dec 2023 11:31:10 +0800 Subject: [PATCH 08/10] Support tcp keepalive (#73) --- src/client.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/client.rs b/src/client.rs index 5b31b4b..6fe9cbe 100644 --- a/src/client.rs +++ b/src/client.rs @@ -198,6 +198,10 @@ impl Client { if let Some(timeout) = opts.connect_timeout { endpoint = endpoint.connect_timeout(timeout); } + + if let Some(tcp_keepalive) = opts.tcp_keepalive { + endpoint = endpoint.tcp_keepalive(Some(tcp_keepalive)); + } } Ok(endpoint) @@ -741,6 +745,8 @@ pub struct ConnectOptions { timeout: Option, /// Apply a timeout to connecting to the endpoint. connect_timeout: Option, + /// TCP keepalive. + tcp_keepalive: Option, #[cfg(feature = "tls")] tls: Option, #[cfg(feature = "tls-openssl")] @@ -800,6 +806,13 @@ impl ConnectOptions { self } + /// Enable TCP keepalive. + #[inline] + pub fn with_tcp_keepalive(mut self, tcp_keepalive: Duration) -> Self { + self.tcp_keepalive = Some(tcp_keepalive); + self + } + /// Whether send keep alive pings even there are no active requests. /// If disabled, keep-alive pings are only sent while there are opened request/response streams. /// If enabled, pings are also sent when no streams are active. @@ -820,6 +833,7 @@ impl ConnectOptions { keep_alive_while_idle: true, timeout: None, connect_timeout: None, + tcp_keepalive: None, #[cfg(feature = "tls")] tls: None, #[cfg(feature = "tls-openssl")] From f8900b1e56d5e9f9c4c4788adf6353f750ad246f Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 1 Jan 2024 11:21:28 +0800 Subject: [PATCH 09/10] feat: implement namespace APIs (#71) Signed-off-by: tison Co-authored-by: David Li --- README.md | 1 + examples/namespace.rs | 39 ++++++++++++++++ src/lib.rs | 3 ++ src/namespace/kv.rs | 73 +++++++++++++++++++++++++++++ src/namespace/lease.rs | 25 ++++++++++ src/namespace/mod.rs | 5 ++ src/rpc/kv.rs | 103 +++++++++++++++++++++++++++++++++++++++++ src/rpc/lease.rs | 8 ++++ src/rpc/mod.rs | 7 +++ src/vec.rs | 99 +++++++++++++++++++++++++++++++++++++++ tests/namespace.rs | 41 ++++++++++++++++ 11 files changed, 404 insertions(+) create mode 100644 examples/namespace.rs create mode 100644 src/namespace/kv.rs create mode 100644 src/namespace/lease.rs create mode 100644 src/namespace/mod.rs create mode 100644 src/vec.rs create mode 100644 tests/namespace.rs diff --git a/README.md b/README.md index 7a10405..71b872b 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ It provides asynchronous client backed by [tokio](https://github.com/tokio-rs/to - [x] Cluster - [x] Lock - [x] Election +- [x] Namespace ## Usage diff --git a/examples/namespace.rs b/examples/namespace.rs new file mode 100644 index 0000000..17ab74d --- /dev/null +++ b/examples/namespace.rs @@ -0,0 +1,39 @@ +//! Namespace example + +use etcd_client::*; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let client = Client::connect(["localhost:2379"], None).await?; + let mut kv_client = client.kv_client(); + let mut kv_client_prefix = KvClientPrefix::new(kv_client.clone(), "person/".into()); + + kv_client_prefix.put("Alice", "15", None).await?; + println!("put kv: {{Alice: 15}}"); + + // get prefixed kv + let resp = kv_client.get("person/Alice", None).await?; + if let Some(kv) = resp.kvs().first() { + println!( + "Get prefixed kv: {{{}: {}}}", + kv.key_str()?, + kv.value_str()? + ); + } + + // get kv + let resp = kv_client_prefix.get("Alice", None).await?; + if let Some(kv) = resp.kvs().first() { + println!("Get kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + } + + // delete kv + let resp = kv_client_prefix + .delete("Alice", Some(DeleteOptions::new().with_prev_key())) + .await?; + if let Some(kv) = resp.prev_kvs().first() { + println!("Delete kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 8e7c05b..b3dbdae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,11 +61,14 @@ mod auth; mod channel; mod client; mod error; +mod namespace; mod openssl_tls; mod rpc; +mod vec; pub use crate::client::{Client, ConnectOptions}; pub use crate::error::Error; +pub use crate::namespace::{KvClientPrefix, LeaseClientPrefix}; pub use crate::rpc::auth::{ AuthClient, AuthDisableResponse, AuthEnableResponse, AuthenticateResponse, Permission, PermissionType, RoleAddResponse, RoleDeleteResponse, RoleGetResponse, diff --git a/src/namespace/kv.rs b/src/namespace/kv.rs new file mode 100644 index 0000000..c99decd --- /dev/null +++ b/src/namespace/kv.rs @@ -0,0 +1,73 @@ +use crate::error::Result; +use crate::vec::VecExt; +use crate::{ + DeleteOptions, DeleteResponse, GetOptions, GetResponse, KvClient, PutOptions, PutResponse, Txn, + TxnResponse, +}; + +pub struct KvClientPrefix { + pfx: Vec, + kv: KvClient, +} + +impl KvClientPrefix { + pub fn new(kv: KvClient, pfx: Vec) -> Self { + Self { pfx, kv } + } + + #[inline] + fn prefixed_key(&self, key: impl Into>) -> Vec { + let mut key = key.into(); + key.prefix_with(&self.pfx); + key + } + + pub async fn put( + &mut self, + key: impl Into>, + value: impl Into>, + options: Option, + ) -> Result { + let key = self.prefixed_key(key); + let mut resp = self.kv.put(key, value, options).await?; + resp.strip_prev_key_prefix(&self.pfx); + Ok(resp) + } + + pub async fn get( + &mut self, + key: impl Into>, + mut options: Option, + ) -> Result { + let key = self.prefixed_key(key); + options = options.map(|mut opts| { + opts.key_range_end_mut().prefix_range_end_with(&self.pfx); + opts + }); + let mut resp = self.kv.get(key, options).await?; + resp.strip_kvs_prefix(&self.pfx); + Ok(resp) + } + + pub async fn delete( + &mut self, + key: impl Into>, + mut options: Option, + ) -> Result { + let key = self.prefixed_key(key); + options = options.map(|mut opts| { + opts.key_range_end_mut().prefix_range_end_with(&self.pfx); + opts + }); + let mut resp = self.kv.delete(key, options).await?; + resp.strip_prev_kvs_prefix(&self.pfx); + Ok(resp) + } + + pub async fn txn(&mut self, mut txn: Txn) -> Result { + txn.prefix_with(&self.pfx); + let mut resp = self.kv.txn(txn).await?; + resp.strip_key_prefix(&self.pfx); + Ok(resp) + } +} diff --git a/src/namespace/lease.rs b/src/namespace/lease.rs new file mode 100644 index 0000000..759110a --- /dev/null +++ b/src/namespace/lease.rs @@ -0,0 +1,25 @@ +use crate::error::Result; +use crate::{LeaseClient, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse}; + +pub struct LeaseClientPrefix { + pfx: Vec, + lease: LeaseClient, +} + +impl LeaseClientPrefix { + /// Wrap a Lease interface to filter for only keys with a prefix + /// and remove that prefix when fetching attached keys through TimeToLive. + pub fn new(lease: LeaseClient, pfx: Vec) -> Self { + Self { pfx, lease } + } + + pub async fn time_to_live( + &mut self, + id: i64, + options: Option, + ) -> Result { + let mut resp = self.lease.time_to_live(id, options).await?; + resp.strip_keys_prefix(&self.pfx); + Ok(resp) + } +} diff --git a/src/namespace/mod.rs b/src/namespace/mod.rs new file mode 100644 index 0000000..7484099 --- /dev/null +++ b/src/namespace/mod.rs @@ -0,0 +1,5 @@ +mod kv; +mod lease; + +pub use kv::KvClientPrefix; +pub use lease::LeaseClientPrefix; diff --git a/src/rpc/kv.rs b/src/rpc/kv.rs index 1298b16..bdb21e3 100644 --- a/src/rpc/kv.rs +++ b/src/rpc/kv.rs @@ -19,6 +19,7 @@ use crate::rpc::pb::etcdserverpb::{ RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse, }; use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader}; +use crate::vec::VecExt; use http::HeaderValue; use std::sync::Arc; use tonic::{IntoRequest, Request}; @@ -240,6 +241,13 @@ impl PutResponse { pub fn take_prev_key(&mut self) -> Option { self.0.prev_kv.take().map(KeyValue::new) } + + #[inline] + pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) { + if let Some(kv) = self.0.prev_kv.as_mut() { + kv.key.strip_key_prefix(prefix); + } + } } /// Options for `Get` operation. @@ -402,6 +410,11 @@ impl GetOptions { self.req.max_create_revision = revision; self } + + #[inline] + pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec { + &mut self.key_range.range_end + } } impl From for PbRangeRequest { @@ -459,6 +472,13 @@ impl GetResponse { unsafe { std::mem::transmute(std::mem::take(&mut self.0.kvs)) } } + #[inline] + pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) { + for kv in self.0.kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + /// Indicates if there are more keys to return in the requested range. #[inline] pub const fn more(&self) -> bool { @@ -535,6 +555,11 @@ impl DeleteOptions { self.req.prev_kv = true; self } + + #[inline] + pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec { + &mut self.key_range.range_end + } } impl From for PbDeleteRequest { @@ -596,6 +621,13 @@ impl DeleteResponse { pub fn take_prev_kvs(&mut self) -> Vec { unsafe { std::mem::transmute(std::mem::take(&mut self.0.prev_kvs)) } } + + #[inline] + pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) { + for kv in self.0.prev_kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } } /// Options for `Compact` operation. @@ -872,6 +904,43 @@ impl Txn { .collect(); self } + + #[inline] + pub(crate) fn prefix_with(&mut self, prefix: &[u8]) { + self.req.prefix_with(prefix); + } +} + +impl PbTxnRequest { + fn prefix_with(&mut self, prefix: &[u8]) { + let prefix_op = |op: &mut PbTxnRequestOp| { + if let Some(request) = &mut op.request { + match request { + PbTxnOp::RequestRange(req) => { + req.key.prefix_with(prefix); + req.range_end.prefix_range_end_with(prefix); + } + PbTxnOp::RequestPut(req) => { + req.key.prefix_with(prefix); + } + PbTxnOp::RequestDeleteRange(req) => { + req.key.prefix_with(prefix); + req.range_end.prefix_range_end_with(prefix); + } + PbTxnOp::RequestTxn(req) => { + req.prefix_with(prefix); + } + } + } + }; + + self.compare.iter_mut().for_each(|cmp| { + cmp.key.prefix_with(prefix); + cmp.range_end.prefix_range_end_with(prefix); + }); + self.success.iter_mut().for_each(prefix_op); + self.failure.iter_mut().for_each(prefix_op); + } } impl From for PbTxnRequest { @@ -951,4 +1020,38 @@ impl TxnResponse { }) .collect() } + + #[inline] + pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) { + self.0.strip_key_prefix(prefix); + } +} + +impl PbTxnResponse { + fn strip_key_prefix(&mut self, prefix: &[u8]) { + self.responses.iter_mut().for_each(|op| { + if let Some(resp) = &mut op.response { + match resp { + PbTxnOpResponse::ResponseRange(r) => { + for kv in r.kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponsePut(r) => { + if let Some(kv) = r.prev_kv.as_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponseDeleteRange(r) => { + for kv in r.prev_kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponseTxn(r) => { + r.strip_key_prefix(prefix); + } + } + } + }); + } } diff --git a/src/rpc/lease.rs b/src/rpc/lease.rs index 4c75f16..c456907 100644 --- a/src/rpc/lease.rs +++ b/src/rpc/lease.rs @@ -14,6 +14,7 @@ use crate::rpc::pb::etcdserverpb::{ LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse, }; use crate::rpc::ResponseHeader; +use crate::vec::VecExt; use crate::Error; use http::HeaderValue; use std::pin::Pin; @@ -429,6 +430,13 @@ impl LeaseTimeToLiveResponse { pub fn keys(&self) -> &[Vec] { &self.0.keys } + + #[inline] + pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) { + self.0.keys.iter_mut().for_each(|key| { + key.strip_key_prefix(prefix); + }); + } } /// Response for `Leases` operation. diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 92ecbe0..bacae86 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -167,6 +167,13 @@ impl From<&PbKeyValue> for &KeyValue { } } +impl From<&mut PbKeyValue> for &mut KeyValue { + #[inline] + fn from(src: &mut PbKeyValue) -> Self { + unsafe { &mut *(src as *mut _ as *mut KeyValue) } + } +} + /// Get prefix end key of `key`. #[inline] fn get_prefix(key: &[u8]) -> Vec { diff --git a/src/vec.rs b/src/vec.rs new file mode 100644 index 0000000..b3df995 --- /dev/null +++ b/src/vec.rs @@ -0,0 +1,99 @@ +pub trait VecExt { + // slice has a method named `strip_prefix` + fn strip_key_prefix(&mut self, prefix: &[u8]); + fn prefix_with(&mut self, prefix: &[u8]); + fn prefix_range_end_with(&mut self, prefix: &[u8]); +} + +impl VecExt for Vec { + fn strip_key_prefix(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + if !self.starts_with(prefix) { + return; + } + + let pfx_len = prefix.len(); + let key_len = self.len(); + let new_len = key_len - pfx_len; + unsafe { + let ptr = self.as_mut_ptr(); + std::ptr::copy(ptr.add(pfx_len), ptr, new_len); + self.set_len(new_len); + } + } + + fn prefix_with(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + let pfx_len = prefix.len(); + let key_len = self.len(); + self.reserve(pfx_len); + unsafe { + let ptr = self.as_mut_ptr(); + std::ptr::copy(ptr, ptr.add(pfx_len), key_len); + std::ptr::copy_nonoverlapping(prefix.as_ptr(), ptr, pfx_len); + self.set_len(key_len + pfx_len); + } + } + + fn prefix_range_end_with(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + if self.len() == 1 && self[0] == 0 { + // the edge of the keyspace + self.clear(); + self.extend_from_slice(prefix); + let mut ok = false; + for i in (0..self.len()).rev() { + self[i] = self[i].wrapping_add(1); + if self[i] != 0 { + ok = true; + break; + } + } + if !ok { + // 0xff..ff => 0x00 + self.clear(); + self.push(0); + } + } else if !self.is_empty() { + self.prefix_with(prefix); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prefix_with() { + fn assert_prefix_with(pfx: &[u8], key: &[u8], end: &[u8], w_key: &[u8], w_end: &[u8]) { + let mut key = key.to_vec(); + let mut end = end.to_vec(); + key.prefix_with(pfx); + end.prefix_range_end_with(pfx); + assert_eq!(key, w_key); + assert_eq!(end, w_end); + } + + // single key + assert_prefix_with(b"pfx/", b"a", b"", b"pfx/a", b""); + + // range + assert_prefix_with(b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def"); + + // one-sided range (HACK - b'/' + 1 = b'0') + assert_prefix_with(b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0"); + + // one-sided range, end of keyspace + assert_prefix_with(b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0"); + } +} diff --git a/tests/namespace.rs b/tests/namespace.rs new file mode 100644 index 0000000..22b01bd --- /dev/null +++ b/tests/namespace.rs @@ -0,0 +1,41 @@ +mod testing; + +use crate::testing::{get_client, Result}; +use etcd_client::{DeleteOptions, KvClientPrefix}; + +#[tokio::test] +async fn test_namespace_kv() -> Result<()> { + let mut client = get_client().await?; + let mut c = KvClientPrefix::new(client.kv_client(), "foo/".into()); + + c.put("abc", "bar", None).await?; + + // get kv + { + let kvs = c.get("abc", None).await?.take_kvs(); + assert_eq!(kvs.len(), 1); + assert_eq!(kvs[0].key(), b"abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + // get prefixed kv + { + let kvs = client.get("foo/abc", None).await?.take_kvs(); + assert_eq!(kvs.len(), 1); + assert_eq!(kvs[0].key(), b"foo/abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + // delete kv + { + let resp = c + .delete("abc", Some(DeleteOptions::new().with_prev_key())) + .await?; + assert_eq!(resp.deleted(), 1); + let kvs = resp.prev_kvs(); + assert_eq!(kvs[0].key(), b"abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + Ok(()) +} From 3c6b2736c4d9bce64be323dcd280a6ef86119eb8 Mon Sep 17 00:00:00 2001 From: David Li Date: Mon, 1 Jan 2024 11:25:08 +0800 Subject: [PATCH 10/10] Release v0.12.4 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a835aa1..332d4d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "etcd-client" -version = "0.12.3" +version = "0.12.4" authors = ["The etcd-client Authors "] edition = "2021" readme = "README.md"