Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/redbit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ on:
push:
branches:
- master
pull_request:
branches:
- master
types:
- opened
- synchronize
workflow_dispatch:

permissions:
Expand Down Expand Up @@ -168,12 +174,14 @@ jobs:
' README.md > README.md.tmp && mv README.md.tmp README.md

- name: Commit benchmark results to README
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
run: |
git config --global user.name "github-actions[bot]"
git config --global user.email "github-actions@users.noreply.github.com"
git diff --quiet || (git add README.md && git commit -m "Auto-update README with example code" && git push)

deploy:
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
needs: bench
runs-on: ubuntu-latest
permissions:
Expand Down
6 changes: 4 additions & 2 deletions chain/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
## Chain

Chain keeps you in sync with arbitrary blockchain if you implement the [api](src/api.rs).
Chain is a read-optimized UTXO analytics engine as it builds utxo indexes with materialized prev-outs.
Utxo state is built on the fly during indexing, addresses and scripts are stored as a dictionary for deduplication purposes.
At the same time it is extremely space efficient as it uses hierarchical pointers to reference hashes.

Chain tip is "eventually consistent" with the settlement layer through eager fork competition such that
superseded forks are immediately deleted from DB and replaced with more valuable fork when it appears.
Ie. only one winning fork is kept in the DB at given moment. This allows for much better performance and space efficiency.

Utxo state is built on the fly during indexing, addresses are stored as a dictionary for deduplication purposes.
It keeps you in sync with arbitrary blockchain if you implement the [api](src/api.rs).

### Perf

Expand Down
37 changes: 4 additions & 33 deletions macros/src/column/get_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,8 @@ pub fn get_by_dict_def(
let read_tx_context_ty = &entity_def.read_ctx_type;
let fn_stream = quote! {
pub fn #fn_name(tx_context: &#read_tx_context_ty, val: &#column_type) -> Result<Vec<#entity_type>, AppError> {
let iter_opt = tx_context.#dict_table_var.get_keys(val)?;
match iter_opt {
None => Ok(Vec::new()),
Some(mut iter) => {
let mut results = Vec::new();
while let Some(x) = iter.next() {
let pk = x?.value();
match Self::compose(&tx_context, pk) {
Ok(item) => {
results.push(item);
}
Err(err) => {
return Err(AppError::Internal(err.into()));
}
}
}
Ok(results)
}
}
let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value()));
Self::compose_many(&tx_context, iter, None)
}
};

Expand Down Expand Up @@ -78,20 +61,8 @@ pub fn get_by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type
let read_ctx_type = &entity_def.read_ctx_type;
let fn_stream = quote! {
pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result<Vec<#entity_type>, AppError> {
let mut iter = tx_context.#index_table_var.get_keys(val)?;
let mut results = Vec::new();
while let Some(x) = iter.next() {
let pk = x?.value();
match Self::compose(&tx_context, pk) {
Ok(item) => {
results.push(item);
}
Err(err) => {
return Err(AppError::Internal(err.into()));
}
}
}
Ok(results)
let iter = tx_context.#index_table_var.get_keys(val)?.map(|res| res.map(|kg| kg.value()));
Self::compose_many(&tx_context, iter, None)
}
};

Expand Down
28 changes: 3 additions & 25 deletions macros/src/column/get_keys_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,7 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty
let read_ctx_ty = &entity_def.read_ctx_type;
let fn_stream = quote! {
pub fn #fn_name(tx_context: &#read_ctx_ty, val: &#column_type) -> Result<Vec<#pk_type>, AppError> {
let iter_opt = tx_context.#dict_table_var.get_keys(val)?;
let results =
match iter_opt {
None => Vec::new(),
Some(mut iter) => {
let mut results = Vec::new();
while let Some(x) = iter.next() {
let pk = x?.value();
results.push(pk);
}
results
},
};
Ok(results)
tx_context.#dict_table_var.get_keys(val)?.map_or(Ok(Vec::new()), redbit::collect_multimap_value)
}
};

Expand Down Expand Up @@ -71,17 +58,8 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
let pk_type = &key_def.tpe;
let fn_name = format_ident!("get_{}s_by_{}", pk_name, column_name);
let fn_stream = quote! {
pub fn #fn_name(
tx_context: &#read_ctx_type,
val: &#column_type
) -> Result<Vec<#pk_type>, AppError> {
let mut iter = tx_context.#index_table.get_keys(val)?;
let mut results = Vec::new();
while let Some(x) = iter.next() {
let pk = x?.value();
results.push(pk);
}
Ok(results)
pub fn #fn_name(tx_context: &#read_ctx_type, val: &#column_type) -> Result<Vec<#pk_type>, AppError> {
redbit::collect_multimap_value(tx_context.#index_table.get_keys(val)?)
}
};

