Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add KVPbCrudApi::crud_try_insert() and crud_try_upsert() #16484

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 136 additions & 2 deletions src/meta/api/src/kv_pb_crud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::Change;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
Expand All @@ -27,7 +30,10 @@ use log::debug;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::meta_txn_error::MetaTxnError;
use crate::send_txn;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::util::txn_op_put_pb;

/// [`KVPbCrudApi`] provide generic meta-service access pattern implementations for `name -> value` mapping.
///
Expand All @@ -39,7 +45,113 @@ where
K: kvapi::Key + Clone + Send + Sync + 'static,
K::ValueType: FromToProto + Clone + Send + Sync + 'static,
{
/// Update or insert a `name -> value` mapping.
/// Attempts to insert a new key-value pair in it does not exist, without CAS loop.
///
/// See: [`KVPbCrudApi::crud_try_upsert`]
async fn crud_try_insert<E>(
&self,
key: &K,
value: K::ValueType,
ttl: Option<Duration>,
on_exist: impl FnOnce() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
self.crud_try_upsert(key, MatchSeq::Exact(0), value, ttl, on_exist)
.await
}

/// Attempts to insert or update a new key-value pair, without CAS loop.
///
/// # Arguments
/// * `key` - The identifier for the new entry.
/// * `value` - The value to be associated with the key.
/// * `ttl` - Optional time-to-live for the entry.
/// * `on_exist` - Callback function invoked if the key already exists.
///
/// # Returns
/// * `Ok(Ok(()))` if the insertion was successful.
/// * `Ok(Err(E))` if the key already exists and `on_exist` returned an error.
/// * `Err(MetaTxnError)` for transaction-related or meta-service errors.
async fn crud_try_upsert<E>(
&self,
key: &K,
match_seq: MatchSeq,
value: K::ValueType,
ttl: Option<Duration>,
on_exist: impl FnOnce() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =key; "KVPbCrudApi: {}", func_name!());

let upsert = UpsertPB::insert(key.clone(), value).with(match_seq);
let upsert = if let Some(ttl) = ttl {
upsert.with_ttl(ttl)
} else {
upsert
};

let transition = self.upsert_pb(&upsert).await?;

if transition.is_changed() {
Ok(Ok(()))
} else {
Ok(on_exist())
}
}

/// Updates an existing key-value mapping with CAS loop.
///
/// # Arguments
/// * `name_ident` - The identifier of the key to update.
/// * `update` - A function that takes the current value and returns an optional tuple of
/// (new_value, ttl). If None is returned, the update is cancelled.
/// * `not_found` - A function called when the key doesn't exist. It should either return
/// an error or Ok(()) to cancel the update.
///
/// # Returns
/// * `Ok(Ok(()))` if the update was successful or cancelled.
/// * `Ok(Err(E))` if `not_found` returned an error.
/// * `Err(MetaTxnError)` for transaction-related errors.
///
/// # Note
/// This method uses optimistic locking and will retry on conflicts.
async fn crud_update_existing<E>(
&self,
name_ident: &K,
update: impl Fn(K::ValueType) -> Option<(K::ValueType, Option<Duration>)> + Send,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "KVPbCrudApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match seq_meta.into_value() {
Some(x) => update(x),
None => return Ok(not_found()),
};

let Some((updated, ttl)) = updated else {
// update is cancelled
return Ok(Ok(()));
};

txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.push(txn_op_put_pb(name_ident, &updated, ttl)?);

let (succ, _responses) = send_txn(self, txn).await?;

if succ {
return Ok(Ok(()));
}
}
}

/// Update or insert a `name -> value` mapping, with CAS loop.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// - Ok(Some(x)): write back `x`.
Expand All @@ -49,7 +161,7 @@ where
/// This function returns an embedded result,
/// - the outer result is for underlying kvapi error,
/// - the inner result is for business logic error.
async fn upsert_with<E>(
async fn crud_upsert_with<E>(
&self,
name_ident: &K,
update: impl Fn(Option<SeqV<K::ValueType>>) -> Result<Option<K::ValueType>, E> + Send,
Expand Down Expand Up @@ -80,6 +192,28 @@ where
}
}
}

/// Remove the `name -> value` mapping by name.
///
/// `not_found` is called when the name does not exist.
/// And this function decide to:
/// - cancel update by returning `Ok(())`
/// - or return an error when the name does not exist.
async fn crud_remove<E>(
&self,
name_ident: &K,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(key :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::delete(name_ident.clone());
let transition = self.upsert_pb(&upsert).await?;
if !transition.is_changed() {
return Ok(not_found());
}

Ok(Ok(()))
}
}

impl<K, T> KVPbCrudApi<K> for T
Expand Down
98 changes: 6 additions & 92 deletions src/meta/api/src/name_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,15 @@ use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
use log::debug;

use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::meta_txn_error::MetaTxnError;
use crate::send_txn;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::util::txn_op_put_pb;

