Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add mget_id_value_compat() #16425

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 196 additions & 0 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::future::Future;

use databend_common_meta_app::data_id::DataId;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::primitive::Id;
use databend_common_meta_app::tenant_key::ident::TIdent;
use databend_common_meta_app::tenant_key::resource::TenantResource;
use databend_common_meta_app::KeyWithTenant;
Expand Down Expand Up @@ -305,3 +306,198 @@ where
IdRsc::ValueType: FromToProto + Send + Sync + 'static,
{
}

/// Similar to `NameIdValueApi`, but is compatibility with non-DataId ids.
#[tonic::async_trait]
pub trait NameIdValueApiCompat<K, IdTyp>: KVApi<Error = MetaError>
where
K: kvapi::Key<ValueType = Id<IdTyp>>,
K: KeyWithTenant,
K: Send + Sync + 'static,
Id<IdTyp>: FromToProto + Send + Sync + 'static,
IdTyp: kvapi::Key + Clone + Send + Sync + 'static,
IdTyp::ValueType: FromToProto + Send + Sync + 'static,
{
/// mget by names, returns a list of `(name, id, value)`
///
/// Returns an iterator of `(name, id, SeqV<value>)` tuples.
/// This function mget all the ids by names,
/// then get the `id->value` mapping and finally zip these two together.
///
/// If `name->id` or `id->value` mapping does not exist, it will be skipped.
/// Thus, the returned iterator may have less items than the input names.
// Using `async fn` does not allow using `impl Iterator` in the return type.
// Thus we use `impl Future` instead.
fn mget_id_value_compat(
&self,
names: impl IntoIterator<Item = K> + Send,
) -> impl Future<
Output = Result<impl Iterator<Item = (K, IdTyp, SeqV<IdTyp::ValueType>)>, MetaError>,
> + Send {
async move {
let strm = self.get_pb_stream(names).await?;
let name_ids = strm.try_collect::<Vec<_>>().await?;

let name_ids = name_ids
.into_iter()
.filter_map(|(k, seq_v)| seq_v.map(|x| (k, x.data.into_inner())))
.collect::<Vec<_>>();

let id_idents = name_ids
.iter()
.map(|(_k, id)| id.clone())
.collect::<Vec<_>>();

let strm = self.get_pb_values(id_idents).await?;
let seq_metas = strm.try_collect::<Vec<_>>().await?;

let name_id_values =
name_ids
.into_iter()
.zip(seq_metas)
.filter_map(|((k, id), opt_seq_meta)| {
opt_seq_meta.map(|seq_meta| (k, id, seq_meta))
});

Ok(name_id_values)
}
}
}

impl<K, IdTyp, T> NameIdValueApiCompat<K, IdTyp> for T
where
T: KVApi<Error = MetaError> + ?Sized,
K: kvapi::Key<ValueType = Id<IdTyp>>,
K: KeyWithTenant,
K: Send + Sync + 'static,
Id<IdTyp>: FromToProto + Send + Sync + 'static,
IdTyp: kvapi::Key + Clone + Send + Sync + 'static,
IdTyp::ValueType: FromToProto + Send + Sync + 'static,
{
}

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use async_trait::async_trait;
use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::DatabaseMeta;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KVStream;
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::TxnReply;
use databend_common_meta_types::TxnRequest;
use databend_common_proto_conv::FromToProto;
use futures::StreamExt;
use prost::Message;

use crate::name_id_value_api::NameIdValueApiCompat;

struct Foo {
kvs: BTreeMap<String, SeqV>,
}

#[async_trait]
impl KVApi for Foo {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
unimplemented!()
}

async fn get_kv_stream(
&self,
keys: &[String],
) -> Result<KVStream<Self::Error>, Self::Error> {
let mut res = Vec::with_capacity(keys.len());
for key in keys.iter() {
let k = key.clone();
let v = self.kvs.get(key).cloned();

let item = StreamItem::new(k, v.map(|v| v.into()));
res.push(Ok(item));
}

let strm = futures::stream::iter(res);
Ok(strm.boxed())
}

async fn list_kv(&self, _prefix: &str) -> Result<KVStream<Self::Error>, Self::Error> {
unimplemented!()
}

async fn transaction(&self, _txn: TxnRequest) -> Result<TxnReply, Self::Error> {
unimplemented!()
}
}

/// If the backend stream returns early, the returned stream should be filled with error item at the end.
#[tokio::test]
async fn test_mget_id_values() -> anyhow::Result<()> {
let db_meta = |i: u64| DatabaseMeta {
engine: i.to_string(),
engine_options: Default::default(),
options: Default::default(),
created_on: Default::default(),
updated_on: Default::default(),
comment: "".to_string(),
drop_on: None,
};

let v = db_meta(1).to_pb()?.encode_to_vec();

let db_id = |i| DatabaseId::new(i).to_string().as_bytes().to_vec();

let foo = Foo {
kvs: vec![
(s("__fd_database/tenant_foo/a"), SeqV::new(1, db_id(1))),
(s("__fd_database/tenant_foo/b"), SeqV::new(2, db_id(2))),
(s("__fd_database/tenant_foo/c"), SeqV::new(3, db_id(3))),
(s("__fd_database_by_id/1"), SeqV::new(4, v.clone())),
(s("__fd_database_by_id/2"), SeqV::new(5, v.clone())),
]
.into_iter()
.collect(),
};

// MGet key value pairs
{
let tenant = Tenant::new_literal("tenant_foo");
let it = foo
.mget_id_value_compat([
DatabaseNameIdent::new(&tenant, "a"),
DatabaseNameIdent::new(&tenant, "b"),
DatabaseNameIdent::new(&tenant, "c"),
DatabaseNameIdent::new(&tenant, "d"), // No such key
])
.await?;

let got = it.collect::<Vec<_>>();
assert_eq!(got, vec![
(
DatabaseNameIdent::new(&tenant, "a"),
DatabaseId::new(1),
SeqV::new(4, db_meta(1))
),
(
DatabaseNameIdent::new(&tenant, "b"),
DatabaseId::new(2),
SeqV::new(5, db_meta(1))
),
]);
}

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
}
Loading