From 0e6a5a0ebb767f98cc0465418f9a394bf8ff7725 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Sat, 1 Nov 2025 13:26:37 +0100 Subject: [PATCH 1/3] compose_many --- macros/src/column/get_by.rs | 33 ++--------------- macros/src/column/get_keys_by.rs | 28 ++------------- macros/src/column/stream_by.rs | 8 +---- macros/src/column/stream_keys_by.rs | 17 ++------- macros/src/entity/compose.rs | 49 ++++++++++++++++++++++++++ macros/src/entity/mod.rs | 2 ++ redbit/src/lib.rs | 36 ++++++++++++------- redbit/src/storage/table_index_read.rs | 1 - 8 files changed, 84 insertions(+), 90 deletions(-) diff --git a/macros/src/column/get_by.rs b/macros/src/column/get_by.rs index 7def0a0e..f4c7dd8a 100644 --- a/macros/src/column/get_by.rs +++ b/macros/src/column/get_by.rs @@ -16,24 +16,9 @@ pub fn get_by_dict_def( let read_tx_context_ty = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_tx_context_ty, val: &#column_type) -> Result, AppError> { - let iter_opt = tx_context.#dict_table_var.get_keys(val)?; - match iter_opt { + match tx_context.#dict_table_var.get_keys(val)? { None => Ok(Vec::new()), - Some(mut iter) => { - let mut results = Vec::new(); - while let Some(x) = iter.next() { - let pk = x?.value(); - match Self::compose(&tx_context, pk) { - Ok(item) => { - results.push(item); - } - Err(err) => { - return Err(AppError::Internal(err.into())); - } - } - } - Ok(results) - } + Some(mut iter) => Self::compose_many(&tx_context, &mut iter) } } }; @@ -79,19 +64,7 @@ pub fn get_by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result, AppError> { let mut iter = tx_context.#index_table_var.get_keys(val)?; - let mut results = Vec::new(); - while let Some(x) = iter.next() { - let pk = x?.value(); - match Self::compose(&tx_context, pk) { - Ok(item) => { - results.push(item); - } - Err(err) => { - return Err(AppError::Internal(err.into())); - } - } - } - Ok(results) + Self::compose_many(&tx_context, &mut iter) } }; diff --git a/macros/src/column/get_keys_by.rs b/macros/src/column/get_keys_by.rs index 03e5cfd0..4d24cdb1 100644 --- a/macros/src/column/get_keys_by.rs +++ b/macros/src/column/get_keys_by.rs @@ -13,20 +13,7 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty let read_ctx_ty = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_ty, val: &#column_type) -> Result, AppError> { - let iter_opt = tx_context.#dict_table_var.get_keys(val)?; - let results = - match iter_opt { - None => Vec::new(), - Some(mut iter) => { - let mut results = Vec::new(); - while let Some(x) = iter.next() { - let pk = x?.value(); - results.push(pk); - } - results - }, - }; - Ok(results) + tx_context.#dict_table_var.get_keys(val)?.map_or(Ok(Vec::new()), redbit::collect_multimap_value) } }; @@ -71,17 +58,8 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T let pk_type = &key_def.tpe; let fn_name = format_ident!("get_{}s_by_{}", pk_name, column_name); let fn_stream = quote! { - pub fn #fn_name( - tx_context: &#read_ctx_type, - val: &#column_type - ) -> Result, AppError> { - let mut iter = tx_context.#index_table.get_keys(val)?; - let mut results = Vec::new(); - while let Some(x) = iter.next() { - let pk = x?.value(); - results.push(pk); - } - Ok(results) + pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result, AppError> { + redbit::collect_multimap_value(tx_context.#index_table.get_keys(val)?) } }; diff --git a/macros/src/column/stream_by.rs b/macros/src/column/stream_by.rs index dd9a1cd1..b90ad5c7 100644 --- a/macros/src/column/stream_by.rs +++ b/macros/src/column/stream_by.rs @@ -18,13 +18,7 @@ pub fn by_dict_def( let fn_stream = quote! { pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result> + Send>>, AppError> { let multi_value = tx_context.#dict_table_var.get_keys(val)?; - let iter_box: Box> + Send> = - if let Some(v) = multi_value { - Box::new(v) - } else { - Box::new(std::iter::empty()) - }; - + let iter_box = multi_value.into_iter().flatten(); let stream = futures::stream::unfold( (iter_box, tx_context, query), |(mut iter, tx_context, query)| async move { diff --git a/macros/src/column/stream_keys_by.rs b/macros/src/column/stream_keys_by.rs index 8828ad56..25d3589e 100644 --- a/macros/src/column/stream_keys_by.rs +++ b/macros/src/column/stream_keys_by.rs @@ -17,17 +17,7 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty let fn_stream = quote! { pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result> + Send, AppError> { let multi_value = tx_context.#dict_table_var.get_keys(val)?; - let iter_box: Box> + Send> = - if let Some(v) = multi_value { - Box::new(v) - } else { - Box::new(std::iter::empty()) - }; - - let stream = stream::iter(iter_box) - .map(|res| res.map(|e| e.value().clone()).map_err(AppError::from)); - - Ok(stream) + Ok(stream::iter(multi_value.into_iter().flatten()).map(|res| res.map(|g| g.value().clone()).map_err(AppError::from))) } }; @@ -110,10 +100,7 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T let fn_name = format_ident!("stream_{}s_by_{}", pk_name, column_name); let fn_stream = quote! { pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result> + Send, AppError> { - let iter = tx_context.#index_table.get_keys(&val)?; - let iter_box: Box> + Send> = Box::new(iter); - let stream = stream::iter(iter_box).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from)); - Ok(stream) + Ok(stream::iter(tx_context.#index_table.get_keys(&val)?).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from))) } }; diff --git a/macros/src/entity/compose.rs b/macros/src/entity/compose.rs index ab601262..04396d62 100644 --- a/macros/src/entity/compose.rs +++ b/macros/src/entity/compose.rs @@ -39,6 +39,31 @@ pub fn compose_token_stream(entity_def: &EntityDef, field_names: &[Ident], struc } } +pub fn compose_many_token_stream(entity_def: &EntityDef) -> FunctionDef { + let entity_type = &entity_def.entity_type; + let pk_type: &Type = &entity_def.key_def.field_def().tpe; + let read_ctx_type: &Type = &entity_def.read_ctx_type; + + FunctionDef { + fn_stream: quote! { + fn compose_many(tx_context: &#read_ctx_type, pk_values: &mut MultimapValue<#pk_type>) -> Result, AppError> { + let mut results = Vec::new(); + while let Some(pk_val) = pk_values.next() { + let pk = pk_val?.value(); + match Self::compose(&tx_context, pk) { + Ok(item) => results.push(item), + Err(err) => return Err(AppError::Internal(err.into())), + } + } + Ok(results) + } + }, + endpoint: None, + test_stream: None, + bench_stream: None, + } +} + pub fn compose_with_filter_token_stream(entity_def: &EntityDef, field_names: &[Ident], struct_inits_with_query: &[TokenStream]) -> FunctionDef { let EntityDef { key_def, entity_type, query_type, read_ctx_type, ..} = &entity_def; let pk_type: &Type = &key_def.field_def().tpe; @@ -71,5 +96,29 @@ pub fn compose_with_filter_token_stream(entity_def: &EntityDef, field_names: &[I }), bench_stream: None, } +} +pub fn compose_many_with_filter_token_stream(entity_def: &EntityDef) -> FunctionDef { + let pk_type: &Type = &entity_def.key_def.field_def().tpe; + let EntityDef { entity_type, query_type, read_ctx_type, ..} = &entity_def; + FunctionDef { + fn_stream: quote! { + fn compose_many_with_filter(tx_context: &#read_ctx_type, pk_values: &mut MultimapValue<#pk_type>, stream_query: &#query_type) -> Result, AppError> { + let mut results = Vec::new(); + while let Some(pk_val) = pk_values.next() { + let pk = pk_val?.value(); + match Self::compose_with_filter(&tx_context, pk, stream_query) { + Ok(Some(item)) => results.push(item), + Ok(None) => {}, + Err(err) => return Err(AppError::Internal(err.into())), + } + } + Ok(results) + } + }, + endpoint: None, + test_stream: None, + bench_stream: None, + } } + diff --git a/macros/src/entity/mod.rs b/macros/src/entity/mod.rs index 1ce12772..575f93c8 100644 --- a/macros/src/entity/mod.rs +++ b/macros/src/entity/mod.rs @@ -75,7 +75,9 @@ pub fn new(item_struct: &ItemStruct) -> Result<(KeyDef, Vec, TokenStre delete::delete_def(&entity_def, &delete_statements), delete::delete_many_def(&entity_def, &delete_many_statements), compose::compose_token_stream(&entity_def, &field_names, &struct_inits), + compose::compose_many_token_stream(&entity_def), compose::compose_with_filter_token_stream(&entity_def, &field_names, &struct_inits_with_query), + compose::compose_many_with_filter_token_stream(&entity_def), ]; function_defs.extend(sample::sample_token_fns(&entity_def, &struct_default_inits, &struct_default_inits_with_query, &field_names)); function_defs.extend(column_function_defs.clone()); diff --git a/redbit/src/lib.rs b/redbit/src/lib.rs index 34257ad0..4fa81849 100644 --- a/redbit/src/lib.rs +++ b/redbit/src/lib.rs @@ -23,6 +23,7 @@ pub use futures::stream::{self, StreamExt}; pub use futures_util::stream::TryStreamExt; pub use http; pub use http::HeaderValue; +pub use indexmap; pub use inventory; pub use lru::LruCache; pub use macros::column; @@ -34,12 +35,11 @@ pub use macros::PointerKey; pub use macros::RootKey; pub use once_cell; pub use query::*; -pub use indexmap; pub use rand; pub use redb; pub use redb::{ - Database, Durability, Key, TypeName, Value, MultimapTable, MultimapTableDefinition, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, - ReadableDatabase, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, Table, TableDefinition, TableError, TableStats, TransactionError, WriteTransaction, + Database, Durability, Key, MultimapTable, MultimapTableDefinition, MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, ReadableDatabase, ReadableMultimapTable, + ReadableTable, ReadableTableMetadata, Table, TableDefinition, TableError, TableStats, TransactionError, TypeName, Value, WriteTransaction, }; pub use serde; pub use serde::Deserialize; @@ -57,29 +57,29 @@ pub use std::collections::HashMap; pub use std::collections::VecDeque; pub use std::fmt::Debug; use std::hash::Hash; -pub use std::time::Instant; pub use std::pin::Pin; pub use std::sync::Arc; pub use std::sync::Weak; pub use std::thread; pub use std::time::Duration; +pub use std::time::Instant; +pub use storage::async_boundary::CopyOwnedValue; pub use storage::context::{ReadTxContext, WriteTxContext}; -pub use storage::partitioning::{Partitioning, BytesPartitioner, Xxh3Partitioner, KeyPartitioner, ValuePartitioner}; +pub use storage::init::Storage; +pub use storage::init::StorageOwner; +pub use storage::partitioning::{BytesPartitioner, KeyPartitioner, Partitioning, ValuePartitioner, Xxh3Partitioner}; pub use storage::table_dict_read::ReadOnlyDictTable; pub use storage::table_dict_read_sharded::ShardedReadOnlyDictTable; -pub use storage::table_dict_write::{DictTable, DictFactory}; +pub use storage::table_dict_write::{DictFactory, DictTable}; pub use storage::table_index_read::ReadOnlyIndexTable; pub use storage::table_index_read_sharded::ShardedReadOnlyIndexTable; -pub use storage::table_index_write::{IndexTable, IndexFactory}; +pub use storage::table_index_write::{IndexFactory, IndexTable}; pub use storage::table_plain_read::ReadOnlyPlainTable; pub use storage::table_plain_read_sharded::ShardedReadOnlyPlainTable; pub use storage::table_plain_write::PlainFactory; -pub use storage::table_writer_api::{StartFuture, StopFuture, TaskResult, FlushFuture, WriterLike, WriteTableLike}; -pub use storage::async_boundary::CopyOwnedValue; -pub use storage::tx_fsm::TxFSM; pub use storage::table_writer::ShardedTableWriter; -pub use storage::init::Storage; -pub use storage::init::StorageOwner; +pub use storage::table_writer_api::{FlushFuture, StartFuture, StopFuture, TaskResult, WriteTableLike, WriterLike}; +pub use storage::tx_fsm::TxFSM; pub use urlencoding; pub use utoipa; pub use utoipa::openapi; @@ -584,3 +584,15 @@ where assert!(matches!(ord, Ordering::Less | Ordering::Equal), "{} must be sorted by key", label); } } + +pub fn collect_multimap_value<'a, V: Key + 'a>(mut mmv: MultimapValue<'a, V>) -> Result, AppError> +where + for<'b> ::SelfType<'b>: ToOwned, +{ + let mut results = Vec::new(); + while let Some(item_res) = mmv.next() { + let guard = item_res?; + results.push(guard.value().to_owned()); + } + Ok(results) +} \ No newline at end of file diff --git a/redbit/src/storage/table_index_read.rs b/redbit/src/storage/table_index_read.rs index 1006f6d7..89f09058 100644 --- a/redbit/src/storage/table_index_read.rs +++ b/redbit/src/storage/table_index_read.rs @@ -25,7 +25,6 @@ impl ReadOnlyIndexTable { pub fn get_keys<'v>(&self, val: impl Borrow>) -> redb::Result> { self.pk_by_index.get(val.borrow()) - } pub fn range_keys<'a, KR: Borrow>>(&self, range: impl RangeBounds) -> redb::Result> { From 374abf211df7234f2ab92ea44f9962608f3afc9f Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Sun, 2 Nov 2025 13:01:16 +0100 Subject: [PATCH 2/3] compose_many_stream --- macros/src/column/get_by.rs | 10 ++- macros/src/column/range_by.rs | 23 ++----- macros/src/column/stream_by.rs | 57 ++-------------- macros/src/column/stream_keys_by.rs | 5 +- macros/src/column/stream_parents_by.rs | 48 ++++---------- macros/src/column/stream_range_by.rs | 38 +++-------- macros/src/entity/compose.rs | 91 ++++++++++++++++---------- macros/src/entity/mod.rs | 4 +- macros/src/pk/range.rs | 19 +----- macros/src/pk/stream_range.rs | 27 +------- macros/src/pk/tail.rs | 28 ++++---- macros/src/pk/take.rs | 14 +--- redbit/Cargo.toml | 3 +- redbit/src/lib.rs | 4 +- redbit/src/storage/table_writer.rs | 3 +- 15 files changed, 122 insertions(+), 252 deletions(-) diff --git a/macros/src/column/get_by.rs b/macros/src/column/get_by.rs index f4c7dd8a..8a1d28c4 100644 --- a/macros/src/column/get_by.rs +++ b/macros/src/column/get_by.rs @@ -16,10 +16,8 @@ pub fn get_by_dict_def( let read_tx_context_ty = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_tx_context_ty, val: &#column_type) -> Result, AppError> { - match tx_context.#dict_table_var.get_keys(val)? { - None => Ok(Vec::new()), - Some(mut iter) => Self::compose_many(&tx_context, &mut iter) - } + let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value())); + Self::compose_many(&tx_context, iter, None) } }; @@ -63,8 +61,8 @@ pub fn get_by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type let read_ctx_type = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result, AppError> { - let mut iter = tx_context.#index_table_var.get_keys(val)?; - Self::compose_many(&tx_context, &mut iter) + let iter = tx_context.#index_table_var.get_keys(val)?.map(|res| res.map(|kg| kg.value())); + Self::compose_many(&tx_context, iter, None) } }; diff --git a/macros/src/column/range_by.rs b/macros/src/column/range_by.rs index 3b800085..d7b6c865 100644 --- a/macros/src/column/range_by.rs +++ b/macros/src/column/range_by.rs @@ -11,23 +11,12 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T let read_ctx_type = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_type, from: &#column_type, until: &#column_type) -> Result, AppError> { - let range_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?; - let mut results = Vec::new(); - for entry_res in range_iter { - let (_, mut multi_iter) = entry_res?; - while let Some(x) = multi_iter.next() { - let pk = x?.value(); - match Self::compose(&tx_context, pk) { - Ok(item) => { - results.push(item); - } - Err(err) => { - return Err(AppError::Internal(err.into())); - } - } - } - } - Ok(results) + let iter = tx_context.#index_table.range_keys::<#column_type>(from..until)? + .flat_map(|r| match r { + Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))), + Err(e) => Either::Right(std::iter::once(Err(e))), + }); + Self::compose_many(&tx_context, iter, None) } }; diff --git a/macros/src/column/stream_by.rs b/macros/src/column/stream_by.rs index b90ad5c7..b3728ad3 100644 --- a/macros/src/column/stream_by.rs +++ b/macros/src/column/stream_by.rs @@ -16,36 +16,12 @@ pub fn by_dict_def( let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def; let pk_type = key_def.field_def().tpe; let fn_stream = quote! { - pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result> + Send>>, AppError> { - let multi_value = tx_context.#dict_table_var.get_keys(val)?; - let iter_box = multi_value.into_iter().flatten(); - let stream = futures::stream::unfold( - (iter_box, tx_context, query), - |(mut iter, tx_context, query)| async move { - match iter.next() { - Some(Ok(guard)) => { - let pk = guard.value().clone(); - if let Some(ref q) = query { - match Self::compose_with_filter(&tx_context, pk, q) { - Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))), - Ok(None) => None, - Err(e) => Some((Err(e), (iter, tx_context, query))), - } - } else { - Some((Self::compose(&tx_context, pk), (iter, tx_context, query))) - } - } - Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))), - None => None, - } - }, - ).boxed(); - - Ok(stream) + pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result> + Send, AppError> { + let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value())); + Self::compose_many_stream(tx_context, iter, query) } }; - let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name); let test_stream = Some(quote! { #[tokio::test] @@ -139,30 +115,9 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def; let pk_type = key_def.field_def().tpe; let fn_stream = quote! { - pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result> + Send>>, AppError> { - let iter = tx_context.#index_table.get_keys(val)?; - - let stream = futures::stream::unfold((iter, tx_context, query), |(mut iter, tx_context, query)| async move { - match iter.next() { - Some(Ok(guard)) => { - let pk = guard.value(); - if let Some(ref stream_query) = query { - match Self::compose_with_filter(&tx_context, pk, stream_query) { - Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))), - Ok(None) => None, - Err(e) => Some((Err(e), (iter, tx_context, query))), - } - } else { - Some((Self::compose(&tx_context, pk), (iter, tx_context, query))) - } - } - Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))), - None => None, - } - }) - .boxed(); - - Ok(stream) + pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result> + Send, AppError> { + let iter = tx_context.#index_table.get_keys(val)?.map(|res| res.map(|kg| kg.value())); + Self::compose_many_stream(tx_context, iter, query) } }; diff --git a/macros/src/column/stream_keys_by.rs b/macros/src/column/stream_keys_by.rs index 25d3589e..29198df6 100644 --- a/macros/src/column/stream_keys_by.rs +++ b/macros/src/column/stream_keys_by.rs @@ -16,12 +16,11 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty let read_ctx_type = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result> + Send, AppError> { - let multi_value = tx_context.#dict_table_var.get_keys(val)?; - Ok(stream::iter(multi_value.into_iter().flatten()).map(|res| res.map(|g| g.value().clone()).map_err(AppError::from))) + let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|g| g.value().clone()).map_err(AppError::from)); + Ok(stream::iter(iter)) } }; - let test_stream = Some(quote! { #[tokio::test] async fn #fn_name() { diff --git a/macros/src/column/stream_parents_by.rs b/macros/src/column/stream_parents_by.rs index 09bf54de..bc58a367 100644 --- a/macros/src/column/stream_parents_by.rs +++ b/macros/src/column/stream_parents_by.rs @@ -23,45 +23,21 @@ pub fn by_dict_def( let fn_name = format_ident!("stream_{}s_by_{}", parent_ident.to_string().to_lowercase(), column_name); let fn_stream = quote! { - pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result> + Send>>, AppError> { - let multi_value_opt = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)?; - let mut unique_parent_pointers = - match multi_value_opt { - None => Vec::new(), - Some(multi_value) => { - let mut pointers = Vec::new(); - for pk_guard in multi_value { - let pk = pk_guard?.value(); - pointers.push(pk.parent); - } - pointers - } - }; - unique_parent_pointers.dedup(); - let stream = futures::stream::unfold( - (unique_parent_pointers.into_iter(), parent_tx_context, query), - |(mut iter, parent_tx_context, query)| async move { - match iter.next() { - Some(parent_pointer) => { - if let Some(ref stream_query) = query { - match #parent_type::compose_with_filter(&parent_tx_context, parent_pointer, stream_query) { - Ok(Some(entity)) => Some((Ok(entity), (iter, parent_tx_context, query))), - Ok(None) => None, - Err(e) => Some((Err(e), (iter, parent_tx_context, query))), - } - } else { - Some((#parent_type::compose(&parent_tx_context, parent_pointer), (iter, parent_tx_context, query))) - } - } - None => None, - } - }, - ).boxed(); - Ok(stream) + pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result> + Send, AppError> { + let unique_parent_pk_iter = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)? + .into_iter() + .flatten() + .map(|r| r.map(|g| g.value().parent)) + .scan(HashSet::new(), |seen, r| { + Some(match r { + Ok(parent) => if seen.insert(parent) { Some(Ok(parent)) } else { None }, + Err(e) => Some(Err(e)), + }) + }).flatten(); + #parent_type::compose_many_stream(parent_tx_context, unique_parent_pk_iter, query) } }; - let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name); let test_stream = Some(quote! { #[tokio::test] diff --git a/macros/src/column/stream_range_by.rs b/macros/src/column/stream_range_by.rs index 3e8d6c3a..f37b37d7 100644 --- a/macros/src/column/stream_range_by.rs +++ b/macros/src/column/stream_range_by.rs @@ -16,36 +16,14 @@ pub fn stream_range_by_index_def(entity_def: &EntityDef, column_name: &Ident, co from: #column_type, until: #column_type, query: Option<#query_type>, - ) -> Result> + Send>>, AppError> { - let outer_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?; - let outer_stream = futures::stream::iter(outer_iter).map_err(AppError::from); - let pk_stream = outer_stream.map_ok(|(_key, value_iter)| { - futures::stream::iter(value_iter).map(|res| { - res.map_err(AppError::from).map(|guard| guard.value().clone()) - }) - }); - Ok( - pk_stream - .try_flatten() - .map(move |pk_res| { - match pk_res { - Ok(pk) => { - if let Some(ref stream_query) = query { - match Self::compose_with_filter(&tx_context, pk, stream_query) { - Ok(Some(entity)) => Some(Ok(entity)), - Ok(None) => None, - Err(e) => Some(Err(e)), - } - } else { - Some(Self::compose(&tx_context, pk)) // <- already Result - } - } - Err(e) => Some(Err(e)), - } - }) - .filter_map(std::future::ready) // remove None - .boxed() - ) + ) -> Result> + Send, AppError> { + let pk_iter = + tx_context.#index_table.range_keys::<#column_type>(from..until)? + .flat_map(|r| match r { + Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))), + Err(e) => Either::Right(std::iter::once(Err(e))), + }); + Self::compose_many_stream(tx_context, pk_iter, query) } }; diff --git a/macros/src/entity/compose.rs b/macros/src/entity/compose.rs index 04396d62..5ede1c2e 100644 --- a/macros/src/entity/compose.rs +++ b/macros/src/entity/compose.rs @@ -39,31 +39,6 @@ pub fn compose_token_stream(entity_def: &EntityDef, field_names: &[Ident], struc } } -pub fn compose_many_token_stream(entity_def: &EntityDef) -> FunctionDef { - let entity_type = &entity_def.entity_type; - let pk_type: &Type = &entity_def.key_def.field_def().tpe; - let read_ctx_type: &Type = &entity_def.read_ctx_type; - - FunctionDef { - fn_stream: quote! { - fn compose_many(tx_context: &#read_ctx_type, pk_values: &mut MultimapValue<#pk_type>) -> Result, AppError> { - let mut results = Vec::new(); - while let Some(pk_val) = pk_values.next() { - let pk = pk_val?.value(); - match Self::compose(&tx_context, pk) { - Ok(item) => results.push(item), - Err(err) => return Err(AppError::Internal(err.into())), - } - } - Ok(results) - } - }, - endpoint: None, - test_stream: None, - bench_stream: None, - } -} - pub fn compose_with_filter_token_stream(entity_def: &EntityDef, field_names: &[Ident], struct_inits_with_query: &[TokenStream]) -> FunctionDef { let EntityDef { key_def, entity_type, query_type, read_ctx_type, ..} = &entity_def; let pk_type: &Type = &key_def.field_def().tpe; @@ -98,19 +73,29 @@ pub fn compose_with_filter_token_stream(entity_def: &EntityDef, field_names: &[I } } -pub fn compose_many_with_filter_token_stream(entity_def: &EntityDef) -> FunctionDef { +pub fn compose_many_token_stream(entity_def: &EntityDef) -> FunctionDef { let pk_type: &Type = &entity_def.key_def.field_def().tpe; let EntityDef { entity_type, query_type, read_ctx_type, ..} = &entity_def; FunctionDef { fn_stream: quote! { - fn compose_many_with_filter(tx_context: &#read_ctx_type, pk_values: &mut MultimapValue<#pk_type>, stream_query: &#query_type) -> Result, AppError> { - let mut results = Vec::new(); - while let Some(pk_val) = pk_values.next() { - let pk = pk_val?.value(); - match Self::compose_with_filter(&tx_context, pk, stream_query) { - Ok(Some(item)) => results.push(item), - Ok(None) => {}, - Err(err) => return Err(AppError::Internal(err.into())), + fn compose_many>>( + tx_context: &#read_ctx_type, + pk_values: I, + stream_query: Option<#query_type> + ) -> Result, AppError> { + let mut results = Vec::with_capacity(pk_values.size_hint().0); + match stream_query { + Some(ref q) => { + for pk in pk_values { + if let Ok(Some(item)) = Self::compose_with_filter(&tx_context, pk?, q) { + results.push(item); + } + } + } + None => { + for pk in pk_values { + results.push(Self::compose(&tx_context, pk?)?); + } } } Ok(results) @@ -122,3 +107,41 @@ pub fn compose_many_with_filter_token_stream(entity_def: &EntityDef) -> Function } } +pub fn compose_many_stream_token_stream(entity_def: &EntityDef) -> FunctionDef { + let pk_type: &Type = &entity_def.key_def.field_def().tpe; + let EntityDef { entity_type, query_type, read_ctx_type, ..} = &entity_def; + FunctionDef { + fn_stream: quote! { + pub fn compose_many_stream> + Send>( + tx_context: #read_ctx_type, + pk_values: I, + stream_query: Option<#query_type>, + ) -> Result> + Send, AppError> { + let iter = pk_values.filter_map(move |item_res| { + match item_res { + Err(e) => Some(Err(AppError::from(e))), + Ok(pk) => { + if let Some(ref q) = stream_query { + match Self::compose_with_filter(&tx_context, pk, q) { + Ok(Some(item)) => Some(Ok(item)), + Ok(None) => None, // skip + Err(err) => Some(Err(AppError::Internal(err.into()))), + } + } else { + match Self::compose(&tx_context, pk) { + Ok(item) => Some(Ok(item)), + Err(err) => Some(Err(AppError::Internal(err.into()))), + } + } + } + } + }); + Ok(stream::iter(iter)) + } + }, + endpoint: None, + test_stream: None, + bench_stream: None, + } +} + diff --git a/macros/src/entity/mod.rs b/macros/src/entity/mod.rs index 575f93c8..7b719635 100644 --- a/macros/src/entity/mod.rs +++ b/macros/src/entity/mod.rs @@ -75,9 +75,9 @@ pub fn new(item_struct: &ItemStruct) -> Result<(KeyDef, Vec, TokenStre delete::delete_def(&entity_def, &delete_statements), delete::delete_many_def(&entity_def, &delete_many_statements), compose::compose_token_stream(&entity_def, &field_names, &struct_inits), - compose::compose_many_token_stream(&entity_def), compose::compose_with_filter_token_stream(&entity_def, &field_names, &struct_inits_with_query), - compose::compose_many_with_filter_token_stream(&entity_def), + compose::compose_many_token_stream(&entity_def), + compose::compose_many_stream_token_stream(&entity_def), ]; function_defs.extend(sample::sample_token_fns(&entity_def, &struct_default_inits, &struct_default_inits_with_query, &field_names)); function_defs.extend(column_function_defs.clone()); diff --git a/macros/src/pk/range.rs b/macros/src/pk/range.rs index e1d4bde3..4a30d481 100644 --- a/macros/src/pk/range.rs +++ b/macros/src/pk/range.rs @@ -12,23 +12,8 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident, no_columns: bool) -> Functi quote! { pub fn #fn_name(tx_context: &#read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result, AppError> { let range = from..until; - let mut iter = tx_context.#table.underlying.range::<#pk_type>(range)?; - let mut results = Vec::new(); - if let Some(ref q) = query { - while let Some(entry_res) = iter.next() { - let pk = entry_res?.0.value(); - match Self::compose_with_filter(&tx_context, pk, q)? { - Some(entity) => results.push(entity), - None => {}, - } - } - } else { - while let Some(entry_res) = iter.next() { - let pk = entry_res?.0.value(); - results.push(Self::compose(&tx_context, pk)?); - } - } - Ok(results) + let iter = tx_context.#table.underlying.range::<#pk_type>(range)?.map(|res| res.map(|(kg, _)| kg.value())); + Self::compose_many(&tx_context, iter, query) } }; diff --git a/macros/src/pk/stream_range.rs b/macros/src/pk/stream_range.rs index f25e334f..f0db7632 100644 --- a/macros/src/pk/stream_range.rs +++ b/macros/src/pk/stream_range.rs @@ -14,31 +14,10 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident, range_query_ty: &Type, no_c let fn_name = format_ident!("stream_range"); let fn_stream = quote! { - pub fn #fn_name(tx_context: #read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result> + Send>>, AppError> { + pub fn #fn_name(tx_context: #read_ctx_type, from: #pk_type, until: #pk_type, query: Option<#query_type>) -> Result> + Send, AppError> { let range = from..until; - let iter_box = Box::new(tx_context.#table.underlying.range::<#pk_type>(range)?); - let stream = futures::stream::unfold( - (iter_box, tx_context, query), - |(mut iter, tx_context, query)| async move { - match iter.next() { - Some(Ok((key, _val))) => { - let pk = key.value(); - if let Some(ref stream_query) = query { - match Self::compose_with_filter(&tx_context, pk, stream_query) { - Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))), - Ok(None) => None, - Err(e) => Some((Err(e), (iter, tx_context, query))), - } - } else { - Some((Self::compose(&tx_context, pk), (iter, tx_context, query))) - } - } - Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))), - None => None, - } - }, - ).boxed(); - Ok(stream) + let iter = tx_context.#table.underlying.range::<#pk_type>(range)?.map(|res| res.map(|(kg, _)| kg.value())); + Self::compose_many_stream(tx_context, iter, query) } }; diff --git a/macros/src/pk/tail.rs b/macros/src/pk/tail.rs index 2ee1eab9..24da20b0 100644 --- a/macros/src/pk/tail.rs +++ b/macros/src/pk/tail.rs @@ -13,28 +13,24 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident) -> FunctionDef { let read_ctx_type = &entity_def.read_ctx_type; let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_type, n: usize) -> Result, AppError> { + if n == 0 { + return Ok(Vec::new()); + } + if n > 100 { + return Err(AppError::Internal("Cannot take more than 100 entities at once".into())); + } let Some((key_guard, _)) = tx_context.#table.underlying.last()? else { return Ok(Vec::new()); }; - let key = key_guard.value(); - let until = key.next_index(); + let until = key_guard.value().next_index(); let from = until.rollback_or_init(n as u32); let range = from..until; - let iter = tx_context.#table.underlying.range(range)?; - let mut queue = VecDeque::with_capacity(n); - - for entry_res in iter { - let pk = entry_res?.0.value(); - if queue.len() == n { - queue.pop_front(); // remove oldest - } - queue.push_back(pk); - } - queue - .into_iter() - .map(|pk| Self::compose(tx_context, pk)) - .collect::, AppError>>() + let mut pks = tx_context.#table.underlying.range(range)?.rev().take(n) + .map(|entry_res| entry_res.map(|(kg, _)| kg.value())) + .collect::>>()?; + pks.reverse(); + Self::compose_many(&tx_context, pks.into_iter().map(Ok), None) } }; diff --git a/macros/src/pk/take.rs b/macros/src/pk/take.rs index 2c06b953..deb2510c 100644 --- a/macros/src/pk/take.rs +++ b/macros/src/pk/take.rs @@ -13,21 +13,11 @@ pub fn fn_def(entity_def: &EntityDef, table: &Ident) -> FunctionDef { let fn_name = format_ident!("take"); let fn_stream = quote! { pub fn #fn_name(tx_context: &#read_ctx_type, n: usize) -> Result, AppError> { - let mut iter = tx_context.#table.underlying.iter()?; - let mut results = Vec::new(); - let mut count: usize = 0; if n > 100 { Err(AppError::Internal("Cannot take more than 100 entities at once".into())) } else { - while let Some(entry_res) = iter.next() { - if count >= n { - break; - } - let pk = entry_res?.0.value(); - results.push(Self::compose(&tx_context, pk)?); - count += 1; - } - Ok(results) + let iter =tx_context.#table.underlying.iter()?.take(n).map(|entry_res| entry_res.map(|(pk_guard, _)| pk_guard.value())); + Self::compose_many(&tx_context, iter, None) } } }; diff --git a/redbit/Cargo.toml b/redbit/Cargo.toml index c9e74a1c..465d338f 100644 --- a/redbit/Cargo.toml +++ b/redbit/Cargo.toml @@ -45,4 +45,5 @@ anyhow = "1.0.99" log = "0.4.27" crossbeam = "0.8.4" xxhash-rust = { version = "0.8.12", features = ["xxh3"] } -indexmap = "2.12.0" \ No newline at end of file +indexmap = "2.12.0" +itertools = "0.14.0" \ No newline at end of file diff --git a/redbit/src/lib.rs b/redbit/src/lib.rs index 4fa81849..b154b904 100644 --- a/redbit/src/lib.rs +++ b/redbit/src/lib.rs @@ -23,6 +23,7 @@ pub use futures::stream::{self, StreamExt}; pub use futures_util::stream::TryStreamExt; pub use http; pub use http::HeaderValue; +pub use itertools::Either; pub use indexmap; pub use inventory; pub use lru::LruCache; @@ -38,7 +39,7 @@ pub use query::*; pub use rand; pub use redb; pub use redb::{ - Database, Durability, Key, MultimapTable, MultimapTableDefinition, MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, ReadableDatabase, ReadableMultimapTable, + AccessGuard, Database, Durability, Key, MultimapTable, MultimapTableDefinition, MultimapValue, ReadOnlyMultimapTable, ReadOnlyTable, ReadTransaction, ReadableDatabase, ReadableMultimapTable, ReadableTable, ReadableTableMetadata, Table, TableDefinition, TableError, TableStats, TransactionError, TypeName, Value, WriteTransaction, }; pub use serde; @@ -89,6 +90,7 @@ pub use utoipa::ToSchema; pub use utoipa_axum; pub use utoipa_axum::router::OpenApiRouter; pub use utoipa_swagger_ui; +pub use std::collections::HashSet; use crate::axum::extract::rejection::JsonRejection; use crate::axum::extract::FromRequest; diff --git a/redbit/src/storage/table_writer.rs b/redbit/src/storage/table_writer.rs index ce8a258e..bcc44364 100644 --- a/redbit/src/storage/table_writer.rs +++ b/redbit/src/storage/table_writer.rs @@ -277,8 +277,7 @@ mod index_sharded { { let want = 3usize; // querying [a1, a2, a3] let (tx, rx) = channel::unbounded::>)>>(); - router.query_and_write(vec![a1.clone(), a2.clone(), a3.clone()], true, Arc::new(move |last_shards, batch| { - // non-blocking: just forward the shard batch + router.query_and_write(vec![a1.clone(), a2.clone(), a3.clone()], true, Arc::new(move |_last_shards, batch| { tx.send(batch)?; Ok(()) })).expect("enqueue heads_before"); From 28aa1503167de8abd5a294bf372c4f5ce56b8ff4 Mon Sep 17 00:00:00 2001 From: pragmaxim Date: Sun, 2 Nov 2025 14:21:54 +0100 Subject: [PATCH 3/3] GH actions - run on PR too --- .github/workflows/redbit.yml | 8 ++++++++ chain/README.md | 6 ++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/.github/workflows/redbit.yml b/.github/workflows/redbit.yml index 671486b9..94ec43ac 100644 --- a/.github/workflows/redbit.yml +++ b/.github/workflows/redbit.yml @@ -4,6 +4,12 @@ on: push: branches: - master + pull_request: + branches: + - master + types: + - opened + - synchronize workflow_dispatch: permissions: @@ -168,12 +174,14 @@ jobs: ' README.md > README.md.tmp && mv README.md.tmp README.md - name: Commit benchmark results to README + if: github.event_name == 'push' && github.ref == 'refs/heads/master' run: | git config --global user.name "github-actions[bot]" git config --global user.email "github-actions@users.noreply.github.com" git diff --quiet || (git add README.md && git commit -m "Auto-update README with example code" && git push) deploy: + if: github.event_name == 'push' && github.ref == 'refs/heads/master' needs: bench runs-on: ubuntu-latest permissions: diff --git a/chain/README.md b/chain/README.md index fa9dd3ca..fc9cd832 100644 --- a/chain/README.md +++ b/chain/README.md @@ -1,12 +1,14 @@ ## Chain -Chain keeps you in sync with arbitrary blockchain if you implement the [api](src/api.rs). +Chain is a read-optimized UTXO analytics engine as it builds utxo indexes with materialized prev-outs. +Utxo state is built on the fly during indexing, addresses and scripts are stored as a dictionary for deduplication purposes. +At the same time it is extremely space efficient as it uses hierarchical pointers to reference hashes. Chain tip is "eventually consistent" with the settlement layer through eager fork competition such that superseded forks are immediately deleted from DB and replaced with more valuable fork when it appears. Ie. only one winning fork is kept in the DB at given moment. This allows for much better performance and space efficiency. -Utxo state is built on the fly during indexing, addresses are stored as a dictionary for deduplication purposes. +It keeps you in sync with arbitrary blockchain if you implement the [api](src/api.rs). ### Perf