/// NameValueApi provide generic meta-service access pattern implementations for `name -> value` mapping.
///
Expand All @@ -53,25 +48,16 @@ where
/// Create a `name -> value` mapping.
async fn insert_name_value(
&self,
name_ident: TIdent<R, N>,
name_ident: &TIdent<R, N>,
value: R::ValueType,
ttl: Option<Duration>,
) -> Result<Result<(), ExistError<R, N>>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::insert(name_ident.clone(), value);
let upsert = if let Some(ttl) = ttl {
upsert.with_ttl(ttl)
} else {
upsert
};

let transition = self.upsert_pb(&upsert).await?;

if !transition.is_changed() {
return Ok(Err(name_ident.exist_error(func_name!())));
}
Ok(Ok(()))
self.crud_try_insert(name_ident, value, ttl, || {
Err(name_ident.exist_error(func_name!()))
})
.await
}

/// Create a `name -> value` mapping, with `CreateOption` support
Expand All @@ -96,78 +82,6 @@ where
}
Ok(Ok(()))
}

/// Update an existent `name -> value` mapping.
///
/// The `update` function is called with the previous value
/// and should output the updated to write back,
/// with an optional time-to-last value.
///
/// `not_found` is called when the name does not exist.
/// And this function decide to:
/// - cancel update by returning `Ok(())`
/// - or return an error when the name does not exist.
async fn update_existent_name_value<E>(
&self,
name_ident: &TIdent<R, N>,
update: impl Fn(R::ValueType) -> Option<(R::ValueType, Option<Duration>)> + Send,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match seq_meta.into_value() {
Some(x) => update(x),
None => return Ok(not_found()),
};

let Some((updated, ttl)) = updated else {
// update is cancelled
return Ok(Ok(()));
};

txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.push(txn_op_put_pb(name_ident, &updated, ttl)?);

let (succ, _responses) = send_txn(self, txn).await?;

if succ {
return Ok(Ok(()));
}
}
}

/// Remove the `name -> id -> value` mapping by name, along with associated records, such `id->name` reverse index.
///
/// Returns the removed `SeqV<id>` and `SeqV<value>`, if the name exists.
/// Otherwise, returns None.
///
/// `associated_records` is used to generate additional key-values to remove along with the main operation.
/// Such operations do not have any condition constraints.
/// For example, a `name -> id` mapping can have a reverse `id -> name` mapping.
async fn remove_name_value<E>(
&self,
name_ident: &TIdent<R, N>,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(key :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::delete(name_ident.clone());
let transition = self.upsert_pb(&upsert).await?;
if !transition.is_changed() {
return Ok(not_found());
}

Ok(Ok(()))
}
}

impl<R, N, T> NameValueApi<R, N> for T
Expand Down
18 changes: 8 additions & 10 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
};

self.update_existent_name_value(
self.crud_update_existing(
&req.name_ident,
|mut meta| {
meta.virtual_columns = req.virtual_columns.clone();
Expand All @@ -1007,7 +1007,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
};

self.remove_name_value(&req.name_ident, not_found).await??;
self.crud_remove(&req.name_ident, not_found).await??;

Ok(())
}
Expand Down Expand Up @@ -2306,7 +2306,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
table_id: req.table_id,
};

self.upsert_with(&tbid, |seq_meta: Option<SeqV<TableMeta>>| {
self.crud_upsert_with(&tbid, |seq_meta: Option<SeqV<TableMeta>>| {
let Some(seq_meta) = seq_meta else {
return Err(AppError::UnknownTableId(UnknownTableId::new(
req.table_id,
Expand Down Expand Up @@ -2964,8 +2964,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// Revision is unique. if it presents, consider it as success.
// Thus, we could just ignore create result
let _create_res = self
.insert_name_value(key, lock_meta, Some(req.ttl))
let _ = self
.crud_try_insert(&key, lock_meta, Some(req.ttl), || Ok::<(), Infallible>(()))
.await?;

Ok(CreateLockRevReply { revision })
Expand All @@ -2982,7 +2982,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let table_id = lock_key.get_table_id();
let key = lock_key.gen_key(req.revision);

self.update_existent_name_value(
self.crud_update_existing(
&key,
|mut lock_meta| {
// Set `acquire_lock = true` to initialize `acquired_on` when the
Expand Down Expand Up @@ -3014,9 +3014,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let revision = req.revision;
let key = lock_key.gen_key(revision);

self.remove_name_value(&key, || Ok::<(), ()>(()))
.await?
.unwrap();
self.crud_remove(&key, || Ok::<(), ()>(())).await?.unwrap();

Ok(())
}
Expand Down Expand Up @@ -3130,7 +3128,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
debug!(req :? =(&name_ident, &value); "SchemaApi: {}", func_name!());

let transition = self
.upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
.crud_upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
let curr = t.into_value().unwrap_or_default();
if curr.time >= value.time {
Ok(None)
Expand Down
Loading