Expand Down
23 changes: 6 additions & 17 deletions macros/src/column/range_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,12 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
let read_ctx_type = &entity_def.read_ctx_type;
let fn_stream = quote! {
pub fn #fn_name(tx_context: &#read_ctx_type, from: &#column_type, until: &#column_type) -> Result<Vec<#entity_type>, AppError> {
let range_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?;
let mut results = Vec::new();
for entry_res in range_iter {
let (_, mut multi_iter) = entry_res?;
while let Some(x) = multi_iter.next() {
let pk = x?.value();
match Self::compose(&tx_context, pk) {
Ok(item) => {
results.push(item);
}
Err(err) => {
return Err(AppError::Internal(err.into()));
}
}
}
}
Ok(results)
let iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?
.flat_map(|r| match r {
Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))),
Err(e) => Either::Right(std::iter::once(Err(e))),
});
Self::compose_many(&tx_context, iter, None)
}
};

Expand Down
63 changes: 6 additions & 57 deletions macros/src/column/stream_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,12 @@ pub fn by_dict_def(
let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def;
let pk_type = key_def.field_def().tpe;
let fn_stream = quote! {
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<Pin<Box<dyn futures::Stream<Item = Result<#entity_type, AppError>> + Send>>, AppError> {
let multi_value = tx_context.#dict_table_var.get_keys(val)?;
let iter_box: Box<dyn Iterator<Item = Result<_, _>> + Send> =
if let Some(v) = multi_value {
Box::new(v)
} else {
Box::new(std::iter::empty())
};

let stream = futures::stream::unfold(
(iter_box, tx_context, query),
|(mut iter, tx_context, query)| async move {
match iter.next() {
Some(Ok(guard)) => {
let pk = guard.value().clone();
if let Some(ref q) = query {
match Self::compose_with_filter(&tx_context, pk, q) {
Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))),
Ok(None) => None,
Err(e) => Some((Err(e), (iter, tx_context, query))),
}
} else {
Some((Self::compose(&tx_context, pk), (iter, tx_context, query)))
}
}
Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))),
None => None,
}
},
).boxed();

Ok(stream)
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<impl futures::Stream<Item = Result<#entity_type, AppError>> + Send, AppError> {
let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|kg| kg.value()));
Self::compose_many_stream(tx_context, iter, query)
}
};


