Skip to content

Commit

Permalink
feat: implement namespace APIs (#71)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
Co-authored-by: David Li <davidli2010@foxmail.com>
  • Loading branch information
tisonkun and davidli2010 authored Jan 1, 2024
1 parent 75e96f8 commit f8900b1
Show file tree
Hide file tree
Showing 11 changed files with 404 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ It provides asynchronous client backed by [tokio](https://github.com/tokio-rs/to
- [x] Cluster
- [x] Lock
- [x] Election
- [x] Namespace

## Usage

Expand Down
39 changes: 39 additions & 0 deletions examples/namespace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Namespace example
use etcd_client::*;

#[tokio::main]
async fn main() -> Result<(), Error> {
let client = Client::connect(["localhost:2379"], None).await?;
let mut kv_client = client.kv_client();
let mut kv_client_prefix = KvClientPrefix::new(kv_client.clone(), "person/".into());

kv_client_prefix.put("Alice", "15", None).await?;
println!("put kv: {{Alice: 15}}");

// get prefixed kv
let resp = kv_client.get("person/Alice", None).await?;
if let Some(kv) = resp.kvs().first() {
println!(
"Get prefixed kv: {{{}: {}}}",
kv.key_str()?,
kv.value_str()?
);
}

// get kv
let resp = kv_client_prefix.get("Alice", None).await?;
if let Some(kv) = resp.kvs().first() {
println!("Get kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?);
}

// delete kv
let resp = kv_client_prefix
.delete("Alice", Some(DeleteOptions::new().with_prev_key()))
.await?;
if let Some(kv) = resp.prev_kvs().first() {
println!("Delete kv: {{{}: {}}}", kv.key_str()?, kv.value_str()?);
}

Ok(())
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,14 @@ mod auth;
mod channel;
mod client;
mod error;
mod namespace;
mod openssl_tls;
mod rpc;
mod vec;

pub use crate::client::{Client, ConnectOptions};
pub use crate::error::Error;
pub use crate::namespace::{KvClientPrefix, LeaseClientPrefix};
pub use crate::rpc::auth::{
AuthClient, AuthDisableResponse, AuthEnableResponse, AuthenticateResponse, Permission,
PermissionType, RoleAddResponse, RoleDeleteResponse, RoleGetResponse,
Expand Down
73 changes: 73 additions & 0 deletions src/namespace/kv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::error::Result;
use crate::vec::VecExt;
use crate::{
DeleteOptions, DeleteResponse, GetOptions, GetResponse, KvClient, PutOptions, PutResponse, Txn,
TxnResponse,
};

pub struct KvClientPrefix {
pfx: Vec<u8>,
kv: KvClient,
}

impl KvClientPrefix {
pub fn new(kv: KvClient, pfx: Vec<u8>) -> Self {
Self { pfx, kv }
}

#[inline]
fn prefixed_key(&self, key: impl Into<Vec<u8>>) -> Vec<u8> {
let mut key = key.into();
key.prefix_with(&self.pfx);
key
}

pub async fn put(
&mut self,
key: impl Into<Vec<u8>>,
value: impl Into<Vec<u8>>,
options: Option<PutOptions>,
) -> Result<PutResponse> {
let key = self.prefixed_key(key);
let mut resp = self.kv.put(key, value, options).await?;
resp.strip_prev_key_prefix(&self.pfx);
Ok(resp)
}

pub async fn get(
&mut self,
key: impl Into<Vec<u8>>,
mut options: Option<GetOptions>,
) -> Result<GetResponse> {
let key = self.prefixed_key(key);
options = options.map(|mut opts| {
opts.key_range_end_mut().prefix_range_end_with(&self.pfx);
opts
});
let mut resp = self.kv.get(key, options).await?;
resp.strip_kvs_prefix(&self.pfx);
Ok(resp)
}

pub async fn delete(
&mut self,
key: impl Into<Vec<u8>>,
mut options: Option<DeleteOptions>,
) -> Result<DeleteResponse> {
let key = self.prefixed_key(key);
options = options.map(|mut opts| {
opts.key_range_end_mut().prefix_range_end_with(&self.pfx);
opts
});
let mut resp = self.kv.delete(key, options).await?;
resp.strip_prev_kvs_prefix(&self.pfx);
Ok(resp)
}

pub async fn txn(&mut self, mut txn: Txn) -> Result<TxnResponse> {
txn.prefix_with(&self.pfx);
let mut resp = self.kv.txn(txn).await?;
resp.strip_key_prefix(&self.pfx);
Ok(resp)
}
}
25 changes: 25 additions & 0 deletions src/namespace/lease.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use crate::error::Result;
use crate::{LeaseClient, LeaseTimeToLiveOptions, LeaseTimeToLiveResponse};

pub struct LeaseClientPrefix {
pfx: Vec<u8>,
lease: LeaseClient,
}

impl LeaseClientPrefix {
/// Wrap a Lease interface to filter for only keys with a prefix
/// and remove that prefix when fetching attached keys through TimeToLive.
pub fn new(lease: LeaseClient, pfx: Vec<u8>) -> Self {
Self { pfx, lease }
}

pub async fn time_to_live(
&mut self,
id: i64,
options: Option<LeaseTimeToLiveOptions>,
) -> Result<LeaseTimeToLiveResponse> {
let mut resp = self.lease.time_to_live(id, options).await?;
resp.strip_keys_prefix(&self.pfx);
Ok(resp)
}
}
5 changes: 5 additions & 0 deletions src/namespace/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod kv;
mod lease;

pub use kv::KvClientPrefix;
pub use lease::LeaseClientPrefix;
103 changes: 103 additions & 0 deletions src/rpc/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::rpc::pb::etcdserverpb::{
RequestOp as PbTxnRequestOp, TxnRequest as PbTxnRequest, TxnResponse as PbTxnResponse,
};
use crate::rpc::{get_prefix, KeyRange, KeyValue, ResponseHeader};
use crate::vec::VecExt;
use http::HeaderValue;
use std::sync::Arc;
use tonic::{IntoRequest, Request};
Expand Down Expand Up @@ -240,6 +241,13 @@ impl PutResponse {
pub fn take_prev_key(&mut self) -> Option<KeyValue> {
self.0.prev_kv.take().map(KeyValue::new)
}

#[inline]
pub(crate) fn strip_prev_key_prefix(&mut self, prefix: &[u8]) {
if let Some(kv) = self.0.prev_kv.as_mut() {
kv.key.strip_key_prefix(prefix);
}
}
}

/// Options for `Get` operation.
Expand Down Expand Up @@ -402,6 +410,11 @@ impl GetOptions {
self.req.max_create_revision = revision;
self
}

#[inline]
pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
&mut self.key_range.range_end
}
}

impl From<GetOptions> for PbRangeRequest {
Expand Down Expand Up @@ -459,6 +472,13 @@ impl GetResponse {
unsafe { std::mem::transmute(std::mem::take(&mut self.0.kvs)) }
}

#[inline]
pub(crate) fn strip_kvs_prefix(&mut self, prefix: &[u8]) {
for kv in self.0.kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}

/// Indicates if there are more keys to return in the requested range.
#[inline]
pub const fn more(&self) -> bool {
Expand Down Expand Up @@ -535,6 +555,11 @@ impl DeleteOptions {
self.req.prev_kv = true;
self
}

#[inline]
pub(crate) fn key_range_end_mut(&mut self) -> &mut Vec<u8> {
&mut self.key_range.range_end
}
}

impl From<DeleteOptions> for PbDeleteRequest {
Expand Down Expand Up @@ -596,6 +621,13 @@ impl DeleteResponse {
pub fn take_prev_kvs(&mut self) -> Vec<KeyValue> {
unsafe { std::mem::transmute(std::mem::take(&mut self.0.prev_kvs)) }
}

#[inline]
pub(crate) fn strip_prev_kvs_prefix(&mut self, prefix: &[u8]) {
for kv in self.0.prev_kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}
}

/// Options for `Compact` operation.
Expand Down Expand Up @@ -872,6 +904,43 @@ impl Txn {
.collect();
self
}

#[inline]
pub(crate) fn prefix_with(&mut self, prefix: &[u8]) {
self.req.prefix_with(prefix);
}
}

impl PbTxnRequest {
fn prefix_with(&mut self, prefix: &[u8]) {
let prefix_op = |op: &mut PbTxnRequestOp| {
if let Some(request) = &mut op.request {
match request {
PbTxnOp::RequestRange(req) => {
req.key.prefix_with(prefix);
req.range_end.prefix_range_end_with(prefix);
}
PbTxnOp::RequestPut(req) => {
req.key.prefix_with(prefix);
}
PbTxnOp::RequestDeleteRange(req) => {
req.key.prefix_with(prefix);
req.range_end.prefix_range_end_with(prefix);
}
PbTxnOp::RequestTxn(req) => {
req.prefix_with(prefix);
}
}
}
};

self.compare.iter_mut().for_each(|cmp| {
cmp.key.prefix_with(prefix);
cmp.range_end.prefix_range_end_with(prefix);
});
self.success.iter_mut().for_each(prefix_op);
self.failure.iter_mut().for_each(prefix_op);
}
}

impl From<Txn> for PbTxnRequest {
Expand Down Expand Up @@ -951,4 +1020,38 @@ impl TxnResponse {
})
.collect()
}

#[inline]
pub(crate) fn strip_key_prefix(&mut self, prefix: &[u8]) {
self.0.strip_key_prefix(prefix);
}
}

impl PbTxnResponse {
fn strip_key_prefix(&mut self, prefix: &[u8]) {
self.responses.iter_mut().for_each(|op| {
if let Some(resp) = &mut op.response {
match resp {
PbTxnOpResponse::ResponseRange(r) => {
for kv in r.kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}
PbTxnOpResponse::ResponsePut(r) => {
if let Some(kv) = r.prev_kv.as_mut() {
kv.key.strip_key_prefix(prefix);
}
}
PbTxnOpResponse::ResponseDeleteRange(r) => {
for kv in r.prev_kvs.iter_mut() {
kv.key.strip_key_prefix(prefix);
}
}
PbTxnOpResponse::ResponseTxn(r) => {
r.strip_key_prefix(prefix);
}
}
}
});
}
}
8 changes: 8 additions & 0 deletions src/rpc/lease.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::rpc::pb::etcdserverpb::{
LeaseTimeToLiveResponse as PbLeaseTimeToLiveResponse,
};
use crate::rpc::ResponseHeader;
use crate::vec::VecExt;
use crate::Error;
use http::HeaderValue;
use std::pin::Pin;
Expand Down Expand Up @@ -429,6 +430,13 @@ impl LeaseTimeToLiveResponse {
pub fn keys(&self) -> &[Vec<u8>] {
&self.0.keys
}

#[inline]
pub(crate) fn strip_keys_prefix(&mut self, prefix: &[u8]) {
self.0.keys.iter_mut().for_each(|key| {
key.strip_key_prefix(prefix);
});
}
}

/// Response for `Leases` operation.
Expand Down
7 changes: 7 additions & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,13 @@ impl From<&PbKeyValue> for &KeyValue {
}
}

impl From<&mut PbKeyValue> for &mut KeyValue {
#[inline]
fn from(src: &mut PbKeyValue) -> Self {
unsafe { &mut *(src as *mut _ as *mut KeyValue) }
}
}

/// Get prefix end key of `key`.
#[inline]
fn get_prefix(key: &[u8]) -> Vec<u8> {
Expand Down
Loading

0 comments on commit f8900b1

Please sign in to comment.