diff --git a/src/meta/api/src/name_id_value_api.rs b/src/meta/api/src/name_id_value_api.rs index ff44c88a6f47..8b705272f0f6 100644 --- a/src/meta/api/src/name_id_value_api.rs +++ b/src/meta/api/src/name_id_value_api.rs @@ -16,6 +16,7 @@ use std::future::Future; use databend_common_meta_app::data_id::DataId; use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::primitive::Id; use databend_common_meta_app::tenant_key::ident::TIdent; use databend_common_meta_app::tenant_key::resource::TenantResource; use databend_common_meta_app::KeyWithTenant; @@ -305,3 +306,198 @@ where IdRsc::ValueType: FromToProto + Send + Sync + 'static, { } + +/// Similar to `NameIdValueApi`, but is compatibility with non-DataId ids. +#[tonic::async_trait] +pub trait NameIdValueApiCompat: KVApi +where + K: kvapi::Key>, + K: KeyWithTenant, + K: Send + Sync + 'static, + Id: FromToProto + Send + Sync + 'static, + IdTyp: kvapi::Key + Clone + Send + Sync + 'static, + IdTyp::ValueType: FromToProto + Send + Sync + 'static, +{ + /// mget by names, returns a list of `(name, id, value)` + /// + /// Returns an iterator of `(name, id, SeqV)` tuples. + /// This function mget all the ids by names, + /// then get the `id->value` mapping and finally zip these two together. + /// + /// If `name->id` or `id->value` mapping does not exist, it will be skipped. + /// Thus, the returned iterator may have less items than the input names. + // Using `async fn` does not allow using `impl Iterator` in the return type. + // Thus we use `impl Future` instead. + fn mget_id_value_compat( + &self, + names: impl IntoIterator + Send, + ) -> impl Future< + Output = Result)>, MetaError>, + > + Send { + async move { + let strm = self.get_pb_stream(names).await?; + let name_ids = strm.try_collect::>().await?; + + let name_ids = name_ids + .into_iter() + .filter_map(|(k, seq_v)| seq_v.map(|x| (k, x.data.into_inner()))) + .collect::>(); + + let id_idents = name_ids + .iter() + .map(|(_k, id)| id.clone()) + .collect::>(); + + let strm = self.get_pb_values(id_idents).await?; + let seq_metas = strm.try_collect::>().await?; + + let name_id_values = + name_ids + .into_iter() + .zip(seq_metas) + .filter_map(|((k, id), opt_seq_meta)| { + opt_seq_meta.map(|seq_meta| (k, id, seq_meta)) + }); + + Ok(name_id_values) + } + } +} + +impl NameIdValueApiCompat for T +where + T: KVApi + ?Sized, + K: kvapi::Key>, + K: KeyWithTenant, + K: Send + Sync + 'static, + Id: FromToProto + Send + Sync + 'static, + IdTyp: kvapi::Key + Clone + Send + Sync + 'static, + IdTyp::ValueType: FromToProto + Send + Sync + 'static, +{ +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use async_trait::async_trait; + use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; + use databend_common_meta_app::schema::DatabaseId; + use databend_common_meta_app::schema::DatabaseMeta; + use databend_common_meta_app::tenant::Tenant; + use databend_common_meta_kvapi::kvapi::KVApi; + use databend_common_meta_kvapi::kvapi::KVStream; + use databend_common_meta_kvapi::kvapi::UpsertKVReply; + use databend_common_meta_kvapi::kvapi::UpsertKVReq; + use databend_common_meta_types::protobuf::StreamItem; + use databend_common_meta_types::seq_value::SeqV; + use databend_common_meta_types::MetaError; + use databend_common_meta_types::TxnReply; + use databend_common_meta_types::TxnRequest; + use databend_common_proto_conv::FromToProto; + use futures::StreamExt; + use prost::Message; + + use crate::name_id_value_api::NameIdValueApiCompat; + + struct Foo { + kvs: BTreeMap, + } + + #[async_trait] + impl KVApi for Foo { + type Error = MetaError; + + async fn upsert_kv(&self, _req: UpsertKVReq) -> Result { + unimplemented!() + } + + async fn get_kv_stream( + &self, + keys: &[String], + ) -> Result, Self::Error> { + let mut res = Vec::with_capacity(keys.len()); + for key in keys.iter() { + let k = key.clone(); + let v = self.kvs.get(key).cloned(); + + let item = StreamItem::new(k, v.map(|v| v.into())); + res.push(Ok(item)); + } + + let strm = futures::stream::iter(res); + Ok(strm.boxed()) + } + + async fn list_kv(&self, _prefix: &str) -> Result, Self::Error> { + unimplemented!() + } + + async fn transaction(&self, _txn: TxnRequest) -> Result { + unimplemented!() + } + } + + /// If the backend stream returns early, the returned stream should be filled with error item at the end. + #[tokio::test] + async fn test_mget_id_values() -> anyhow::Result<()> { + let db_meta = |i: u64| DatabaseMeta { + engine: i.to_string(), + engine_options: Default::default(), + options: Default::default(), + created_on: Default::default(), + updated_on: Default::default(), + comment: "".to_string(), + drop_on: None, + }; + + let v = db_meta(1).to_pb()?.encode_to_vec(); + + let db_id = |i| DatabaseId::new(i).to_string().as_bytes().to_vec(); + + let foo = Foo { + kvs: vec![ + (s("__fd_database/tenant_foo/a"), SeqV::new(1, db_id(1))), + (s("__fd_database/tenant_foo/b"), SeqV::new(2, db_id(2))), + (s("__fd_database/tenant_foo/c"), SeqV::new(3, db_id(3))), + (s("__fd_database_by_id/1"), SeqV::new(4, v.clone())), + (s("__fd_database_by_id/2"), SeqV::new(5, v.clone())), + ] + .into_iter() + .collect(), + }; + + // MGet key value pairs + { + let tenant = Tenant::new_literal("tenant_foo"); + let it = foo + .mget_id_value_compat([ + DatabaseNameIdent::new(&tenant, "a"), + DatabaseNameIdent::new(&tenant, "b"), + DatabaseNameIdent::new(&tenant, "c"), + DatabaseNameIdent::new(&tenant, "d"), // No such key + ]) + .await?; + + let got = it.collect::>(); + assert_eq!(got, vec![ + ( + DatabaseNameIdent::new(&tenant, "a"), + DatabaseId::new(1), + SeqV::new(4, db_meta(1)) + ), + ( + DatabaseNameIdent::new(&tenant, "b"), + DatabaseId::new(2), + SeqV::new(5, db_meta(1)) + ), + ]); + } + + Ok(()) + } + + fn s(x: impl ToString) -> String { + x.to_string() + } +}