diff --git a/.github/workflows/redbit.yml b/.github/workflows/redbit.yml
index 671486b..94ec43a 100644
--- a/.github/workflows/redbit.yml
+++ b/.github/workflows/redbit.yml
@@ -4,6 +4,12 @@ on:
   push:
     branches:
       - master
+  pull_request:
+    branches:
+      - master
+    types:
+      - opened
+      - synchronize
   workflow_dispatch:
 
 permissions:
@@ -168,12 +174,14 @@ jobs:
           ' README.md > README.md.tmp && mv README.md.tmp README.md
 
       - name: Commit benchmark results to README
+        if: github.event_name == 'push' && github.ref == 'refs/heads/master'
         run: |
           git config --global user.name "github-actions[bot]"
           git config --global user.email "github-actions@users.noreply.github.com"
           git diff --quiet || (git add README.md && git commit -m "Auto-update README with example code" && git push)
 
   deploy:
+    if: github.event_name == 'push' && github.ref == 'refs/heads/master'
     needs: bench
     runs-on: ubuntu-latest
     permissions:
diff --git a/chain/README.md b/chain/README.md
index fa9dd3c..fc9cd83 100644
--- a/chain/README.md
+++ b/chain/README.md
@@ -1,12 +1,14 @@
 ## Chain
 
-Chain keeps you in sync with arbitrary blockchain if you implement the [api](src/api.rs).
+Chain is a read-optimized UTXO analytics engine: it builds UTXO indexes with materialized prev-outs.
+UTXO state is built on the fly during indexing; addresses and scripts are stored as dictionaries for deduplication.
+At the same time it is extremely space-efficient, as it uses hierarchical pointers to reference hashes.
 
 Chain tip is "eventually consistent" with the settlement layer through eager fork competition such that superseded forks
 are immediately deleted from DB and replaced with more valuable fork when it appears. Ie. only one winning fork is kept
 in the DB at given moment. This allows for much better performance and space efficiency.
 
-Utxo state is built on the fly during indexing, addresses are stored as a dictionary for deduplication purposes.
+It keeps you in sync with an arbitrary blockchain if you implement the [api](src/api.rs).
 
 ### Perf
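The rewritten README intro above rests on two ideas: addresses/scripts are dictionary-encoded so each distinct value is stored once, and outputs are referenced by compact hierarchical pointers instead of repeated 32-byte hashes. A toy sketch of both, with every type invented for illustration (these are not the crate's actual structures):

```rust
use std::collections::HashMap;

/// Hypothetical hierarchical pointer: identifies an output by position
/// (block height, tx index, output index) rather than by a 32-byte tx hash.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct UtxoPointer { height: u32, tx_index: u16, out_index: u16 }

/// Hypothetical dictionary: each distinct address is stored once and
/// referenced everywhere else by its compact u32 id.
#[derive(Default)]
struct AddressDict {
    id_by_address: HashMap<String, u32>,
    address_by_id: Vec<String>,
}

impl AddressDict {
    fn intern(&mut self, address: &str) -> u32 {
        if let Some(&id) = self.id_by_address.get(address) {
            return id; // deduplicated: a repeat costs 4 bytes, not the full string
        }
        let id = self.address_by_id.len() as u32;
        self.address_by_id.push(address.to_string());
        self.id_by_address.insert(address.to_string(), id);
        id
    }
}

fn main() {
    let mut dict = AddressDict::default();
    let a = dict.intern("addr_a");
    let b = dict.intern("addr_a"); // same id, nothing stored twice
    assert_eq!(a, b);
    assert_eq!(dict.address_by_id.len(), 1);
    let utxo = UtxoPointer { height: 800_000, tx_index: 3, out_index: 1 };
    println!("{utxo:?} pays to address id {a}");
}
```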
diff --git a/macros/src/column/get_by.rs b/macros/src/column/get_by.rs
index 7def0a0..8a1d28c 100644
--- a/macros/src/column/get_by.rs
+++ b/macros/src/column/get_by.rs
@@ -16,25 +16,8 @@ pub fn get_by_dict_def(
     let read_tx_context_ty = &entity_def.read_ctx_type;
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: &#read_tx_context_ty, val: &#column_type) -> Result<Vec<Self>, AppError> {
-            let iter_opt = tx_context.#dict_table_var.get_keys(val)?;
-            match iter_opt {
-                None => Ok(Vec::new()),
-                Some(mut iter) => {
-                    let mut results = Vec::new();
-                    while let Some(x) = iter.next() {
-                        let pk = x?.value();
-                        match Self::compose(&tx_context, pk) {
-                            Ok(item) => {
-                                results.push(item);
-                            }
-                            Err(err) => {
-                                return Err(AppError::Internal(err.into()));
-                            }
-                        }
-                    }
-                    Ok(results)
-                }
-            }
+            let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value()));
+            Self::compose_many(&tx_context, iter, None)
         }
     };
 
@@ -78,20 +61,8 @@ pub fn get_by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty
     let read_ctx_type = &entity_def.read_ctx_type;
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result<Vec<Self>, AppError> {
-            let mut iter = tx_context.#index_table_var.get_keys(val)?;
-            let mut results = Vec::new();
-            while let Some(x) = iter.next() {
-                let pk = x?.value();
-                match Self::compose(&tx_context, pk) {
-                    Ok(item) => {
-                        results.push(item);
-                    }
-                    Err(err) => {
-                        return Err(AppError::Internal(err.into()));
-                    }
-                }
-            }
-            Ok(results)
+            let iter = tx_context.#index_table_var.get_keys(val)?.map(|res| res.map(|kg| kg.value()));
+            Self::compose_many(&tx_context, iter, None)
         }
     };
diff --git a/macros/src/column/get_keys_by.rs b/macros/src/column/get_keys_by.rs
index 03e5cfd..4d24cdb 100644
--- a/macros/src/column/get_keys_by.rs
+++ b/macros/src/column/get_keys_by.rs
@@ -13,20 +13,7 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty
     let read_ctx_ty = &entity_def.read_ctx_type;
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: &#read_ctx_ty, val: &#column_type) -> Result<Vec<#pk_type>, AppError> {
-            let iter_opt = tx_context.#dict_table_var.get_keys(val)?;
-            let results =
-                match iter_opt {
-                    None => Vec::new(),
-                    Some(mut iter) => {
-                        let mut results = Vec::new();
-                        while let Some(x) = iter.next() {
-                            let pk = x?.value();
-                            results.push(pk);
-                        }
-                        results
-                    },
-                };
-            Ok(results)
+            tx_context.#dict_table_var.get_keys(val)?.map_or(Ok(Vec::new()), redbit::collect_multimap_value)
        }
    };
 
@@ -71,17 +58,8 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
     let pk_type = &key_def.tpe;
     let fn_name = format_ident!("get_{}s_by_{}", pk_name, column_name);
     let fn_stream = quote! {
-        pub fn #fn_name(
-            tx_context: &#read_ctx_type,
-            val: &#column_type
-        ) -> Result<Vec<#pk_type>, AppError> {
-            let mut iter = tx_context.#index_table.get_keys(val)?;
-            let mut results = Vec::new();
-            while let Some(x) = iter.next() {
-                let pk = x?.value();
-                results.push(pk);
-            }
-            Ok(results)
+        pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result<Vec<#pk_type>, AppError> {
+            redbit::collect_multimap_value(tx_context.#index_table.get_keys(val)?)
         }
     };
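The dict-table lookup above returns an `Option` of a fallible iterator, and the new one-liner collapses the miss case with `Option::map_or`. A standalone sketch of the same shape using plain std types (all names here are made up for the example):

```rust
// Stand-in for redbit::collect_multimap_value: drain a fallible iterator.
fn collect_values<I: Iterator<Item = Result<u32, String>>>(iter: I) -> Result<Vec<u32>, String> {
    iter.collect()
}

// Stand-in for get_keys: None on a dictionary miss, an iterator on a hit.
fn get_keys(hit: bool) -> Option<impl Iterator<Item = Result<u32, String>>> {
    hit.then(|| vec![Ok(1), Ok(2), Ok(3)].into_iter())
}

fn main() -> Result<(), String> {
    // A miss becomes Ok(vec![]) without a match arm; a hit is collected fallibly.
    let miss = get_keys(false).map_or(Ok(Vec::new()), collect_values)?;
    let hit = get_keys(true).map_or(Ok(Vec::new()), collect_values)?;
    assert!(miss.is_empty());
    assert_eq!(hit, vec![1, 2, 3]);
    Ok(())
}
```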
diff --git a/macros/src/column/range_by.rs b/macros/src/column/range_by.rs
index 3b80008..d7b6c86 100644
--- a/macros/src/column/range_by.rs
+++ b/macros/src/column/range_by.rs
@@ -11,23 +11,12 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
     let read_ctx_type = &entity_def.read_ctx_type;
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: &#read_ctx_type, from: &#column_type, until: &#column_type) -> Result<Vec<Self>, AppError> {
-            let range_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?;
-            let mut results = Vec::new();
-            for entry_res in range_iter {
-                let (_, mut multi_iter) = entry_res?;
-                while let Some(x) = multi_iter.next() {
-                    let pk = x?.value();
-                    match Self::compose(&tx_context, pk) {
-                        Ok(item) => {
-                            results.push(item);
-                        }
-                        Err(err) => {
-                            return Err(AppError::Internal(err.into()));
-                        }
-                    }
-                }
-            }
-            Ok(results)
+            let iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?
+                .flat_map(|r| match r {
+                    Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))),
+                    Err(e) => Either::Right(std::iter::once(Err(e))),
+                });
+            Self::compose_many(&tx_context, iter, None)
         }
     };
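`range_keys` yields a fallible outer iterator whose `Ok` items carry a fallible inner iterator; the `Either` trick above flattens both error layers into one iterator without boxing. A self-contained sketch of the pattern (assuming the itertools crate; the concrete types are illustrative):

```rust
use itertools::Either; // itertools = "0.14"

fn main() {
    // Outer Result wraps (key, inner fallible values) pairs.
    let nested: Vec<Result<(u8, Vec<Result<u32, String>>), String>> = vec![
        Ok((1, vec![Ok(10), Ok(11)])),
        Err("outer failure".to_string()),
        Ok((2, vec![Ok(20)])),
    ];

    let flat: Vec<Result<u32, String>> = nested
        .into_iter()
        .flat_map(|r| match r {
            // Happy path: stream the inner values through.
            Ok((_k, values)) => Either::Left(values.into_iter()),
            // Outer error: surface it as a single Err item so nothing is lost.
            Err(e) => Either::Right(std::iter::once(Err(e))),
        })
        .collect();

    assert_eq!(flat.len(), 4); // 10, 11, the outer Err, 20
}
```

Both `Either` arms are concrete iterator types with the same `Item`, so the whole chain stays statically dispatched.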
diff --git a/macros/src/column/stream_by.rs b/macros/src/column/stream_by.rs
index dd9a1cd..b3728ad 100644
--- a/macros/src/column/stream_by.rs
+++ b/macros/src/column/stream_by.rs
@@ -16,42 +16,12 @@ pub fn by_dict_def(
     let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def;
     let pk_type = key_def.field_def().tpe;
     let fn_stream = quote! {
-        pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<Pin<Box<dyn Stream<Item = Result<Self, AppError>> + Send>>, AppError> {
-            let multi_value = tx_context.#dict_table_var.get_keys(val)?;
-            let iter_box: Box<dyn Iterator<Item = Result<AccessGuard<'static, #pk_type>, StorageError>> + Send> =
-                if let Some(v) = multi_value {
-                    Box::new(v)
-                } else {
-                    Box::new(std::iter::empty())
-                };
-
-            let stream = futures::stream::unfold(
-                (iter_box, tx_context, query),
-                |(mut iter, tx_context, query)| async move {
-                    match iter.next() {
-                        Some(Ok(guard)) => {
-                            let pk = guard.value().clone();
-                            if let Some(ref q) = query {
-                                match Self::compose_with_filter(&tx_context, pk, q) {
-                                    Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))),
-                                    Ok(None) => None,
-                                    Err(e) => Some((Err(e), (iter, tx_context, query))),
-                                }
-                            } else {
-                                Some((Self::compose(&tx_context, pk), (iter, tx_context, query)))
-                            }
-                        }
-                        Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))),
-                        None => None,
-                    }
-                },
-            ).boxed();
-
-            Ok(stream)
+        pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<impl Stream<Item = Result<Self, AppError>> + Send, AppError> {
+            let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value()));
+            Self::compose_many_stream(tx_context, iter, query)
         }
     };
-
     let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name);
     let test_stream = Some(quote! {
         #[tokio::test]
@@ -145,30 +115,9 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
     let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def;
     let pk_type = key_def.field_def().tpe;
     let fn_stream = quote! {
-        pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<Pin<Box<dyn Stream<Item = Result<Self, AppError>> + Send>>, AppError> {
-            let iter = tx_context.#index_table.get_keys(val)?;
-
-            let stream = futures::stream::unfold((iter, tx_context, query), |(mut iter, tx_context, query)| async move {
-                match iter.next() {
-                    Some(Ok(guard)) => {
-                        let pk = guard.value();
-                        if let Some(ref stream_query) = query {
-                            match Self::compose_with_filter(&tx_context, pk, stream_query) {
-                                Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))),
-                                Ok(None) => None,
-                                Err(e) => Some((Err(e), (iter, tx_context, query))),
-                            }
-                        } else {
-                            Some((Self::compose(&tx_context, pk), (iter, tx_context, query)))
-                        }
-                    }
-                    Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))),
-                    None => None,
-                }
-            })
-            .boxed();
-
-            Ok(stream)
+        pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<impl Stream<Item = Result<Self, AppError>> + Send, AppError> {
+            let iter = tx_context.#index_table.get_keys(val)?.map(|res| res.map(|kg| kg.value()));
+            Self::compose_many_stream(tx_context, iter, query)
         }
     };
diff --git a/macros/src/column/stream_keys_by.rs b/macros/src/column/stream_keys_by.rs
index 8828ad5..29198df 100644
--- a/macros/src/column/stream_keys_by.rs
+++ b/macros/src/column/stream_keys_by.rs
@@ -16,22 +16,11 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty
     let read_ctx_type = &entity_def.read_ctx_type;
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result<impl Stream<Item = Result<#pk_type, AppError>> + Send, AppError> {
-            let multi_value = tx_context.#dict_table_var.get_keys(val)?;
-            let iter_box: Box<dyn Iterator<Item = Result<AccessGuard<'static, #pk_type>, StorageError>> + Send> =
-                if let Some(v) = multi_value {
-                    Box::new(v)
-                } else {
-                    Box::new(std::iter::empty())
-                };
-
-            let stream = stream::iter(iter_box)
-                .map(|res| res.map(|e| e.value().clone()).map_err(AppError::from));
-
-            Ok(stream)
+            let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|g| g.value().clone()).map_err(AppError::from));
+            Ok(stream::iter(iter))
        }
    };
-
    let test_stream = Some(quote! {
        #[tokio::test]
        async fn #fn_name() {
@@ -110,10 +99,7 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
    let fn_name = format_ident!("stream_{}s_by_{}", pk_name, column_name);
    let fn_stream = quote! {
        pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result<impl Stream<Item = Result<#pk_type, AppError>> + Send, AppError> {
-            let iter = tx_context.#index_table.get_keys(&val)?;
-            let iter_box: Box<dyn Iterator<Item = Result<AccessGuard<'static, #pk_type>, StorageError>> + Send> = Box::new(iter);
-            let stream = stream::iter(iter_box).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from));
-            Ok(stream)
+            Ok(stream::iter(tx_context.#index_table.get_keys(&val)?).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from)))
        }
    };
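Several of the rewrites above rely on the same std idiom: an `Option` of an iterator becomes an always-valid iterator via `into_iter().flatten()`, so the dictionary-miss case needs neither a `Box` nor a separate branch. A minimal demonstration with plain std types:

```rust
fn main() {
    let hit: Option<Vec<u32>> = Some(vec![1, 2, 3]);
    let miss: Option<Vec<u32>> = None;

    // Option<impl IntoIterator> -> one concrete iterator type for both cases:
    // Some(v) iterates v's items, None iterates nothing.
    let hit_items: Vec<u32> = hit.into_iter().flatten().collect();
    let miss_items: Vec<u32> = miss.into_iter().flatten().collect();

    assert_eq!(hit_items, vec![1, 2, 3]);
    assert!(miss_items.is_empty());
}
```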
diff --git a/macros/src/column/stream_parents_by.rs b/macros/src/column/stream_parents_by.rs
index 09bf54d..bc58a36 100644
--- a/macros/src/column/stream_parents_by.rs
+++ b/macros/src/column/stream_parents_by.rs
@@ -23,45 +23,21 @@ pub fn by_dict_def(
     let fn_name = format_ident!("stream_{}s_by_{}", parent_ident.to_string().to_lowercase(), column_name);
     let fn_stream = quote! {
-        pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result<Pin<Box<dyn Stream<Item = Result<#parent_type, AppError>> + Send>>, AppError> {
-            let multi_value_opt = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)?;
-            let mut unique_parent_pointers =
-                match multi_value_opt {
-                    None => Vec::new(),
-                    Some(multi_value) => {
-                        let mut pointers = Vec::new();
-                        for pk_guard in multi_value {
-                            let pk = pk_guard?.value();
-                            pointers.push(pk.parent);
-                        }
-                        pointers
-                    }
-                };
-            unique_parent_pointers.dedup();
-            let stream = futures::stream::unfold(
-                (unique_parent_pointers.into_iter(), parent_tx_context, query),
-                |(mut iter, parent_tx_context, query)| async move {
-                    match iter.next() {
-                        Some(parent_pointer) => {
-                            if let Some(ref stream_query) = query {
-                                match #parent_type::compose_with_filter(&parent_tx_context, parent_pointer, stream_query) {
-                                    Ok(Some(entity)) => Some((Ok(entity), (iter, parent_tx_context, query))),
-                                    Ok(None) => None,
-                                    Err(e) => Some((Err(e), (iter, parent_tx_context, query))),
-                                }
-                            } else {
-                                Some((#parent_type::compose(&parent_tx_context, parent_pointer), (iter, parent_tx_context, query)))
-                            }
-                        }
-                        None => None,
-                    }
-                },
-            ).boxed();
-            Ok(stream)
+        pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result<impl Stream<Item = Result<#parent_type, AppError>> + Send, AppError> {
+            let unique_parent_pk_iter = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)?
+                .into_iter()
+                .flatten()
+                .map(|r| r.map(|g| g.value().parent))
+                .scan(HashSet::new(), |seen, r| {
+                    Some(match r {
+                        Ok(parent) => if seen.insert(parent) { Some(Ok(parent)) } else { None },
+                        Err(e) => Some(Err(e)),
+                    })
+                }).flatten();
+            #parent_type::compose_many_stream(parent_tx_context, unique_parent_pk_iter, query)
         }
     };
-
     let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name);
     let test_stream = Some(quote! {
         #[tokio::test]
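The `scan`-based dedup above replaces `Vec::dedup`, which only drops *adjacent* repeats, with a `HashSet` that drops every repeat while preserving first-seen order and passing errors through untouched. The same pattern in isolation:

```rust
use std::collections::HashSet;

fn main() {
    let pks: Vec<Result<u32, String>> =
        vec![Ok(1), Ok(2), Ok(1), Err("io".to_string()), Ok(3), Ok(2)];

    let unique: Vec<Result<u32, String>> = pks
        .into_iter()
        .scan(HashSet::new(), |seen, r| {
            Some(match r {
                // First sighting passes through; a repeat becomes None.
                Ok(pk) => if seen.insert(pk) { Some(Ok(pk)) } else { None },
                // Errors are never swallowed.
                Err(e) => Some(Err(e)),
            })
        })
        .flatten() // drop the None slots, keep the Some(item) payloads
        .collect();

    assert_eq!(
        unique,
        vec![Ok(1), Ok(2), Err("io".to_string()), Ok(3)]
    );
}
```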
diff --git a/macros/src/column/stream_range_by.rs b/macros/src/column/stream_range_by.rs
index 3e8d6c3..f37b37d 100644
--- a/macros/src/column/stream_range_by.rs
+++ b/macros/src/column/stream_range_by.rs
@@ -16,36 +16,14 @@ pub fn stream_range_by_index_def(entity_def: &EntityDef, column_name: &Ident, co
         from: #column_type,
         until: #column_type,
         query: Option<#query_type>,
-    ) -> Result<Pin<Box<dyn Stream<Item = Result<Self, AppError>> + Send>>, AppError> {
-        let outer_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?;
-        let outer_stream = futures::stream::iter(outer_iter).map_err(AppError::from);
-        let pk_stream = outer_stream.map_ok(|(_key, value_iter)| {
-            futures::stream::iter(value_iter).map(|res| {
-                res.map_err(AppError::from).map(|guard| guard.value().clone())
-            })
-        });
-        Ok(
-            pk_stream
-                .try_flatten()
-                .map(move |pk_res| {
-                    match pk_res {
-                        Ok(pk) => {
-                            if let Some(ref stream_query) = query {
-                                match Self::compose_with_filter(&tx_context, pk, stream_query) {
-                                    Ok(Some(entity)) => Some(Ok(entity)),
-                                    Ok(None) => None,
-                                    Err(e) => Some(Err(e)),
-                                }
-                            } else {
-                                Some(Self::compose(&tx_context, pk)) // <- already Result
-                            }
-                        }
-                        Err(e) => Some(Err(e)),
-                    }
-                })
-                .filter_map(std::future::ready) // remove None
-                .boxed()
-        )
+    ) -> Result<impl Stream<Item = Result<Self, AppError>> + Send, AppError> {
+        let pk_iter =
+            tx_context.#index_table.range_keys::<#column_type>(from..until)?
+                .flat_map(|r| match r {
+                    Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))),
+                    Err(e) => Either::Right(std::iter::once(Err(e))),
+                });
+        Self::compose_many_stream(tx_context, pk_iter, query)
     }
 };
diff --git a/macros/src/entity/compose.rs b/macros/src/entity/compose.rs
index ab60126..5ede1c2 100644
--- a/macros/src/entity/compose.rs
+++ b/macros/src/entity/compose.rs
@@ -71,5 +71,77 @@ pub fn compose_with_filter_token_stream(entity_def: &EntityDef, field_names: &[I
         }),
         bench_stream: None,
     }
 }
+
+pub fn compose_many_token_stream(entity_def: &EntityDef) -> FunctionDef {
+    let pk_type: &Type = &entity_def.key_def.field_def().tpe;
+    let EntityDef { entity_type, query_type, read_ctx_type, ..} = &entity_def;
+    FunctionDef {
+        fn_stream: quote! {
+            fn compose_many<I: Iterator<Item = Result<#pk_type, StorageError>>>(
+                tx_context: &#read_ctx_type,
+                pk_values: I,
+                stream_query: Option<#query_type>
+            ) -> Result<Vec<Self>, AppError> {
+                let mut results = Vec::with_capacity(pk_values.size_hint().0);
+                match stream_query {
+                    Some(ref q) => {
+                        for pk in pk_values {
+                            if let Some(item) = Self::compose_with_filter(&tx_context, pk?, q)? {
+                                results.push(item);
+                            }
+                        }
+                    }
+                    None => {
+                        for pk in pk_values {
+                            results.push(Self::compose(&tx_context, pk?)?);
+                        }
+                    }
+                }
+                Ok(results)
+            }
+        },
+        endpoint: None,
+        test_stream: None,
+        bench_stream: None,
+    }
+}
+pub fn compose_many_stream_token_stream(entity_def: &EntityDef) -> FunctionDef {
+    let pk_type: &Type = &entity_def.key_def.field_def().tpe;
+    let EntityDef { entity_type, query_type, read_ctx_type, ..} = &entity_def;
+    FunctionDef {
+        fn_stream: quote! {
+            pub fn compose_many_stream<I: Iterator<Item = Result<#pk_type, StorageError>> + Send>(
+                tx_context: #read_ctx_type,
+                pk_values: I,
+                stream_query: Option<#query_type>,
+            ) -> Result<impl Stream<Item = Result<Self, AppError>> + Send, AppError> {
+                let iter = pk_values.filter_map(move |item_res| {
+                    match item_res {
+                        Err(e) => Some(Err(AppError::from(e))),
+                        Ok(pk) => {
+                            if let Some(ref q) = stream_query {
+                                match Self::compose_with_filter(&tx_context, pk, q) {
+                                    Ok(Some(item)) => Some(Ok(item)),
+                                    Ok(None) => None, // skip
+                                    Err(err) => Some(Err(AppError::Internal(err.into()))),
+                                }
+                            } else {
+                                match Self::compose(&tx_context, pk) {
+                                    Ok(item) => Some(Ok(item)),
+                                    Err(err) => Some(Err(AppError::Internal(err.into()))),
+                                }
+                            }
+                        }
+                    }
+                });
+                Ok(stream::iter(iter))
+            }
+        },
+        endpoint: None,
+        test_stream: None,
+        bench_stream: None,
+    }
+}
+
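`compose_many` and `compose_many_stream` centralize the loop that every accessor previously hand-rolled: take a fallible iterator of primary keys, compose each entity (optionally through a filter), and return either an eager `Vec` or a lazy stream. A rough sketch of the shape the generated code takes, with `Block`, `AppError`, and `compose` as simplified stand-ins (assuming the futures crate):

```rust
use futures::stream::{self, Stream};

#[derive(Debug)]
struct AppError(String);

struct Block { height: u32 }

impl Block {
    // Stand-in for the real per-pk assembly from column tables.
    fn compose(height: u32) -> Result<Block, AppError> {
        Ok(Block { height })
    }

    // Eager variant, backing get_by_*, range_by_*, take, tail.
    fn compose_many<I: Iterator<Item = Result<u32, AppError>>>(pks: I) -> Result<Vec<Block>, AppError> {
        pks.map(|pk| Self::compose(pk?)).collect()
    }

    // Lazy variant, backing stream_by_* and stream_range: the caller gets an
    // unboxed `impl Stream` instead of a Pin<Box<dyn Stream>>.
    fn compose_many_stream<I: Iterator<Item = Result<u32, AppError>> + Send>(
        pks: I,
    ) -> impl Stream<Item = Result<Block, AppError>> + Send {
        stream::iter(pks.map(|pk| Self::compose(pk?)))
    }
}

fn main() {
    let blocks = Block::compose_many((0..3u32).map(Ok)).unwrap();
    assert_eq!(blocks[2].height, 2);
    let _stream = Block::compose_many_stream((0..3u32).map(Ok));
}
```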
diff --git a/macros/src/entity/mod.rs b/macros/src/entity/mod.rs
index 1ce1277..7b71963 100644
--- a/macros/src/entity/mod.rs
+++ b/macros/src/entity/mod.rs
@@ -76,6 +76,8 @@ pub fn new(item_struct: &ItemStruct) -> Result<(KeyDef, Vec<FunctionDef>, TokenStre
         delete::delete_many_def(&entity_def, &delete_many_statements),
         compose::compose_token_stream(&entity_def, &field_names, &struct_inits),
         compose::compose_with_filter_token_stream(&entity_def, &field_names, &struct_inits_with_query),
+        compose::compose_many_token_stream(&entity_def),
+        compose::compose_many_stream_token_stream(&entity_def),
     ];
     function_defs.extend(sample::sample_token_fns(&entity_def, &struct_default_inits, &struct_default_inits_with_query, &field_names));
     function_defs.extend(column_function_defs.clone());
diff --git a/macros/src/pk/range.rs b/macros/src/pk/range.rs
index e1d4bde..4a30d48 100644
--- a/macros/src/pk/range.rs
+++ b/macros/src/pk/range.rs
@@ -12,23 +12,8 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident, no_columns: bool) -> Functi
     quote! {
         pub fn #fn_name(tx_context: &#read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result<Vec<Self>, AppError> {
             let range = from..until;
-            let mut iter = tx_context.#table.underlying.range::<#pk_type>(range)?;
-            let mut results = Vec::new();
-            if let Some(ref q) = query {
-                while let Some(entry_res) = iter.next() {
-                    let pk = entry_res?.0.value();
-                    match Self::compose_with_filter(&tx_context, pk, q)? {
-                        Some(entity) => results.push(entity),
-                        None => {},
-                    }
-                }
-            } else {
-                while let Some(entry_res) = iter.next() {
-                    let pk = entry_res?.0.value();
-                    results.push(Self::compose(&tx_context, pk)?);
-                }
-            }
-            Ok(results)
+            let iter = tx_context.#table.underlying.range::<#pk_type>(range)?.map(|res| res.map(|(kg, _)| kg.value()));
+            Self::compose_many(&tx_context, iter, query)
         }
     };
diff --git a/macros/src/pk/stream_range.rs b/macros/src/pk/stream_range.rs
index f25e334..f0db763 100644
--- a/macros/src/pk/stream_range.rs
+++ b/macros/src/pk/stream_range.rs
@@ -14,31 +14,10 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident, range_query_ty: &Type, no_c
     let fn_name = format_ident!("stream_range");
     let fn_stream = quote! {
-        pub fn #fn_name(tx_context: #read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result<Pin<Box<dyn Stream<Item = Result<Self, AppError>> + Send>>, AppError> {
+        pub fn #fn_name(tx_context: #read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result<impl Stream<Item = Result<Self, AppError>> + Send, AppError> {
             let range = from..until;
-            let iter_box = Box::new(tx_context.#table.underlying.range::<#pk_type>(range)?);
-            let stream = futures::stream::unfold(
-                (iter_box, tx_context, query),
-                |(mut iter, tx_context, query)| async move {
-                    match iter.next() {
-                        Some(Ok((key, _val))) => {
-                            let pk = key.value();
-                            if let Some(ref stream_query) = query {
-                                match Self::compose_with_filter(&tx_context, pk, stream_query) {
-                                    Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))),
-                                    Ok(None) => None,
-                                    Err(e) => Some((Err(e), (iter, tx_context, query))),
-                                }
-                            } else {
-                                Some((Self::compose(&tx_context, pk), (iter, tx_context, query)))
-                            }
-                        }
-                        Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))),
-                        None => None,
-                    }
-                },
-            ).boxed();
-            Ok(stream)
+            let iter = tx_context.#table.underlying.range::<#pk_type>(range)?.map(|res| res.map(|(kg, _)| kg.value()));
+            Self::compose_many_stream(tx_context, iter, query)
         }
     };
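Across all the stream accessors, the return type changes from a boxed, dynamically dispatched stream to an unboxed `impl Stream`, which removes one heap allocation and a vtable indirection while callers consume it exactly the same way. A minimal side-by-side (assuming the futures crate; the function names are illustrative):

```rust
use futures::stream::{self, Stream, StreamExt};
use std::pin::Pin;

// Before: heap-boxed trait object, dispatched through a vtable per poll.
fn stream_boxed() -> Pin<Box<dyn Stream<Item = u32> + Send>> {
    stream::iter(0..3).boxed()
}

// After: the compiler picks one concrete stream type; no Box, no dyn.
fn stream_unboxed() -> impl Stream<Item = u32> + Send {
    stream::iter(0..3)
}

fn main() {
    let a = futures::executor::block_on(stream_boxed().collect::<Vec<_>>());
    let b = futures::executor::block_on(stream_unboxed().collect::<Vec<_>>());
    assert_eq!(a, b);
}
```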
else {
+                return Ok(Vec::new());
+            };
-            let key = key_guard.value();
-            let until = key.next_index();
+            let until = key_guard.value().next_index();
             let from = until.rollback_or_init(n as u32);
             let range = from..until;
-            let iter = tx_context.#table.underlying.range(range)?;
-            let mut queue = VecDeque::with_capacity(n);
-
-            for entry_res in iter {
-                let pk = entry_res?.0.value();
-                if queue.len() == n {
-                    queue.pop_front(); // remove oldest
-                }
-                queue.push_back(pk);
-            }
-            queue
-                .into_iter()
-                .map(|pk| Self::compose(tx_context, pk))
-                .collect::<Result<Vec<Self>, AppError>>()
+            let mut pks = tx_context.#table.underlying.range(range)?.rev().take(n)
+                .map(|entry_res| entry_res.map(|(kg, _)| kg.value()))
+                .collect::<Result<Vec<_>, _>>()?;
+            pks.reverse();
+            Self::compose_many(&tx_context, pks.into_iter().map(Ok), None)
         }
     };
diff --git a/macros/src/pk/take.rs b/macros/src/pk/take.rs
index 2c06b95..deb2510 100644
--- a/macros/src/pk/take.rs
+++ b/macros/src/pk/take.rs
@@ -13,21 +13,11 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident) -> FunctionDef {
     let fn_name = format_ident!("take");
     let fn_stream = quote! {
         pub fn #fn_name(tx_context: &#read_ctx_type, n: usize) -> Result<Vec<Self>, AppError> {
-            let mut iter = tx_context.#table.underlying.iter()?;
-            let mut results = Vec::new();
-            let mut count: usize = 0;
             if n > 100 {
                 Err(AppError::Internal("Cannot take more than 100 entities at once".into()))
             } else {
-                while let Some(entry_res) = iter.next() {
-                    if count >= n {
-                        break;
-                    }
-                    let pk = entry_res?.0.value();
-                    results.push(Self::compose(&tx_context, pk)?);
-                    count += 1;
-                }
-                Ok(results)
+                let iter = tx_context.#table.underlying.iter()?.take(n).map(|entry_res| entry_res.map(|(pk_guard, _)| pk_guard.value()));
+                Self::compose_many(&tx_context, iter, None)
             }
         }
     };
diff --git a/redbit/Cargo.toml b/redbit/Cargo.toml
index c9e74a1..465d338 100644
--- a/redbit/Cargo.toml
+++ b/redbit/Cargo.toml
@@ -45,4 +45,5 @@ anyhow = "1.0.99"
 log = "0.4.27"
 crossbeam = "0.8.4"
 xxhash-rust = { version = "0.8.12", features = ["xxh3"] }
-indexmap = "2.12.0"
\ No newline at end of file
+indexmap = "2.12.0"
+itertools = "0.14.0"
\ No newline at end of file
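The `tail` rewrite above swaps the `VecDeque` sliding window for reverse range iteration: walk the range backwards, take `n`, then restore ascending order. Its core logic over a plain slice (a sketch of the semantics, not the generated code):

```rust
fn tail(keys: &[u32], n: usize) -> Vec<u32> {
    // Walk from the end, keep at most n keys...
    let mut last_n: Vec<u32> = keys.iter().rev().take(n).copied().collect();
    // ...then flip back to ascending order, matching the old VecDeque walk.
    last_n.reverse();
    last_n
}

fn main() {
    assert_eq!(tail(&[1, 2, 3, 4, 5], 3), vec![3, 4, 5]);
    assert_eq!(tail(&[1, 2], 5), vec![1, 2]); // fewer than n keys: return all
    assert_eq!(tail(&[1, 2, 3], 0), Vec::<u32>::new()); // the new n == 0 guard
}
```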
diff --git a/redbit/src/lib.rs b/redbit/src/lib.rs
index 34257ad..b154b90 100644
--- a/redbit/src/lib.rs
+++ b/redbit/src/lib.rs
@@ -23,6 +23,8 @@ pub use futures::stream::{self, StreamExt};
 pub use futures_util::stream::TryStreamExt;
 pub use http;
 pub use http::HeaderValue;
+pub use itertools::Either;
+pub use indexmap;
 pub use inventory;
 pub use lru::LruCache;
 pub use macros::column;
@@ -34,12 +36,11 @@ pub use macros::PointerKey;
 pub use macros::RootKey;
 pub use once_cell;
 pub use query::*;
-pub use indexmap;
 pub use rand;
 pub use redb;
 pub use redb::{
-    Database, Durability, Key, TypeName, Value, MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction,
-    ReadableDatabase, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, Table, TableDefinition, TableError, TableStats, TransactionError, WriteTransaction,
+    AccessGuard, Database, Durability, Key, MultimapTable, MultimapTableDefinition, MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, ReadableDatabase, ReadableMultimapTable,
+    ReadableTable, ReadableTableMetadata, Table, TableDefinition, TableError, TableStats, TransactionError, TypeName, Value, WriteTransaction,
 };
 pub use serde;
 pub use serde::Deserialize;
@@ -57,29 +58,29 @@ pub use std::collections::HashMap;
 pub use std::collections::VecDeque;
 pub use std::fmt::Debug;
 use std::hash::Hash;
-pub use std::time::Instant;
 pub use std::pin::Pin;
 pub use std::sync::Arc;
 pub use std::sync::Weak;
 pub use std::thread;
 pub use std::time::Duration;
+pub use std::time::Instant;
+pub use storage::async_boundary::CopyOwnedValue;
 pub use storage::context::{ReadTxContext, WriteTxContext};
-pub use storage::partitioning::{Partitioning, BytesPartitioner, Xxh3Partitioner, KeyPartitioner, ValuePartitioner};
+pub use storage::init::Storage;
+pub use storage::init::StorageOwner;
+pub use storage::partitioning::{BytesPartitioner, KeyPartitioner, Partitioning, ValuePartitioner, Xxh3Partitioner};
 pub use storage::table_dict_read::ReadOnlyDictTable;
 pub use storage::table_dict_read_sharded::ShardedReadOnlyDictTable;
-pub use storage::table_dict_write::{DictTable, DictFactory};
+pub use storage::table_dict_write::{DictFactory, DictTable};
 pub use storage::table_index_read::ReadOnlyIndexTable;
 pub use storage::table_index_read_sharded::ShardedReadOnlyIndexTable;
-pub use storage::table_index_write::{IndexTable, IndexFactory};
+pub use storage::table_index_write::{IndexFactory, IndexTable};
 pub use storage::table_plain_read::ReadOnlyPlainTable;
 pub use storage::table_plain_read_sharded::ShardedReadOnlyPlainTable;
 pub use storage::table_plain_write::PlainFactory;
-pub use storage::table_writer_api::{StartFuture, StopFuture, TaskResult, FlushFuture, WriterLike, WriteTableLike};
-pub use storage::async_boundary::CopyOwnedValue;
-pub use storage::tx_fsm::TxFSM;
 pub use storage::table_writer::ShardedTableWriter;
-pub use storage::init::Storage;
-pub use storage::init::StorageOwner;
+pub use storage::table_writer_api::{FlushFuture, StartFuture, StopFuture, TaskResult, WriteTableLike, WriterLike};
+pub use storage::tx_fsm::TxFSM;
 pub use urlencoding;
 pub use utoipa;
 pub use utoipa::openapi;
@@ -89,6 +90,7 @@ pub use utoipa::ToSchema;
 pub use utoipa_axum;
 pub use utoipa_axum::router::OpenApiRouter;
 pub use utoipa_swagger_ui;
+pub use std::collections::HashSet;
 
 use crate::axum::extract::rejection::JsonRejection;
 use crate::axum::extract::FromRequest;
@@ -584,3 +586,15 @@ where
         assert!(matches!(ord, Ordering::Less | Ordering::Equal), "{} must be sorted by key", label);
     }
 }
+
+pub fn collect_multimap_value<'a, V: Key + 'a>(mut mmv: MultimapValue<'a, V>) -> Result<Vec<<<V as Value>::SelfType<'a> as ToOwned>::Owned>, AppError>
+where
+    for<'b> <V as Value>::SelfType<'b>: ToOwned,
+{
+    let mut results = Vec::new();
+    while let Some(item_res) = mmv.next() {
+        let guard = item_res?;
+        results.push(guard.value().to_owned());
+    }
+    Ok(results)
+}
\ No newline at end of file
diff --git a/redbit/src/storage/table_index_read.rs b/redbit/src/storage/table_index_read.rs
index 1006f6d..89f0905 100644
--- a/redbit/src/storage/table_index_read.rs
+++ b/redbit/src/storage/table_index_read.rs
@@ -25,7 +25,6 @@ impl ReadOnlyIndexTable {
     pub fn get_keys<'v>(&self, val: impl Borrow<V::SelfType<'v>>) -> redb::Result<MultimapValue<'static, K>> {
         self.pk_by_index.get(val.borrow())
-
     }
 
     pub fn range_keys<'a, KR: Borrow<V::SelfType<'a>>>(&self, range: impl RangeBounds<KR>) -> redb::Result<MultimapRange<'static, V, K>> {
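`collect_multimap_value` wraps the guard-draining loop that every `get_*s_by_*` getter used to inline. A hedged usage sketch directly against redb (the table name, key/value types, and file path are invented for the example):

```rust
use redb::{Database, MultimapTableDefinition, ReadableMultimapTable};

// Hypothetical index table: one address maps to many primary keys.
const PK_BY_ADDRESS: MultimapTableDefinition<&str, u64> =
    MultimapTableDefinition::new("pk_by_address");

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let db = Database::create("example.redb")?;
    let wtx = db.begin_write()?;
    {
        let mut table = wtx.open_multimap_table(PK_BY_ADDRESS)?;
        table.insert("addr1", 7)?;
        table.insert("addr1", 9)?; // one key, many pks
    }
    wtx.commit()?;

    let rtx = db.begin_read()?;
    let table = rtx.open_multimap_table(PK_BY_ADDRESS)?;
    // The same loop collect_multimap_value encapsulates: guard -> owned value.
    let mut pks = Vec::new();
    for guard in table.get("addr1")? {
        pks.push(guard?.value());
    }
    assert_eq!(pks, vec![7, 9]);
    Ok(())
}
```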
diff --git a/redbit/src/storage/table_writer.rs b/redbit/src/storage/table_writer.rs
index ce8a258..bcc4436 100644
--- a/redbit/src/storage/table_writer.rs
+++ b/redbit/src/storage/table_writer.rs
@@ -277,8 +277,7 @@ mod index_sharded {
     {
         let want = 3usize; // querying [a1, a2, a3]
         let (tx, rx) = channel::unbounded::>)>>();
-        router.query_and_write(vec![a1.clone(), a2.clone(), a3.clone()], true, Arc::new(move |last_shards, batch| {
-            // non-blocking: just forward the shard batch
+        router.query_and_write(vec![a1.clone(), a2.clone(), a3.clone()], true, Arc::new(move |_last_shards, batch| {
             tx.send(batch)?;
             Ok(())
         })).expect("enqueue heads_before");