diff --git a/README.md b/README.md index 7a10405..71b872b 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ It provides asynchronous client backed by [tokio](https://github.com/tokio-rs/to - [x] Cluster - [x] Lock - [x] Election +- [x] Namespace ## Usage diff --git a/examples/namespace.rs b/examples/namespace.rs new file mode 100644 index 0000000..17ab74d --- /dev/null +++ b/examples/namespace.rs @@ -0,0 +1,39 @@ +//! Namespace example + +use etcd_client::*; + +#[tokio::main] +async fn main() -> Result<(), Error> { + let client = Client::connect(["localhost:2379"], None).await?; + let mut kv_client = client.kv_client(); + let mut kv_client_prefix = KvClientPrefix::new(kv_client.clone(), "person/".into()); + + kv_client_prefix.put("Alice", "15", None).await?; + println!("put kv: {{Alice: 15}}"); + + // get prefixed kv + let resp = kv_client.get("person/Alice", None).await?; + if let Some(kv) = resp.kvs().first() { + println!( + "Get prefixed kv: {{{}: {}}}", + kv.key_str()?, + kv.value_str()? + ); + } + + // get kv + let resp = kv_client_prefix.get("Alice", None).await?; + if let Some(kv) = resp.kvs().first() { + println!("Get kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + } + + // delete kv + let resp = kv_client_prefix + .delete("Alice", Some(DeleteOptions::new().with_prev_key())) + .await?; + if let Some(kv) = resp.prev_kvs().first() { + println!("Delete kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?); + } + + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 8e7c05b..b3dbdae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,11 +61,14 @@ mod auth; mod channel; mod client; mod error; +mod namespace; mod openssl_tls; mod rpc; +mod vec; pub use crate::client::{Client, ConnectOptions}; pub use crate::error::Error; +pub use crate::namespace::{KvClientPrefix, LeaseClientPrefix}; pub use crate::rpc::auth::{ AuthClient, AuthDisableResponse, AuthEnableResponse, AuthenticateResponse, Permission, PermissionType, RoleAddResponse, RoleDeleteResponse, RoleGetResponse, diff --git a/src/namespace/kv.rs b/src/namespace/kv.rs new file mode 100644 index 0000000..c99decd --- /dev/null +++ b/src/namespace/kv.rs @@ -0,0 +1,73 @@ +use crate::error::Result; +use crate::vec::VecExt; +use crate::{ + DeleteOptions, DeleteResponse, GetOptions, GetResponse, KvClient, PutOptions, PutResponse, Txn, + TxnResponse, +}; + +pub struct KvClientPrefix { + pfx: Vec, + kv: KvClient, +} + +impl KvClientPrefix { + pub fn new(kv: KvClient, pfx: Vec) -> Self { + Self { pfx, kv } + } + + #[inline] + fn prefixed_key(&self, key: impl Into>) -> Vec { + let mut key = key.into(); + key.prefix_with(&self.pfx); + key + } + + pub async fn put( + &mut self, + key: impl Into>, + value: impl Into>, + options: Option, + ) -> Result { + let key = self.prefixed_key(key); + let mut resp = self.kv.put(key, value, options).await?; + resp.strip_prev_key_prefix(&self.pfx); + Ok(resp) + } + + pub async fn get( + &mut self, + key: impl Into>, + mut options: Option, + ) -> Result { + let key = self.prefixed_key(key); + options = options.map(|mut opts| { + opts.key_range_end_mut().prefix_range_end_with(&self.pfx); + opts + }); + let mut resp = self.kv.get(key, options).await?; + resp.strip_kvs_prefix(&self.pfx); + Ok(resp) + } + + pub async fn delete( + &mut self, + key: impl Into>, + mut options: Option, + ) -> Result { + let key = self.prefixed_key(key); + options = options.map(|mut opts| { + opts.key_range_end_mut().prefix_range_end_with(&self.pfx); + opts + }); + let mut resp = self.kv.delete(key, options).await?; + resp.strip_prev_kvs_prefix(&self.pfx); + Ok(resp) + } + + pub async fn txn(&mut self, mut txn: Txn) -> Result { + txn.prefix_with(&self.pfx); + let mut resp = self.kv.txn(txn).await?; + resp.strip_key_prefix(&self.pfx); + Ok(resp) + } +} diff --git a/src/namespace/lease.rs b/src/namespace/lease.rs new file mode 100644 index 0000000..759110a --- /dev/null +++ b/src/namespace/lease.rs @@ -0,0 +1,25 @@ +use crate::error::Result; +use crate::{LeaseClient, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse}; + +pub struct LeaseClientPrefix { + pfx: Vec, + lease: LeaseClient, +} + +impl LeaseClientPrefix { + /// Wrap a Lease interface to filter for only keys with a prefix + /// and remove that prefix when fetching attached keys through TimeToLive. + pub fn new(lease: LeaseClient, pfx: Vec) -> Self { + Self { pfx, lease } + } + + pub async fn time_to_live( + &mut self, + id: i64, + options: Option, + ) -> Result { + let mut resp = self.lease.time_to_live(id, options).await?; + resp.strip_keys_prefix(&self.pfx); + Ok(resp) + } +} diff --git a/src/namespace/mod.rs b/src/namespace/mod.rs new file mode 100644 index 0000000..7484099 --- /dev/null +++ b/src/namespace/mod.rs @@ -0,0 +1,5 @@ +mod kv; +mod lease; + +pub use kv::KvClientPrefix; +pub use lease::LeaseClientPrefix; diff --git a/src/rpc/kv.rs b/src/rpc/kv.rs index 1298b16..bdb21e3 100644 --- a/src/rpc/kv.rs +++ b/src/rpc/kv.rs @@ -19,6 +19,7 @@ use crate::rpc::pb::etcdserverpb::{ RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse, }; use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader}; +use crate::vec::VecExt; use http::HeaderValue; use std::sync::Arc; use tonic::{IntoRequest, Request}; @@ -240,6 +241,13 @@ impl PutResponse { pub fn take_prev_key(&mut self) -> Option { self.0.prev_kv.take().map(KeyValue::new) } + + #[inline] + pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) { + if let Some(kv) = self.0.prev_kv.as_mut() { + kv.key.strip_key_prefix(prefix); + } + } } /// Options for `Get` operation. @@ -402,6 +410,11 @@ impl GetOptions { self.req.max_create_revision = revision; self } + + #[inline] + pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec { + &mut self.key_range.range_end + } } impl From for PbRangeRequest { @@ -459,6 +472,13 @@ impl GetResponse { unsafe { std::mem::transmute(std::mem::take(&mut self.0.kvs)) } } + #[inline] + pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) { + for kv in self.0.kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + /// Indicates if there are more keys to return in the requested range. #[inline] pub const fn more(&self) -> bool { @@ -535,6 +555,11 @@ impl DeleteOptions { self.req.prev_kv = true; self } + + #[inline] + pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec { + &mut self.key_range.range_end + } } impl From for PbDeleteRequest { @@ -596,6 +621,13 @@ impl DeleteResponse { pub fn take_prev_kvs(&mut self) -> Vec { unsafe { std::mem::transmute(std::mem::take(&mut self.0.prev_kvs)) } } + + #[inline] + pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) { + for kv in self.0.prev_kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } } /// Options for `Compact` operation. @@ -872,6 +904,43 @@ impl Txn { .collect(); self } + + #[inline] + pub(crate) fn prefix_with(&mut self, prefix: &[u8]) { + self.req.prefix_with(prefix); + } +} + +impl PbTxnRequest { + fn prefix_with(&mut self, prefix: &[u8]) { + let prefix_op = |op: &mut PbTxnRequestOp| { + if let Some(request) = &mut op.request { + match request { + PbTxnOp::RequestRange(req) => { + req.key.prefix_with(prefix); + req.range_end.prefix_range_end_with(prefix); + } + PbTxnOp::RequestPut(req) => { + req.key.prefix_with(prefix); + } + PbTxnOp::RequestDeleteRange(req) => { + req.key.prefix_with(prefix); + req.range_end.prefix_range_end_with(prefix); + } + PbTxnOp::RequestTxn(req) => { + req.prefix_with(prefix); + } + } + } + }; + + self.compare.iter_mut().for_each(|cmp| { + cmp.key.prefix_with(prefix); + cmp.range_end.prefix_range_end_with(prefix); + }); + self.success.iter_mut().for_each(prefix_op); + self.failure.iter_mut().for_each(prefix_op); + } } impl From for PbTxnRequest { @@ -951,4 +1020,38 @@ impl TxnResponse { }) .collect() } + + #[inline] + pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) { + self.0.strip_key_prefix(prefix); + } +} + +impl PbTxnResponse { + fn strip_key_prefix(&mut self, prefix: &[u8]) { + self.responses.iter_mut().for_each(|op| { + if let Some(resp) = &mut op.response { + match resp { + PbTxnOpResponse::ResponseRange(r) => { + for kv in r.kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponsePut(r) => { + if let Some(kv) = r.prev_kv.as_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponseDeleteRange(r) => { + for kv in r.prev_kvs.iter_mut() { + kv.key.strip_key_prefix(prefix); + } + } + PbTxnOpResponse::ResponseTxn(r) => { + r.strip_key_prefix(prefix); + } + } + } + }); + } } diff --git a/src/rpc/lease.rs b/src/rpc/lease.rs index 4c75f16..c456907 100644 --- a/src/rpc/lease.rs +++ b/src/rpc/lease.rs @@ -14,6 +14,7 @@ use crate::rpc::pb::etcdserverpb::{ LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse, }; use crate::rpc::ResponseHeader; +use crate::vec::VecExt; use crate::Error; use http::HeaderValue; use std::pin::Pin; @@ -429,6 +430,13 @@ impl LeaseTimeToLiveResponse { pub fn keys(&self) -> &[Vec] { &self.0.keys } + + #[inline] + pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) { + self.0.keys.iter_mut().for_each(|key| { + key.strip_key_prefix(prefix); + }); + } } /// Response for `Leases` operation. diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 92ecbe0..bacae86 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -167,6 +167,13 @@ impl From<&PbKeyValue> for &KeyValue { } } +impl From<&mut PbKeyValue> for &mut KeyValue { + #[inline] + fn from(src: &mut PbKeyValue) -> Self { + unsafe { &mut *(src as *mut _ as *mut KeyValue) } + } +} + /// Get prefix end key of `key`. #[inline] fn get_prefix(key: &[u8]) -> Vec { diff --git a/src/vec.rs b/src/vec.rs new file mode 100644 index 0000000..b3df995 --- /dev/null +++ b/src/vec.rs @@ -0,0 +1,99 @@ +pub trait VecExt { + // slice has a method named `strip_prefix` + fn strip_key_prefix(&mut self, prefix: &[u8]); + fn prefix_with(&mut self, prefix: &[u8]); + fn prefix_range_end_with(&mut self, prefix: &[u8]); +} + +impl VecExt for Vec { + fn strip_key_prefix(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + if !self.starts_with(prefix) { + return; + } + + let pfx_len = prefix.len(); + let key_len = self.len(); + let new_len = key_len - pfx_len; + unsafe { + let ptr = self.as_mut_ptr(); + std::ptr::copy(ptr.add(pfx_len), ptr, new_len); + self.set_len(new_len); + } + } + + fn prefix_with(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + let pfx_len = prefix.len(); + let key_len = self.len(); + self.reserve(pfx_len); + unsafe { + let ptr = self.as_mut_ptr(); + std::ptr::copy(ptr, ptr.add(pfx_len), key_len); + std::ptr::copy_nonoverlapping(prefix.as_ptr(), ptr, pfx_len); + self.set_len(key_len + pfx_len); + } + } + + fn prefix_range_end_with(&mut self, prefix: &[u8]) { + if prefix.is_empty() { + return; + } + + if self.len() == 1 && self[0] == 0 { + // the edge of the keyspace + self.clear(); + self.extend_from_slice(prefix); + let mut ok = false; + for i in (0..self.len()).rev() { + self[i] = self[i].wrapping_add(1); + if self[i] != 0 { + ok = true; + break; + } + } + if !ok { + // 0xff..ff => 0x00 + self.clear(); + self.push(0); + } + } else if !self.is_empty() { + self.prefix_with(prefix); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_prefix_with() { + fn assert_prefix_with(pfx: &[u8], key: &[u8], end: &[u8], w_key: &[u8], w_end: &[u8]) { + let mut key = key.to_vec(); + let mut end = end.to_vec(); + key.prefix_with(pfx); + end.prefix_range_end_with(pfx); + assert_eq!(key, w_key); + assert_eq!(end, w_end); + } + + // single key + assert_prefix_with(b"pfx/", b"a", b"", b"pfx/a", b""); + + // range + assert_prefix_with(b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def"); + + // one-sided range (HACK - b'/' + 1 = b'0') + assert_prefix_with(b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0"); + + // one-sided range, end of keyspace + assert_prefix_with(b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0"); + } +} diff --git a/tests/namespace.rs b/tests/namespace.rs new file mode 100644 index 0000000..22b01bd --- /dev/null +++ b/tests/namespace.rs @@ -0,0 +1,41 @@ +mod testing; + +use crate::testing::{get_client, Result}; +use etcd_client::{DeleteOptions, KvClientPrefix}; + +#[tokio::test] +async fn test_namespace_kv() -> Result<()> { + let mut client = get_client().await?; + let mut c = KvClientPrefix::new(client.kv_client(), "foo/".into()); + + c.put("abc", "bar", None).await?; + + // get kv + { + let kvs = c.get("abc", None).await?.take_kvs(); + assert_eq!(kvs.len(), 1); + assert_eq!(kvs[0].key(), b"abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + // get prefixed kv + { + let kvs = client.get("foo/abc", None).await?.take_kvs(); + assert_eq!(kvs.len(), 1); + assert_eq!(kvs[0].key(), b"foo/abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + // delete kv + { + let resp = c + .delete("abc", Some(DeleteOptions::new().with_prev_key())) + .await?; + assert_eq!(resp.deleted(), 1); + let kvs = resp.prev_kvs(); + assert_eq!(kvs[0].key(), b"abc"); + assert_eq!(kvs[0].value(), b"bar"); + } + + Ok(()) +}