let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name);
let test_stream = Some(quote! {
#[tokio::test]
Expand Down Expand Up @@ -145,30 +115,9 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
let EntityDef { key_def, entity_name, entity_type, query_type, read_ctx_type, ..} = &entity_def;
let pk_type = key_def.field_def().tpe;
let fn_stream = quote! {
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<Pin<Box<dyn futures::Stream<Item = Result<#entity_type, AppError>> + Send>>, AppError> {
let iter = tx_context.#index_table.get_keys(val)?;

let stream = futures::stream::unfold((iter, tx_context, query), |(mut iter, tx_context, query)| async move {
match iter.next() {
Some(Ok(guard)) => {
let pk = guard.value();
if let Some(ref stream_query) = query {
match Self::compose_with_filter(&tx_context, pk, stream_query) {
Ok(Some(entity)) => Some((Ok(entity), (iter, tx_context, query))),
Ok(None) => None,
Err(e) => Some((Err(e), (iter, tx_context, query))),
}
} else {
Some((Self::compose(&tx_context, pk), (iter, tx_context, query)))
}
}
Some(Err(e)) => Some((Err(AppError::from(e)), (iter, tx_context, query))),
None => None,
}
})
.boxed();

Ok(stream)
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type, query: Option<#query_type>) -> Result<impl futures::Stream<Item = Result<#entity_type, AppError>> + Send, AppError> {
let iter = tx_context.#index_table.get_keys(val)?.map(|res| res.map(|kg| kg.value()));
Self::compose_many_stream(tx_context, iter, query)
}
};

Expand Down
20 changes: 3 additions & 17 deletions macros/src/column/stream_keys_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,11 @@ pub fn by_dict_def(entity_def: &EntityDef, column_name: &Ident, column_type: &Ty
let read_ctx_type = &entity_def.read_ctx_type;
let fn_stream = quote! {
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result<impl futures::Stream<Item = Result<#pk_type, AppError>> + Send, AppError> {
let multi_value = tx_context.#dict_table_var.get_keys(val)?;
let iter_box: Box<dyn Iterator<Item = Result<_, _>> + Send> =
if let Some(v) = multi_value {
Box::new(v)
} else {
Box::new(std::iter::empty())
};

let stream = stream::iter(iter_box)
.map(|res| res.map(|e| e.value().clone()).map_err(AppError::from));

Ok(stream)
let iter = tx_context.#dict_table_var.get_keys(val)?.into_iter().flatten().map(|res| res.map(|g| g.value().clone()).map_err(AppError::from));
Ok(stream::iter(iter))
}
};


let test_stream = Some(quote! {
#[tokio::test]
async fn #fn_name() {
Expand Down Expand Up @@ -110,10 +99,7 @@ pub fn by_index_def(entity_def: &EntityDef, column_name: &Ident, column_type: &T
let fn_name = format_ident!("stream_{}s_by_{}", pk_name, column_name);
let fn_stream = quote! {
pub fn #fn_name(tx_context: #read_ctx_type, val: #column_type) -> Result<impl futures::Stream<Item = Result<#pk_type, AppError>> + Send, AppError> {
let iter = tx_context.#index_table.get_keys(&val)?;
let iter_box: Box<dyn Iterator<Item = Result<_, _>> + Send> = Box::new(iter);
let stream = stream::iter(iter_box).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from));
Ok(stream)
Ok(stream::iter(tx_context.#index_table.get_keys(&val)?).map(|res| res.map(|e| e.value().clone()).map_err(AppError::from)))
}
};

Expand Down
48 changes: 12 additions & 36 deletions macros/src/column/stream_parents_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,45 +23,21 @@ pub fn by_dict_def(

let fn_name = format_ident!("stream_{}s_by_{}", parent_ident.to_string().to_lowercase(), column_name);
let fn_stream = quote! {
pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result<Pin<Box<dyn futures::Stream<Item = Result<#parent_type, AppError>> + Send>>, AppError> {
let multi_value_opt = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)?;
let mut unique_parent_pointers =
match multi_value_opt {
None => Vec::new(),
Some(multi_value) => {
let mut pointers = Vec::new();
for pk_guard in multi_value {
let pk = pk_guard?.value();
pointers.push(pk.parent);
}
pointers
}
};
unique_parent_pointers.dedup();
let stream = futures::stream::unfold(
(unique_parent_pointers.into_iter(), parent_tx_context, query),
|(mut iter, parent_tx_context, query)| async move {
match iter.next() {
Some(parent_pointer) => {
if let Some(ref stream_query) = query {
match #parent_type::compose_with_filter(&parent_tx_context, parent_pointer, stream_query) {
Ok(Some(entity)) => Some((Ok(entity), (iter, parent_tx_context, query))),
Ok(None) => None,
Err(e) => Some((Err(e), (iter, parent_tx_context, query))),
}
} else {
Some((#parent_type::compose(&parent_tx_context, parent_pointer), (iter, parent_tx_context, query)))
}
}
None => None,
}
},
).boxed();
Ok(stream)
pub fn #fn_name(parent_tx_context: #parent_tx_context_type, val: #column_type, query: Option<#stream_parent_query_type>) -> Result<impl futures::Stream<Item = Result<#parent_type, AppError>> + Send, AppError> {
let unique_parent_pk_iter = parent_tx_context.#parent_one2many_field_name.#dict_table_var.get_keys(val)?
.into_iter()
.flatten()
.map(|r| r.map(|g| g.value().parent))
.scan(HashSet::new(), |seen, r| {
Some(match r {
Ok(parent) => if seen.insert(parent) { Some(Ok(parent)) } else { None },
Err(e) => Some(Err(e)),
})
}).flatten();
#parent_type::compose_many_stream(parent_tx_context, unique_parent_pk_iter, query)
}
};


let test_with_filter_fn_name = format_ident!("{}_with_filter", fn_name);
let test_stream = Some(quote! {
#[tokio::test]
Expand Down
38 changes: 8 additions & 30 deletions macros/src/column/stream_range_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,14 @@ pub fn stream_range_by_index_def(entity_def: &EntityDef, column_name: &Ident, co
from: #column_type,
until: #column_type,
query: Option<#query_type>,
) -> Result<Pin<Box<dyn futures::Stream<Item = Result<#entity_type, AppError>> + Send>>, AppError> {
let outer_iter = tx_context.#index_table.range_keys::<#column_type>(from..until)?;
let outer_stream = futures::stream::iter(outer_iter).map_err(AppError::from);
let pk_stream = outer_stream.map_ok(|(_key, value_iter)| {
futures::stream::iter(value_iter).map(|res| {
res.map_err(AppError::from).map(|guard| guard.value().clone())
})
});
Ok(
pk_stream
.try_flatten()
.map(move |pk_res| {
match pk_res {
Ok(pk) => {
if let Some(ref stream_query) = query {
match Self::compose_with_filter(&tx_context, pk, stream_query) {
Ok(Some(entity)) => Some(Ok(entity)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
} else {
Some(Self::compose(&tx_context, pk)) // <- already Result<T, AppError>
}
}
Err(e) => Some(Err(e)),
}
})
.filter_map(std::future::ready) // remove None
.boxed()
)
) -> Result<impl futures::Stream<Item = Result<#entity_type, AppError>> + Send, AppError> {
let pk_iter =
tx_context.#index_table.range_keys::<#column_type>(from..until)?
.flat_map(|r| match r {
Ok((_k, value_iter)) => Either::Left(value_iter.map(|res| res.map(|kg| kg.value()))),
Err(e) => Either::Right(std::iter::once(Err(e))),
});
Self::compose_many_stream(tx_context, pk_iter, query)
}
};

Expand Down
Loading