diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d0873c10..66980e03c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -180,6 +180,7 @@ NIP35 support, better logs and docs, performance improvements, bugs fix and more * nostr: don't use reply event as root `e` tag i no root is set in `EventBuilder::text_note_reply` ([Yuki Kishimoto]) * database: add manual trait implementations for `BTreeCappedSet` ([Yuki Kishimoto]) * database: replace LRU with custom memory cache for IDs tracking ([Yuki Kishimoto]) +* database: remove `DatabaseHelper` from `MemoryDatabase` ([Yuki Kishimoto]) * lmdb: use `async-utility` to spawn blocking tasks ([Yuki Kishimoto]) * ndb: bump `nostr-ndb` to 0.5 ([Yuki Kishimoto]) * pool: add `PingTracker` and improve relay ping management ([Yuki Kishimoto]) @@ -210,6 +211,7 @@ NIP35 support, better logs and docs, performance improvements, bugs fix and more * nostr: add `Kind::is_addressable` and `ADDRESSABLE_RANGE` ([Yuki Kishimoto]) * database: impl PartialEq and Eq for `Events` ([Yuki Kishimoto]) * database: add `SaveEventStatus` enum ([Yuki Kishimoto]) +* redb: add `nostr-redb` ([Yuki Kishimoto]) * pool: add `ReceiverStream` ([Yuki Kishimoto]) * Add `SubscribeAutoCloseOptions::idle_timeout` ([Yuki Kishimoto]) * sdk: automatically resend event after NIP-42 authentication ([Yuki Kishimoto]) diff --git a/Cargo.lock b/Cargo.lock index 5159d5d7a..48789af61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -325,8 +325,7 @@ dependencies = [ [[package]] name = "async-utility" version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a34a3b57207a7a1007832416c3e4862378c8451b4e8e093e436f48c2d3d2c151" +source = "git+https://github.com/yukibtc/async-utility?rev=575b6fb0b4e270ae44208d4b7c234366d33e32f6#575b6fb0b4e270ae44208d4b7c234366d33e32f6" dependencies = [ "futures-util", "gloo-timers", @@ -2769,6 +2768,21 @@ dependencies = [ "ureq", ] +[[package]] +name = "nostr-redb" +version = "0.38.0" +dependencies = [ + "async-utility", + 
"futures", + "indexed_db_futures", + "nostr", + "nostr-database", + "redb", + "tokio", + "tracing", + "wasm-bindgen", +] + [[package]] name = "nostr-relay-builder" version = "0.38.0" @@ -2782,6 +2796,7 @@ dependencies = [ "negentropy 0.4.3", "nostr", "nostr-database", + "nostr-redb", "tokio", "tracing", "tracing-subscriber", @@ -2815,6 +2830,7 @@ dependencies = [ "nostr-indexeddb", "nostr-lmdb", "nostr-ndb", + "nostr-redb", "nostr-relay-pool", "tokio", "tracing", @@ -2844,6 +2860,7 @@ dependencies = [ "console_error_panic_hook", "js-sys", "nostr-connect", + "nostr-redb", "nostr-sdk", "nwc", "tracing", @@ -3572,6 +3589,15 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f60fcc7d6849342eff22c4350c8b9a989ee8ceabc4b481253e8946b9fe83d684" +[[package]] +name = "redb" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b1de48a7cf7ba193e81e078d17ee2b786236eed1d3f7c60f8a09545efc4925" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.5.5" diff --git a/Cargo.toml b/Cargo.toml index ac46cb55b..d134066fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ nostr-database = { version = "0.38", path = "./crates/nostr-database", default-f nostr-indexeddb = { version = "0.38", path = "./crates/nostr-indexeddb", default-features = false } nostr-lmdb = { version = "0.38", path = "./crates/nostr-lmdb", default-features = false } nostr-ndb = { version = "0.38", path = "./crates/nostr-ndb", default-features = false } +nostr-redb = { version = "0.38", path = "./crates/nostr-redb", default-features = false } nostr-relay-builder = { version = "0.38", path = "./crates/nostr-relay-builder", default-features = false } nostr-relay-pool = { version = "0.38", path = "./crates/nostr-relay-pool", default-features = false } nostr-sdk = { version = "0.38", path = "./crates/nostr-sdk", default-features = false } @@ -44,6 +45,7 @@ wasm-bindgen-futures = "0.4" web-sys = 
{ version = "0.3", default-features = false } [patch.crates-io] +async-utility = { git = "https://github.com/yukibtc/async-utility", rev = "575b6fb0b4e270ae44208d4b7c234366d33e32f6" } # Patch needed to reduce bindings size bip39 = { git = "https://github.com/yukibtc/rust-bip39", rev = "eade7c56eff5f320e8eb5beee23dd8fb46413938" } # Uses bitcoin_hashes v0.14 diff --git a/bindings/nostr-sdk-js/Cargo.toml b/bindings/nostr-sdk-js/Cargo.toml index bf22d95ea..24b7c6b57 100644 --- a/bindings/nostr-sdk-js/Cargo.toml +++ b/bindings/nostr-sdk-js/Cargo.toml @@ -11,6 +11,7 @@ crate-type = ["cdylib"] console_error_panic_hook = "0.1" js-sys.workspace = true nostr-connect.workspace = true +nostr-redb.workspace = true nostr-sdk = { workspace = true, default-features = false, features = ["all-nips", "indexeddb"] } nwc.workspace = true tracing.workspace = true diff --git a/bindings/nostr-sdk-js/examples/negentropy.js b/bindings/nostr-sdk-js/examples/negentropy.js index 740dbcf6b..81628a16e 100644 --- a/bindings/nostr-sdk-js/examples/negentropy.js +++ b/bindings/nostr-sdk-js/examples/negentropy.js @@ -1,22 +1,26 @@ -const { loadWasmAsync, initLogger, LogLevel, SyncOptions, SyncDirection, Filter, Client, NostrDatabase } = require("../"); - -// NOTE: this code work only on browser (due to indexeddb)! 
+const { loadWasmAsync, initLogger, LogLevel, SyncOptions, SyncDirection, Filter, Client, NostrDatabase, Kind } = require("../"); async function main() { await loadWasmAsync(); initLogger(LogLevel.info()); - let db = await NostrDatabase.indexeddb("js-test"); + let db = await NostrDatabase.inMemory(); let client = Client.builder().database(db).build(); await client.addRelay("wss://relay.damus.io"); await client.connect(); - let filter = new Filter().kind(1).limit(1000); + let filter = new Filter().kind(new Kind(1)).limit(1000); let opts = new SyncOptions().direction(SyncDirection.Down); await client.sync(filter, opts); + + let f = new Filter().limit(2); + let events = await db.query([f]); + events.forEach((e) => { + console.log(e.asJson()) + }) } main(); diff --git a/bindings/nostr-sdk-js/examples/webapp/package.json b/bindings/nostr-sdk-js/examples/webapp/package.json index 94ab64f3f..2abbfb2b2 100644 --- a/bindings/nostr-sdk-js/examples/webapp/package.json +++ b/bindings/nostr-sdk-js/examples/webapp/package.json @@ -3,7 +3,7 @@ "version": "0.1.0", "private": true, "dependencies": { - "@rust-nostr/nostr-sdk": "^0.33.0", + "@rust-nostr/nostr-sdk": "file:../../rust-nostr-nostr-sdk-0.37.0.tgz", "react": "^18.2.0", "react-dom": "^18.2.0", "react-scripts": "5.0.1", diff --git a/bindings/nostr-sdk-js/examples/webapp/src/App.js b/bindings/nostr-sdk-js/examples/webapp/src/App.js index b72c029ba..bd89f1676 100644 --- a/bindings/nostr-sdk-js/examples/webapp/src/App.js +++ b/bindings/nostr-sdk-js/examples/webapp/src/App.js @@ -18,7 +18,8 @@ class App extends Component { // Try to initialize log try { - initLogger(LogLevel.info()); + initLogger(LogLevel.debug()); + console.log("Logger initialized"); } catch (error) {} } @@ -28,7 +29,11 @@ class App extends Component { let nip07_signer = new BrowserSigner(); let signer = NostrSigner.nip07(nip07_signer); let zapper = await NostrZapper.webln(); - let db = await NostrDatabase.indexeddb("nostr-sdk-webapp-example"); + + 
console.log("Opening database..."); + let db = await NostrDatabase.web("nostr-sdk-webapp-example-2"); + console.log("Database opened."); + let client = new ClientBuilder().signer(signer).zapper(zapper).database(db).build(); let public_key = await nip07_signer.getPublicKey(); @@ -51,7 +56,7 @@ class App extends Component { handleReconcile = async () => { try { let filter = new Filter().author(this.state.public_key); - let opts = new NegentropyOptions(); + let opts = new SyncOptions(); await this.state.client.sync(filter, opts); } catch (error) { console.log(error) @@ -65,7 +70,7 @@ class App extends Component { console.time("query"); let events = await database.query([filter]); console.timeEnd("query"); - console.log("Got", events.length, "events"); + console.log("Got", events.len(), "events"); } catch (error) { console.log(error) } @@ -73,7 +78,7 @@ class App extends Component { handlePublishTextNote = async () => { try { - let builder = EventBuilder.textNote("Test from rust-nostr JavaScript bindings with NIP07 signer!", []); + let builder = EventBuilder.textNote("Test from rust-nostr JavaScript bindings with NIP07 signer!"); await this.state.client.sendEventBuilder(builder); } catch (error) { console.log(error) @@ -82,7 +87,7 @@ class App extends Component { handleZap = async () => { try { - let pk = PublicKey.fromBech32("npub1drvpzev3syqt0kjrls50050uzf25gehpz9vgdw08hvex7e0vgfeq0eseet"); + let pk = PublicKey.parse("npub1drvpzev3syqt0kjrls50050uzf25gehpz9vgdw08hvex7e0vgfeq0eseet"); let entity = ZapEntity.publicKey(pk); let details = new ZapDetails(ZapType.Public).message("Zap for Rust Nostr!"); await this.state.client.zap(entity, 1000, details); diff --git a/bindings/nostr-sdk-js/src/database/mod.rs b/bindings/nostr-sdk-js/src/database/mod.rs index 9f1e084b1..e494cfa5e 100644 --- a/bindings/nostr-sdk-js/src/database/mod.rs +++ b/bindings/nostr-sdk-js/src/database/mod.rs @@ -5,6 +5,7 @@ use std::ops::Deref; use std::sync::Arc; +use nostr_redb::NostrRedb; use 
nostr_sdk::prelude::*; use wasm_bindgen::prelude::*; @@ -74,22 +75,17 @@ impl From> for JsNostrDatabase { #[wasm_bindgen(js_class = NostrDatabase)] impl JsNostrDatabase { - /// Open/Create database with **unlimited** capacity - pub async fn indexeddb(name: &str) -> Result { - let db = WebDatabase::open(name).await.map_err(into_err)?; + /// Open (or create) persistent web database + pub async fn web(name: &str) -> Result { + let db = NostrRedb::web(name).await.map_err(into_err)?; Ok(Self { inner: db.into_nostr_database(), }) } - /// Open/Create database with **limited** capacity - #[wasm_bindgen(js_name = indexeddbBounded)] - pub async fn indexeddb_bounded(name: &str, max_capacity: u64) -> Result { - let db = Arc::new( - WebDatabase::open_bounded(name, max_capacity as usize) - .await - .map_err(into_err)?, - ); + #[wasm_bindgen(js_name = inMemory)] + pub fn in_memory() -> Result { + let db = NostrRedb::in_memory().map_err(into_err)?; Ok(Self { inner: db.into_nostr_database(), }) diff --git a/crates/nostr-database/src/memory.rs b/crates/nostr-database/src/memory.rs index fcb73f702..976d35876 100644 --- a/crates/nostr-database/src/memory.rs +++ b/crates/nostr-database/src/memory.rs @@ -11,16 +11,14 @@ use nostr::prelude::*; use tokio::sync::RwLock; use crate::{ - Backend, DatabaseError, DatabaseEventResult, DatabaseEventStatus, DatabaseHelper, Events, - NostrDatabase, NostrDatabaseWipe, NostrEventsDatabase, RejectedReason, SaveEventStatus, + Backend, DatabaseError, DatabaseEventStatus, Events, NostrDatabase, NostrDatabaseWipe, + NostrEventsDatabase, RejectedReason, SaveEventStatus, }; /// Database options #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct MemoryDatabaseOptions { - /// Store events (default: false) - pub events: bool, - /// Max events and IDs to store in memory (default: 35_000) + /// Max IDs to store in memory (default: 35_000) /// /// `None` means no limits. 
pub max_events: Option, @@ -29,7 +27,6 @@ pub struct MemoryDatabaseOptions { impl Default for MemoryDatabaseOptions { fn default() -> Self { Self { - events: false, max_events: Some(35_000), } } @@ -43,11 +40,11 @@ impl MemoryDatabaseOptions { } /// Memory Database (RAM) +/// +/// This database keep track only of seen event IDs! #[derive(Debug, Clone)] pub struct MemoryDatabase { - opts: MemoryDatabaseOptions, seen_event_ids: Arc>, - helper: DatabaseHelper, } impl Default for MemoryDatabase { @@ -65,12 +62,7 @@ impl MemoryDatabase { /// New Memory database pub fn with_opts(opts: MemoryDatabaseOptions) -> Self { Self { - opts, seen_event_ids: Arc::new(RwLock::new(SeenTracker::new(opts.max_events))), - helper: match opts.max_events { - Some(max) => DatabaseHelper::bounded(max), - None => DatabaseHelper::unbounded(), - }, } } } @@ -87,16 +79,11 @@ impl NostrEventsDatabase for MemoryDatabase { event: &'a Event, ) -> BoxedFuture<'a, Result> { Box::pin(async move { - if self.opts.events { - let DatabaseEventResult { status, .. 
} = self.helper.index_event(event).await; - Ok(status) - } else { - // Mark it as seen - let mut seen_event_ids = self.seen_event_ids.write().await; - seen_event_ids.seen(event.id, None); + // Mark it as seen + let mut seen_event_ids = self.seen_event_ids.write().await; + seen_event_ids.seen(event.id, None); - Ok(SaveEventStatus::Rejected(RejectedReason::Other)) - } + Ok(SaveEventStatus::Rejected(RejectedReason::Other)) }) } @@ -105,36 +92,21 @@ impl NostrEventsDatabase for MemoryDatabase { event_id: &'a EventId, ) -> BoxedFuture<'a, Result> { Box::pin(async move { - if self.opts.events { - if self.helper.has_event_id_been_deleted(event_id).await { - Ok(DatabaseEventStatus::Deleted) - } else if self.helper.has_event(event_id).await { - Ok(DatabaseEventStatus::Saved) - } else { - Ok(DatabaseEventStatus::NotExistent) - } + let seen_event_ids = self.seen_event_ids.read().await; + Ok(if seen_event_ids.contains(event_id) { + DatabaseEventStatus::Saved } else { - let seen_event_ids = self.seen_event_ids.read().await; - Ok(if seen_event_ids.contains(event_id) { - DatabaseEventStatus::Saved - } else { - DatabaseEventStatus::NotExistent - }) - } + DatabaseEventStatus::NotExistent + }) }) } fn has_coordinate_been_deleted<'a>( &'a self, - coordinate: &'a CoordinateBorrow<'a>, - timestamp: &'a Timestamp, + _coordinate: &'a CoordinateBorrow<'a>, + _timestamp: &'a Timestamp, ) -> BoxedFuture<'a, Result> { - Box::pin(async move { - Ok(self - .helper - .has_coordinate_been_deleted(coordinate, timestamp) - .await) - }) + Box::pin(async move { Ok(false) }) } fn event_id_seen( @@ -154,41 +126,34 @@ impl NostrEventsDatabase for MemoryDatabase { fn event_by_id<'a>( &'a self, - event_id: &'a EventId, + _event_id: &'a EventId, ) -> BoxedFuture<'a, Result, DatabaseError>> { - Box::pin(async move { Ok(self.helper.event_by_id(event_id).await) }) + Box::pin(async move { Ok(None) }) } - fn count(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { 
Ok(self.helper.count(filters).await) }) + fn count(&self, _filters: Vec) -> BoxedFuture> { + Box::pin(async move { Ok(0) }) } fn query(&self, filters: Vec) -> BoxedFuture> { - Box::pin(async move { Ok(self.helper.query(filters).await) }) + Box::pin(async move { Ok(Events::new(&filters)) }) } fn negentropy_items( &self, - filter: Filter, + _filter: Filter, ) -> BoxedFuture, DatabaseError>> { - Box::pin(async move { Ok(self.helper.negentropy_items(filter).await) }) + Box::pin(async move { Ok(Vec::new()) }) } - fn delete(&self, filter: Filter) -> BoxedFuture> { - Box::pin(async move { - self.helper.delete(filter).await; - Ok(()) - }) + fn delete(&self, _filter: Filter) -> BoxedFuture> { + Box::pin(async move { Ok(()) }) } } impl NostrDatabaseWipe for MemoryDatabase { fn wipe(&self) -> BoxedFuture> { Box::pin(async move { - // Clear helper - self.helper.clear().await; - - // Clear let mut seen_event_ids = self.seen_event_ids.write().await; seen_event_ids.clear(); Ok(()) diff --git a/crates/nostr-lmdb/src/store/error.rs b/crates/nostr-lmdb/src/store/error.rs index 6bae5cf10..953eb9bde 100644 --- a/crates/nostr-lmdb/src/store/error.rs +++ b/crates/nostr-lmdb/src/store/error.rs @@ -5,7 +5,7 @@ use std::{fmt, io}; -use async_utility::tokio::task::JoinError; +use async_utility::task::Error as JoinError; use nostr::{key, secp256k1}; use nostr_database::flatbuffers; diff --git a/crates/nostr-lmdb/src/store/mod.rs b/crates/nostr-lmdb/src/store/mod.rs index 2b8defd6e..b35683d0c 100644 --- a/crates/nostr-lmdb/src/store/mod.rs +++ b/crates/nostr-lmdb/src/store/mod.rs @@ -50,7 +50,7 @@ impl Store { R: Send + 'static, { let db = self.db.clone(); - Ok(task::spawn_blocking(move || f(db)).await?) + Ok(task::spawn_blocking(move || f(db)).join().await?) } #[inline] @@ -61,7 +61,7 @@ impl Store { { let db = self.db.clone(); let fbb = self.fbb.clone(); - Ok(task::spawn_blocking(move || f(db, fbb)).await?) + Ok(task::spawn_blocking(move || f(db, fbb)).join().await?) } /// Store an event. 
diff --git a/crates/nostr-redb/Cargo.toml b/crates/nostr-redb/Cargo.toml new file mode 100644 index 000000000..4f62261b7 --- /dev/null +++ b/crates/nostr-redb/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "nostr-redb" +version = "0.38.0" +edition = "2021" +description = "redb storage backend for nostr apps" +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +readme = "README.md" +rust-version.workspace = true +keywords = ["nostr", "database", "redb"] + +[dependencies] +async-utility.workspace = true +redb = { version = "2.2", default-features = false } # v2.3 has MSRV at 1.81 +nostr = { workspace = true, features = ["std"] } +nostr-database = { workspace = true, features = ["flatbuf"] } + +[target.'cfg(target_arch = "wasm32")'.dependencies] +futures = "0.3" +indexed_db_futures = "0.5" +tracing.workspace = true +wasm-bindgen.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/nostr-redb/README.md b/crates/nostr-redb/README.md new file mode 100644 index 000000000..d40274808 --- /dev/null +++ b/crates/nostr-redb/README.md @@ -0,0 +1,15 @@ +# Nostr Redb + +redb storage backend for nostr apps + +## State + +**This library is in an ALPHA state**, things that are implemented generally work but the API will change in breaking ways. + +## Donations + +`rust-nostr` is free and open-source. This means we do not earn any revenue by selling it. Instead, we rely on your financial support. If you actively use any of the `rust-nostr` libs/software/services, then please [donate](https://rust-nostr.org/donate). 
+ +## License + +This project is distributed under the MIT software license - see the [LICENSE](../../LICENSE) file for details diff --git a/crates/nostr-redb/src/lib.rs b/crates/nostr-redb/src/lib.rs new file mode 100644 index 000000000..2ae233395 --- /dev/null +++ b/crates/nostr-redb/src/lib.rs @@ -0,0 +1,521 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +//! redb storage backend for nostr apps + +#![warn(missing_docs)] +#![warn(rustdoc::bare_urls)] +#![allow(clippy::mutable_key_type)] + +use std::collections::HashSet; +#[cfg(not(target_arch = "wasm32"))] +use std::path::Path; + +use nostr_database::prelude::*; + +mod store; + +use self::store::Store; + +/// Redb Nostr Database +#[derive(Debug)] +pub struct NostrRedb { + db: Store, +} + +impl NostrRedb { + /// Persistent database + #[inline] + #[cfg(not(target_arch = "wasm32"))] + pub fn persistent

(path: P) -> Result + where + P: AsRef, + { + Ok(Self { + db: Store::persistent(path).map_err(DatabaseError::backend)?, + }) + } + + /// Web database + #[inline] + #[cfg(target_arch = "wasm32")] + pub async fn web(name: &str) -> Result { + Ok(Self { + db: Store::web(name).await.map_err(DatabaseError::backend)?, + temp: MemoryDatabase::with_opts(MemoryDatabaseOptions { + max_events: Some(100_000), + }), + }) + } + + /// Memory database + #[inline] + pub fn in_memory() -> Self { + Self { + db: Store::in_memory(), + } + } +} + +impl NostrDatabase for NostrRedb { + #[inline] + fn backend(&self) -> Backend { + if self.db.is_persistent() { + // TODO: not really LMDB + Backend::LMDB + } else { + Backend::Memory + } + } +} + +impl NostrEventsDatabase for NostrRedb { + fn save_event<'a>( + &'a self, + event: &'a Event, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + self.db + .save_event(event) + .await + .map_err(DatabaseError::backend) + }) + } + + fn check_id<'a>( + &'a self, + event_id: &'a EventId, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + if self + .db + .event_is_deleted(*event_id) + .await + .map_err(DatabaseError::backend)? + { + Ok(DatabaseEventStatus::Deleted) + } else if self + .db + .has_event(event_id) + .await + .map_err(DatabaseError::backend)? + { + Ok(DatabaseEventStatus::Saved) + } else { + Ok(DatabaseEventStatus::NotExistent) + } + }) + } + + fn has_coordinate_been_deleted<'a>( + &'a self, + coordinate: &'a CoordinateBorrow<'a>, + timestamp: &'a Timestamp, + ) -> BoxedFuture<'a, Result> { + Box::pin(async move { + if let Some(t) = self + .db + .when_is_coordinate_deleted(coordinate) + .await + .map_err(DatabaseError::backend)? 
+ { + Ok(&t >= timestamp) + } else { + Ok(false) + } + }) + } + + fn event_id_seen( + &self, + _event_id: EventId, + _relay_url: RelayUrl, + ) -> BoxedFuture> { + Box::pin(async move { Ok(()) }) + } + + fn event_seen_on_relays<'a>( + &'a self, + _event_id: &'a EventId, + ) -> BoxedFuture<'a, Result>, DatabaseError>> { + Box::pin(async move { Err(DatabaseError::NotSupported) }) + } + + fn event_by_id<'a>( + &'a self, + event_id: &'a EventId, + ) -> BoxedFuture<'a, Result, DatabaseError>> { + Box::pin(async move { + self.db + .get_event_by_id(event_id) + .await + .map_err(DatabaseError::backend) + }) + } + + fn count(&self, filters: Vec) -> BoxedFuture> { + Box::pin(async move { self.db.count(filters).await.map_err(DatabaseError::backend) }) + } + + fn query(&self, filters: Vec) -> BoxedFuture> { + Box::pin(async move { self.db.query(filters).await.map_err(DatabaseError::backend) }) + } + + fn negentropy_items( + &self, + filter: Filter, + ) -> BoxedFuture, DatabaseError>> { + Box::pin(async move { + self.db + .negentropy_items(filter) + .await + .map_err(DatabaseError::backend) + }) + } + + fn delete(&self, filter: Filter) -> BoxedFuture> { + Box::pin(async move { self.db.delete(filter).await.map_err(DatabaseError::backend) }) + } +} + +impl NostrDatabaseWipe for NostrRedb { + fn wipe(&self) -> BoxedFuture> { + Box::pin(async move { self.db.wipe().await.map_err(DatabaseError::backend) }) + } +} + +#[cfg(test)] +mod tests { + use std::ops::Deref; + use std::time::Duration; + + use super::*; + + const EVENTS: [&str; 14] = [ + r#"{"id":"b7b1fb52ad8461a03e949820ae29a9ea07e35bcd79c95c4b59b0254944f62805","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644581,"kind":1,"tags":[],"content":"Text note","sig":"ed73a8a4e7c26cd797a7b875c634d9ecb6958c57733305fed23b978109d0411d21b3e182cb67c8ad750884e30ca383b509382ae6187b36e76ee76e6a142c4284"}"#, + 
r#"{"id":"7296747d91c53f1d71778ef3e12d18b66d494a41f688ef244d518abf37c959b6","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644586,"kind":32121,"tags":[["d","id-1"]],"content":"Empty 1","sig":"8848989a8e808f7315e950f871b231c1dff7752048f8957d4a541881d2005506c30e85c7dd74dab022b3e01329c88e69c9d5d55d961759272a738d150b7dbefc"}"#, + r#"{"id":"ec6ea04ba483871062d79f78927df7979f67545b53f552e47626cb1105590442","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644591,"kind":32122,"tags":[["d","id-1"]],"content":"Empty 2","sig":"89946113a97484850fe35fefdb9120df847b305de1216dae566616fe453565e8707a4da7e68843b560fa22a932f81fc8db2b5a2acb4dcfd3caba9a91320aac92"}"#, + r#"{"id":"63b8b829aa31a2de870c3a713541658fcc0187be93af2032ec2ca039befd3f70","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644596,"kind":32122,"tags":[["d","id-2"]],"content":"","sig":"607b1a67bef57e48d17df4e145718d10b9df51831d1272c149f2ab5ad4993ae723f10a81be2403ae21b2793c8ed4c129e8b031e8b240c6c90c9e6d32f62d26ff"}"#, + r#"{"id":"6fe9119c7db13ae13e8ecfcdd2e5bf98e2940ba56a2ce0c3e8fba3d88cd8e69d","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644601,"kind":32122,"tags":[["d","id-3"]],"content":"","sig":"d07146547a726fc9b4ec8d67bbbe690347d43dadfe5d9890a428626d38c617c52e6945f2b7144c4e0c51d1e2b0be020614a5cadc9c0256b2e28069b70d9fc26e"}"#, + r#"{"id":"a82f6ebfc709f4e7c7971e6bf738e30a3bc112cfdb21336054711e6779fd49ef","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644606,"kind":32122,"tags":[["d","id-1"]],"content":"","sig":"96d3349b42ed637712b4d07f037457ab6e9180d58857df77eb5fa27ff1fd68445c72122ec53870831ada8a4d9a0b484435f80d3ff21a862238da7a723a0d073c"}"#, + 
r#"{"id":"8ab0cb1beceeb68f080ec11a3920b8cc491ecc7ec5250405e88691d733185832","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644611,"kind":32122,"tags":[["d","id-1"]],"content":"Test","sig":"49153b482d7110e2538eb48005f1149622247479b1c0057d902df931d5cea105869deeae908e4e3b903e3140632dc780b3f10344805eab77bb54fb79c4e4359d"}"#, + r#"{"id":"63dc49a8f3278a2de8dc0138939de56d392b8eb7a18c627e4d78789e2b0b09f2","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704644616,"kind":5,"tags":[["a","32122:aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4:"]],"content":"","sig":"977e54e5d57d1fbb83615d3a870037d9eb5182a679ca8357523bbf032580689cf481f76c88c7027034cfaf567ba9d9fe25fc8cd334139a0117ad5cf9fe325eef"}"#, + r#"{"id":"6975ace0f3d66967f330d4758fbbf45517d41130e2639b54ca5142f37757c9eb","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704644621,"kind":5,"tags":[["a","32122:aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4:id-2"]],"content":"","sig":"9bb09e4759899d86e447c3fa1be83905fe2eda74a5068a909965ac14fcdabaed64edaeb732154dab734ca41f2fc4d63687870e6f8e56e3d9e180e4a2dd6fb2d2"}"#, + r#"{"id":"33f5b4e6a38e107638c20f4536db35191d4b8651ba5a2cefec983b9ec2d65084","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704645586,"kind":0,"tags":[],"content":"{\"name\":\"Key A\"}","sig":"285d090f45a6adcae717b33771149f7840a8c27fb29025d63f1ab8d95614034a54e9f4f29cee9527c4c93321a7ebff287387b7a19ba8e6f764512a40e7120429"}"#, + r#"{"id":"90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704645606,"kind":0,"tags":[],"content":"{\"name\":\"key-a\",\"display_name\":\"Key 
A\",\"lud16\":\"keya@ln.address\"}","sig":"ec8f49d4c722b7ccae102d49befff08e62db775e5da43ef51b25c47dfdd6a09dc7519310a3a63cbdb6ec6b3250e6f19518eb47be604edeb598d16cdc071d3dbc"}"#, + r#"{"id":"a295422c636d3532875b75739e8dae3cdb4dd2679c6e4994c9a39c7ebf8bc620","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1704646569,"kind":5,"tags":[["e","90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9"]],"content":"","sig":"d4dc8368a4ad27eef63cacf667345aadd9617001537497108234fc1686d546c949cbb58e007a4d4b632c65ea135af4fbd7a089cc60ab89b6901f5c3fc6a47b29"}"#, // Invalid event deletion + r#"{"id":"999e3e270100d7e1eaa98fcfab4a98274872c1f2dfdab024f32e42a5a12d5b5e","pubkey":"aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4","created_at":1704646606,"kind":5,"tags":[["e","90a761aec9b5b60b399a76826141f529db17466deac85696a17e4a243aa271f9"]],"content":"","sig":"4f3a33fd52784cea7ca8428fd35d94d65049712e9aa11a70b1a16a1fcd761c7b7e27afac325728b1c00dfa11e33e78b2efd0430a7e4b28f4ede5b579b3f32614"}"#, + r#"{"id":"99a022e6d61c4e39c147d08a2be943b664e8030c0049325555ac1766429c2832","pubkey":"79dff8f82963424e0bb02708a22e44b4980893e3a4be0fa3cb60a43b946764e3","created_at":1705241093,"kind":30333,"tags":[["d","multi-id"],["p","aa4fc8665f5696e33db7e1a572e3b0f5b3d615837b0f362dcb1c8068b098c7b4"]],"content":"Multi-tags","sig":"0abfb2b696a7ed7c9e8e3bf7743686190f3f1b3d4045b72833ab6187c254f7ed278d289d52dfac3de28be861c1471421d9b1bfb5877413cbc81c84f63207a826"}"#, + ]; + + struct TempDatabase { + db: NostrRedb, + } + + impl Deref for TempDatabase { + type Target = NostrRedb; + + fn deref(&self) -> &Self::Target { + &self.db + } + } + + impl TempDatabase { + fn new() -> Self { + Self { + db: NostrRedb::in_memory(), + } + } + + // Return the number of added events + async fn add_random_events(&self) -> usize { + let keys_a = Keys::generate(); + let keys_b = Keys::generate(); + + let events = vec![ + EventBuilder::text_note("Text Note A") + 
.sign_with_keys(&keys_a) + .unwrap(), + EventBuilder::text_note("Text Note B") + .sign_with_keys(&keys_b) + .unwrap(), + EventBuilder::metadata( + &Metadata::new().name("account-a").display_name("Account A"), + ) + .sign_with_keys(&keys_a) + .unwrap(), + EventBuilder::metadata( + &Metadata::new().name("account-b").display_name("Account B"), + ) + .sign_with_keys(&keys_b) + .unwrap(), + EventBuilder::new(Kind::Custom(33_333), "") + .tag(Tag::identifier("my-id-a")) + .sign_with_keys(&keys_a) + .unwrap(), + EventBuilder::new(Kind::Custom(33_333), "") + .tag(Tag::identifier("my-id-b")) + .sign_with_keys(&keys_b) + .unwrap(), + ]; + + // Store + for event in events.iter() { + self.db.save_event(event).await.unwrap(); + } + + events.len() + } + + async fn add_event(&self, builder: EventBuilder) -> (Keys, Event) { + let keys = Keys::generate(); + let event = builder.sign_with_keys(&keys).unwrap(); + self.db.save_event(&event).await.unwrap(); + (keys, event) + } + + async fn add_event_with_keys( + &self, + builder: EventBuilder, + keys: &Keys, + ) -> (Event, SaveEventStatus) { + let event = builder.sign_with_keys(keys).unwrap(); + let status = self.db.save_event(&event).await.unwrap(); + (event, status) + } + + async fn count_all(&self) -> usize { + self.db.count(vec![Filter::new()]).await.unwrap() + } + } + + #[tokio::test] + async fn test_event_by_id() { + let db = TempDatabase::new(); + + let added_events: usize = db.add_random_events().await; + + let (_keys, expected_event) = db.add_event(EventBuilder::text_note("Test")).await; + + let event = db.event_by_id(&expected_event.id).await.unwrap().unwrap(); + assert_eq!(event, expected_event); + + // Check if number of events in database match the expected + assert_eq!(db.count_all().await, added_events + 1) + } + + #[tokio::test] + async fn test_replaceable_event() { + let db = TempDatabase::new(); + + let added_events: usize = db.add_random_events().await; + + let now = Timestamp::now(); + let metadata = Metadata::new() + 
.name("my-account") + .display_name("My Account"); + + let (keys, expected_event) = db + .add_event( + EventBuilder::metadata(&metadata).custom_created_at(now - Duration::from_secs(120)), + ) + .await; + + // Test event by ID + let event = db.event_by_id(&expected_event.id).await.unwrap().unwrap(); + assert_eq!(event, expected_event); + + // Test filter query + let events = db + .query(vec![Filter::new() + .author(keys.public_key) + .kind(Kind::Metadata)]) + .await + .unwrap(); + assert_eq!(events.to_vec(), vec![expected_event.clone()]); + + // Check if number of events in database match the expected + assert_eq!(db.count_all().await, added_events + 1); + + // Replace previous event + let (new_expected_event, status) = db + .add_event_with_keys( + EventBuilder::metadata(&metadata).custom_created_at(now), + &keys, + ) + .await; + assert!(status.is_success()); + + // Test event by ID (MUST be None because replaced) + assert!(db.event_by_id(&expected_event.id).await.unwrap().is_none()); + + // Test event by ID + let event = db + .event_by_id(&new_expected_event.id) + .await + .unwrap() + .unwrap(); + assert_eq!(event, new_expected_event); + + // Test filter query + let events = db + .query(vec![Filter::new() + .author(keys.public_key) + .kind(Kind::Metadata)]) + .await + .unwrap(); + assert_eq!(events.to_vec(), vec![new_expected_event]); + + // Check if number of events in database match the expected + assert_eq!(db.count_all().await, added_events + 1); + } + + #[tokio::test] + async fn test_param_replaceable_event() { + let db = TempDatabase::new(); + + let added_events: usize = db.add_random_events().await; + + let now = Timestamp::now(); + + let (keys, expected_event) = db + .add_event( + EventBuilder::new(Kind::Custom(33_333), "") + .tag(Tag::identifier("my-id-a")) + .custom_created_at(now - Duration::from_secs(120)), + ) + .await; + let coordinate = Coordinate::new(Kind::from(33_333), keys.public_key).identifier("my-id-a"); + + // Test event by ID + let event = 
db.event_by_id(&expected_event.id).await.unwrap().unwrap(); + assert_eq!(event, expected_event); + + // Test filter query + let events = db.query(vec![coordinate.clone().into()]).await.unwrap(); + assert_eq!(events.to_vec(), vec![expected_event.clone()]); + + // Check if number of events in database match the expected + assert_eq!(db.count_all().await, added_events + 1); + + // Replace previous event + let (new_expected_event, status) = db + .add_event_with_keys( + EventBuilder::new(Kind::Custom(33_333), "Test replace") + .tag(Tag::identifier("my-id-a")) + .custom_created_at(now), + &keys, + ) + .await; + assert!(status.is_success()); + + // Test event by ID (MUST be `None` because replaced) + assert!(db.event_by_id(&expected_event.id).await.unwrap().is_none()); + + // Test event by ID + let event = db + .event_by_id(&new_expected_event.id) + .await + .unwrap() + .unwrap(); + assert_eq!(event, new_expected_event); + + // Test filter query + let events = db.query(vec![coordinate.into()]).await.unwrap(); + assert_eq!(events.to_vec(), vec![new_expected_event]); + + // Check if number of events in database match the expected + assert_eq!(db.count_all().await, added_events + 1); + + // Try to add param replaceable event with older timestamp (MUSTN'T be stored) + let (_, status) = db + .add_event_with_keys( + EventBuilder::new(Kind::Custom(33_333), "Test replace 2") + .tag(Tag::identifier("my-id-a")) + .custom_created_at(now - Duration::from_secs(2000)), + &keys, + ) + .await; + assert!(!status.is_success()); + } + + #[tokio::test] + async fn test_full_text_search() { + let db = TempDatabase::new(); + + let _added_events: usize = db.add_random_events().await; + + let events = db + .query(vec![Filter::new().search("Account A")]) + .await + .unwrap(); + assert_eq!(events.len(), 1); + + let events = db + .query(vec![Filter::new().search("account a")]) + .await + .unwrap(); + assert_eq!(events.len(), 1); + + let events = db + .query(vec![Filter::new().search("text note")]) + 
.await + .unwrap(); + assert_eq!(events.len(), 2); + + let events = db.query(vec![Filter::new().search("notes")]).await.unwrap(); + assert_eq!(events.len(), 0); + + let events = db.query(vec![Filter::new().search("hola")]).await.unwrap(); + assert_eq!(events.len(), 0); + } + + #[tokio::test] + async fn test_expected_query_result() { + let db = TempDatabase::new(); + + for event in EVENTS.into_iter() { + let event = Event::from_json(event).unwrap(); + let _ = db.save_event(&event).await; + } + + // Test expected output + let expected_output = vec![ + Event::from_json(EVENTS[13]).unwrap(), + Event::from_json(EVENTS[12]).unwrap(), + // Event 11 is invalid deletion + // Event 10 deleted by event 12 + // Event 9 replaced by event 10 + Event::from_json(EVENTS[8]).unwrap(), + // Event 7 is an invalid deletion + Event::from_json(EVENTS[6]).unwrap(), + Event::from_json(EVENTS[5]).unwrap(), + Event::from_json(EVENTS[4]).unwrap(), + // Event 3 deleted by Event 8 + // Event 2 replaced by Event 6 + Event::from_json(EVENTS[1]).unwrap(), + Event::from_json(EVENTS[0]).unwrap(), + ]; + assert_eq!( + db.query(vec![Filter::new()]).await.unwrap().to_vec(), + expected_output + ); + assert_eq!(db.count_all().await, 8); + } + + #[tokio::test] + async fn test_delete_events_with_filter() { + let db = TempDatabase::new(); + + let added_events: usize = db.add_random_events().await; + + assert_eq!(db.count_all().await, added_events); + + // Delete all kinds except text note + let filter = Filter::new().kinds([Kind::Metadata, Kind::Custom(33_333)]); + db.delete(filter).await.unwrap(); + + assert_eq!(db.count_all().await, 2); + } +} diff --git a/crates/nostr-redb/src/store/core/index.rs b/crates/nostr-redb/src/store/core/index.rs new file mode 100644 index 000000000..574d509a6 --- /dev/null +++ b/crates/nostr-redb/src/store/core/index.rs @@ -0,0 +1,182 @@ +// Copyright (c) 2024 Michael Dilger +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// 
Distributed under the MIT software license + +use core::{cmp, iter}; + +use nostr::nips::nip01::CoordinateBorrow; +use nostr::{EventId, PublicKey, SingleLetterTag, Timestamp}; + +const CREATED_AT_BE: usize = 8; +const KIND_BE: usize = 2; +const TAG_VALUE_PAD_LEN: usize = 182; + +/// Reverse created_at and convert `u64` to big-endian byte order +#[inline] +fn reverse_and_conv_to_be64(created_at: &Timestamp) -> [u8; 8] { + // Reverse + let created_at: u64 = u64::MAX - created_at.as_u64(); + + // Convert to big-endian + created_at.to_be_bytes() +} + +/// Extend the key with the `tag_value` (fixed len of 182 bytes) +fn extend_key_with_tag_value(key: &mut Vec, len: usize, tag_value: &str) { + let tag_value: &[u8] = tag_value.as_bytes(); + if len <= TAG_VALUE_PAD_LEN { + key.extend(tag_value); + key.extend(iter::repeat(0).take(TAG_VALUE_PAD_LEN - len)); + } else { + key.extend(&tag_value[..TAG_VALUE_PAD_LEN]); + } +} + +/// Make CreatedAt + ID index key +/// +/// ## Structure +/// +/// `reverse_created_at(8)` + `event_id(32)` +pub fn make_ci_index_key(created_at: &Timestamp, event_id: &[u8; EventId::LEN]) -> Vec { + let mut key: Vec = Vec::with_capacity(CREATED_AT_BE + EventId::LEN); + key.extend(reverse_and_conv_to_be64(created_at)); + key.extend(event_id); + key +} + +/// Make Tag + CreatedAt + ID index key (for looking up event by `tag`) +/// +/// ## Structure +/// +/// `tag_name(1)` + `tag_value(182)` + `reverse_created_at(8)` + `event_id(32)` +pub fn make_tc_index_key( + tag_name: &SingleLetterTag, + tag_value: &str, + created_at: &Timestamp, + event_id: &[u8; EventId::LEN], +) -> Vec { + let mut key: Vec = Vec::with_capacity(1 + TAG_VALUE_PAD_LEN + CREATED_AT_BE + EventId::LEN); + + // Push tag name + key.push(tag_name.as_char() as u8); + + // Add tag value + extend_key_with_tag_value(&mut key, tag_value.len(), tag_value); + + key.extend(reverse_and_conv_to_be64(created_at)); + key.extend(event_id); + key +} + +/// Make Author + CreatedAt + ID index key (for 
looking up event by `author`) +/// +/// ## Structure +/// +/// `author(32)` + `reverse_created_at(8)` + `event_id(32)` +pub fn make_ac_index_key( + author: &[u8; PublicKey::LEN], + created_at: &Timestamp, + event_id: &[u8; EventId::LEN], +) -> Vec { + let mut key: Vec = Vec::with_capacity(PublicKey::LEN + CREATED_AT_BE + EventId::LEN); + key.extend(author); + key.extend(reverse_and_conv_to_be64(created_at)); + key.extend(event_id); + key +} + +/// Make Author + Kind + CreatedAt + ID index key (for looking up event by `author` and `kind`) +/// +/// ## Structure +/// +/// `author(32)` + `kind(2)` + `reverse_created_at(8)` + `event_id(32)` +pub fn make_akc_index_key( + author: &[u8; PublicKey::LEN], + kind: u16, + created_at: &Timestamp, + event_id: &[u8; EventId::LEN], +) -> Vec { + let mut key: Vec = + Vec::with_capacity(PublicKey::LEN + KIND_BE + CREATED_AT_BE + EventId::LEN); + key.extend(author); + key.extend(kind.to_be_bytes()); + key.extend(reverse_and_conv_to_be64(created_at)); + key.extend(event_id); + key +} + +/// Make Author + Tag + CreatedAt + ID index key (for looking up event by `author` and `tag`) +/// +/// ## Structure +/// +/// `author(32)` + `tag_name(1)` + `tag_value(182)` + `reverse_created_at(8)` + `event_id(32)` +pub fn make_atc_index_key( + author: &[u8; PublicKey::LEN], + tag_name: &SingleLetterTag, + tag_value: &str, + created_at: &Timestamp, + event_id: &[u8; EventId::LEN], +) -> Vec { + let mut key: Vec = + Vec::with_capacity(PublicKey::LEN + 1 + TAG_VALUE_PAD_LEN + CREATED_AT_BE + EventId::LEN); + + // Add author + key.extend(author); + + // Add tag name + key.push(tag_name.as_char() as u8); + + // Add tag value + extend_key_with_tag_value(&mut key, tag_value.len(), tag_value); + + // Add reverse created at + key.extend(reverse_and_conv_to_be64(created_at)); + + // Add event ID + key.extend(event_id); + + key +} + +/// Make Kind + Tag + CreatedAt + ID index (for looking up event by `kind` and `tag`) +/// +/// ## Structure +/// +/// 
`kind(2)` + `tag_name(1)` + `tag_value(182)` + `reverse_created_at(8)` + `event_id(32)` +pub fn make_ktc_index_key( + kind: u16, + tag_name: &SingleLetterTag, + tag_value: &str, + created_at: &Timestamp, + event_id: &[u8; EventId::LEN], +) -> Vec { + let mut key: Vec = + Vec::with_capacity(KIND_BE + TAG_VALUE_PAD_LEN + CREATED_AT_BE + EventId::LEN); + key.extend(kind.to_be_bytes()); + key.push(tag_name.as_char() as u8); + extend_key_with_tag_value(&mut key, tag_value.len(), tag_value); + key.extend(reverse_and_conv_to_be64(created_at)); + key.extend(event_id); + key +} + +/// Make coordinate index key +/// +/// ## Structure +/// +/// `kind(2)` + `author(32)` + `d_len(1)` + `d(182)` +pub fn make_coordinate_index_key(coordinate: &CoordinateBorrow) -> Vec { + let mut key: Vec = Vec::with_capacity(KIND_BE + PublicKey::LEN + 1 + TAG_VALUE_PAD_LEN); + key.extend(coordinate.kind.as_u16().to_be_bytes()); + key.extend(coordinate.public_key.to_bytes()); + + let identifier: &str = coordinate.identifier.unwrap_or_default(); + + let dlen: usize = cmp::min(identifier.len(), TAG_VALUE_PAD_LEN); + key.push(dlen as u8); + + extend_key_with_tag_value(&mut key, dlen, identifier); + + key +} diff --git a/crates/nostr-redb/src/store/core/mod.rs b/crates/nostr-redb/src/store/core/mod.rs new file mode 100644 index 000000000..fbc432607 --- /dev/null +++ b/crates/nostr-redb/src/store/core/mod.rs @@ -0,0 +1,888 @@ +// Copyright (c) 2024 Michael Dilger +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::collections::BTreeSet; +#[cfg(not(target_arch = "wasm32"))] +use std::fs; +use std::iter; +use std::ops::Bound; +#[cfg(not(target_arch = "wasm32"))] +use std::path::Path; +use std::sync::Arc; + +use nostr::prelude::*; +use nostr_database::flatbuffers::FlatBufferDecodeBorrowed; +use nostr_database::{FlatBufferBuilder, FlatBufferEncode}; +use redb::backends::InMemoryBackend; +use redb::{Database, 
Range, ReadTransaction, TableDefinition, WriteTransaction}; + +pub(super) mod index; +#[cfg(target_arch = "wasm32")] +pub(crate) mod wasm; + +#[cfg(target_arch = "wasm32")] +use self::wasm::IndexeddbBackend; +use super::error::Error; +use super::types::{AccessGuardEvent, DatabaseFilter}; + +const EVENT_ID_ALL_ZEROS: [u8; 32] = [0; 32]; +const EVENT_ID_ALL_255: [u8; 32] = [255; 32]; + +const EVENTS: TableDefinition<&[u8; 32], &[u8]> = TableDefinition::new("events"); // Event ID, Event +/// CreatedAt + ID index +const CI_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("ci_index"); // , Event ID +/// Tag + CreatedAt + ID index +const TC_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("tc_index"); // , Event ID +/// Author + CreatedAt + ID index +const AC_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("ac_index"); // , Event ID +/// Author + Kind + CreatedAt + ID index +const AKC_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("akc_index"); // , Event ID +/// Author + Tag + CreatedAt + ID index +const ATC_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("atc_index"); // , Event ID +/// Kind + Tag + CreatedAt + ID index +const KTC_INDEX: TableDefinition<&[u8], &[u8; 32]> = TableDefinition::new("ktc_index"); // , Event ID +const DELETED_IDS: TableDefinition<&[u8; 32], ()> = TableDefinition::new("deleted_ids"); // Event ID +const DELETED_COORDINATES: TableDefinition<&[u8], u64> = + TableDefinition::new("deletec_coordinates"); // Coordinate, UNIX timestamp + +type IndexRange<'a> = Range<'a, &'static [u8], &'static [u8; 32]>; + +#[derive(Debug, Clone)] +pub(crate) struct Db { + // TODO: remove `Arc` when PR #TDB will be merged + env: Arc, +} + +impl Db { + fn new(env: Arc) -> Result { + // Create tables + let txn = env.begin_write()?; + Self::create_tables(&txn)?; + txn.commit()?; + + Ok(Self { env }) + } + + #[cfg(not(target_arch = "wasm32"))] + pub(crate) fn persistent

(path: P) -> Result + where + P: AsRef, + { + let dir = path.as_ref(); + + fs::create_dir_all(dir)?; + + let path = dir.join("data.redb"); + let env = Arc::new(Database::create(path)?); + + Self::new(env) + } + + #[cfg(target_arch = "wasm32")] + pub(crate) async fn web(name: &str) -> Result { + let backend = IndexeddbBackend::open(name).await?; + let env = Arc::new(Database::builder().create_with_backend(backend)?); + Self::new(env) + } + + // TODO: add support to in-memory with limited capacity? + pub(crate) fn in_memory() -> Result { + let backend = InMemoryBackend::new(); + let env = Arc::new(Database::builder().create_with_backend(backend)?); + Self::new(env) + } + + /// Get a read transaction + #[inline] + pub(crate) fn read_txn(&self) -> Result { + Ok(self.env.begin_read()?) + } + + /// Get a write transaction + #[inline] + pub(crate) fn write_txn(&self) -> Result { + Ok(self.env.begin_write()?) + } + + fn create_tables(txn: &WriteTransaction) -> Result<(), Error> { + txn.open_table(EVENTS)?; + txn.open_table(CI_INDEX)?; + txn.open_table(TC_INDEX)?; + txn.open_table(AC_INDEX)?; + txn.open_table(AKC_INDEX)?; + txn.open_table(ATC_INDEX)?; + txn.open_table(KTC_INDEX)?; + txn.open_table(DELETED_IDS)?; + txn.open_table(DELETED_COORDINATES)?; + + Ok(()) + } + + /// Store and index the event + pub(crate) fn store( + &self, + txn: &WriteTransaction, + fbb: &mut FlatBufferBuilder, + event: &Event, + ) -> Result<(), Error> { + let id: &[u8; 32] = event.id.as_bytes(); + + // Store event + let mut events = txn.open_table(EVENTS)?; + events.insert(id, event.encode(fbb))?; + + // Index by created_at and id + let ci_index_key: Vec = + index::make_ci_index_key(&event.created_at, event.id.as_bytes()); + let mut ci_index = txn.open_table(CI_INDEX)?; + ci_index.insert(ci_index_key.as_slice(), id)?; + + // Index by author and kind (with created_at and id) + let akc_index_key: Vec = index::make_akc_index_key( + &event.pubkey.to_bytes(), + event.kind.as_u16(), + &event.created_at, 
+ event.id.as_bytes(), + ); + let mut akc_index = txn.open_table(AKC_INDEX)?; + akc_index.insert(akc_index_key.as_slice(), id)?; + + // Index by author (with created_at and id) + let ac_index_key: Vec = index::make_ac_index_key( + &event.pubkey.to_bytes(), + &event.created_at, + event.id.as_bytes(), + ); + let mut ac_index = txn.open_table(AC_INDEX)?; + ac_index.insert(ac_index_key.as_slice(), id)?; + + for tag in event.tags.iter() { + if let (Some(tag_name), Some(tag_value)) = (tag.single_letter_tag(), tag.content()) { + // Index by author and tag (with created_at and id) + let atc_index_key: Vec = index::make_atc_index_key( + &event.pubkey.to_bytes(), + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + let mut atc_index = txn.open_table(ATC_INDEX)?; + atc_index.insert(atc_index_key.as_slice(), id)?; + + // Index by kind and tag (with created_at and id) + let ktc_index_key: Vec = index::make_ktc_index_key( + event.kind.as_u16(), + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + let mut ktc_index = txn.open_table(KTC_INDEX)?; + ktc_index.insert(ktc_index_key.as_slice(), id)?; + + // Index by tag (with created_at and id) + let tc_index_key: Vec = index::make_tc_index_key( + &tag_name, + tag_value, + &event.created_at, + event.id.as_bytes(), + ); + let mut tc_index = txn.open_table(TC_INDEX)?; + tc_index.insert(tc_index_key.as_slice(), id)?; + } + } + + Ok(()) + } + + /// Remove the event + pub(crate) fn remove( + &self, + txn: &WriteTransaction, + event: &AccessGuardEvent, + ) -> Result<(), Error> { + let value = event.guard.value(); + let event = EventBorrow::decode(value)?; + + let mut events = txn.open_table(EVENTS)?; + events.remove(event.id)?; + + let ci_index_key: Vec = index::make_ci_index_key(&event.created_at, event.id); + let mut ci_index = txn.open_table(CI_INDEX)?; + ci_index.remove(ci_index_key.as_slice())?; + + let akc_index_key: Vec = + index::make_akc_index_key(event.pubkey, event.kind, 
&event.created_at, event.id); + let mut akc_index = txn.open_table(AKC_INDEX)?; + akc_index.remove(akc_index_key.as_slice())?; + + let ac_index_key: Vec = + index::make_ac_index_key(event.pubkey, &event.created_at, event.id); + let mut ac_index = txn.open_table(AC_INDEX)?; + ac_index.remove(ac_index_key.as_slice())?; + + for tag in event.tags.iter() { + if let Some((tag_name, tag_value)) = tag.extract() { + // Index by author and tag (with created_at and id) + let atc_index_key: Vec = index::make_atc_index_key( + event.pubkey, + &tag_name, + tag_value, + &event.created_at, + event.id, + ); + let mut atc_index = txn.open_table(ATC_INDEX)?; + atc_index.remove(atc_index_key.as_slice())?; + + // Index by kind and tag (with created_at and id) + let ktc_index_key: Vec = index::make_ktc_index_key( + event.kind, + &tag_name, + tag_value, + &event.created_at, + event.id, + ); + let mut ktc_index = txn.open_table(KTC_INDEX)?; + ktc_index.remove(ktc_index_key.as_slice())?; + + // Index by tag (with created_at and id) + let tc_index_key: Vec = + index::make_tc_index_key(&tag_name, tag_value, &event.created_at, event.id); + let mut tc_index = txn.open_table(TC_INDEX)?; + tc_index.remove(tc_index_key.as_slice())?; + } + } + + Ok(()) + } + + pub(crate) fn wipe(&self, txn: &WriteTransaction) -> Result<(), Error> { + // Delete tables + txn.delete_table(EVENTS)?; + txn.delete_table(CI_INDEX)?; + txn.delete_table(TC_INDEX)?; + txn.delete_table(AC_INDEX)?; + txn.delete_table(AKC_INDEX)?; + txn.delete_table(ATC_INDEX)?; + txn.delete_table(KTC_INDEX)?; + txn.delete_table(DELETED_IDS)?; + txn.delete_table(DELETED_COORDINATES)?; + + // Re-create tables + Self::create_tables(txn)?; + + Ok(()) + } + + #[inline] + pub(crate) fn has_event( + &self, + txn: &ReadTransaction, + event_id: &[u8; 32], + ) -> Result { + Ok(self.get_event_by_id(txn, event_id)?.is_some()) + } + + #[inline] + pub(crate) fn get_event_by_id<'a>( + &self, + txn: &ReadTransaction, + event_id: &[u8; 32], + ) -> Result>, 
Error> { + let events = txn.open_table(EVENTS)?; + match events.get(event_id)? { + Some(guard) => Ok(Some(AccessGuardEvent::new(guard)?)), + None => Ok(None), + } + } + + pub fn query<'a, I>( + &self, + txn: &'a ReadTransaction, + filters: I, + ) -> Result>, Error> + where + I: IntoIterator, + { + let mut output: BTreeSet> = BTreeSet::new(); + for filter in filters.into_iter() { + let events = self.single_filter_query(txn, filter)?; + output.extend(events); + } + Ok(output) + } + + pub fn delete( + &self, + read_txn: &ReadTransaction, + txn: &WriteTransaction, + filter: Filter, + ) -> Result<(), Error> { + let events = self.single_filter_query(read_txn, filter)?; + for event in events.into_iter() { + self.remove(txn, &event)?; + } + Ok(()) + } + + /// Find all events that match the filter + fn single_filter_query<'a>( + &self, + txn: &'a ReadTransaction, + filter: Filter, + ) -> Result> + 'a>, Error> { + if let (Some(since), Some(until)) = (filter.since, filter.until) { + if since > until { + return Ok(Box::new(iter::empty())); + } + } + + // We insert into a BTreeSet to keep them time-ordered + let mut output: BTreeSet> = BTreeSet::new(); + + let limit: Option = filter.limit; + let since = filter.since.unwrap_or_else(Timestamp::min); + let until = filter.until.unwrap_or_else(Timestamp::max); + + let filter: DatabaseFilter = filter.into(); + + if !filter.ids.is_empty() { + // Fetch by id + for id in filter.ids.iter() { + // Check if limit is set + if let Some(limit) = limit { + // Stop if limited + if output.len() >= limit { + break; + } + } + + if let Some(event) = self.get_event_by_id(txn, id)? { + if event.match_filter(&filter)? 
{ + output.insert(event); + } + } + } + } else if !filter.authors.is_empty() && !filter.kinds.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = since; + + for author in filter.authors.iter() { + for kind in filter.kinds.iter() { + let iter = self.akc_iter(txn, author, *kind, since, until)?; + + // Count how many we have found of this author-kind pair, so we + // can possibly update `since` + let mut paircount = 0; + + 'per_event: for result in iter { + let (_key, value) = result?; + let id = value.value(); + let event = self.get_event_by_id(txn, id)?.ok_or(Error::NotFound)?; + + // If we have gone beyond since, we can stop early + // (We have to check because `since` might change in this loop) + if event.created_at < since { + break 'per_event; + } + + // check against the rest of the filter + if event.match_filter(&filter)? { + let created_at = event.created_at; + + // Accept the event + output.insert(event); + paircount += 1; + + // Stop this pair if limited + if let Some(limit) = limit { + if paircount >= limit { + // Since we found the limit just among this pair, + // potentially move since forward + if created_at > since { + since = created_at; + } + break 'per_event; + } + } + + // If kind is replaceable (and not parameterized) + // then don't take any more events for this author-kind + // pair. 
+ // NOTE that this optimization is difficult to implement + // for other replaceable event situations + if Kind::from(*kind).is_replaceable() { + break 'per_event; + } + } + } + } + } + } else if !filter.authors.is_empty() && !filter.generic_tags.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = since; + + for author in filter.authors.iter() { + for (tagname, set) in filter.generic_tags.iter() { + for tag_value in set.iter() { + let iter = + self.atc_iter(txn, author, tagname, tag_value, &since, &until)?; + self.iterate_filter_until_limit( + txn, + &filter, + iter, + &mut since, + limit, + &mut output, + )?; + } + } + } + } else if !filter.kinds.is_empty() && !filter.generic_tags.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = since; + + for kind in filter.kinds.iter() { + for (tag_name, set) in filter.generic_tags.iter() { + for tag_value in set.iter() { + let iter = + self.ktc_iter(txn, *kind, tag_name, tag_value, &since, &until)?; + self.iterate_filter_until_limit( + txn, + &filter, + iter, + &mut since, + limit, + &mut output, + )?; + } + } + } + } else if !filter.generic_tags.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = since; + + for (tag_name, set) in filter.generic_tags.iter() { + for tag_value in set.iter() { + let iter = self.tc_iter(txn, tag_name, tag_value, &since, &until)?; + self.iterate_filter_until_limit( + txn, + &filter, + iter, + &mut since, + limit, + &mut output, + )?; + } + } + } else if !filter.authors.is_empty() { + // We may bring since forward if we hit the limit without going back that + // far, so we use a mutable since: + let mut since = since; + + for author in filter.authors.iter() { + let iter = self.ac_iter(txn, author, since, until)?; + 
self.iterate_filter_until_limit( + txn, + &filter, + iter, + &mut since, + limit, + &mut output, + )?; + } + } else { + // SCRAPE + // This is INEFFICIENT as it scans through many events + + let iter = self.ci_iter(txn, &since, &until)?; + for result in iter { + // Check if limit is set + if let Some(limit) = limit { + // Stop if limited + if output.len() >= limit { + break; + } + } + + let (_key, guard) = result?; + let id = guard.value(); + let event = self.get_event_by_id(txn, id)?.ok_or(Error::NotFound)?; + + if event.match_filter(&filter)? { + output.insert(event); + } + } + } + + // Optionally apply limit + Ok(match limit { + Some(limit) => Box::new(output.into_iter().take(limit)), + None => Box::new(output.into_iter()), + }) + } + + fn iterate_filter_until_limit<'a>( + &self, + txn: &ReadTransaction, + filter: &DatabaseFilter, + iter: IndexRange<'a>, + since: &mut Timestamp, + limit: Option, + output: &mut BTreeSet>, + ) -> Result<(), Error> { + let mut count: usize = 0; + + for result in iter { + let (_key, guard) = result?; + + // Get event by ID + let id = guard.value(); + let event = self.get_event_by_id(txn, id)?.ok_or(Error::NotFound)?; + + if event.created_at < *since { + break; + } + + // check against the rest of the filter + if event.match_filter(filter)? 
{ + let created_at = event.created_at; + + // Accept the event + output.insert(event); + count += 1; + + // Check if limit is set + if let Some(limit) = limit { + // Stop if limited + if count >= limit { + if created_at > *since { + *since = created_at; + } + break; + } + } + } + } + + Ok(()) + } + + pub fn find_replaceable_event<'a>( + &self, + txn: &ReadTransaction, + author: &PublicKey, + kind: Kind, + ) -> Result>, Error> { + if !kind.is_replaceable() { + return Err(Error::WrongEventKind); + } + + let mut iter = self.akc_iter( + txn, + &author.to_bytes(), + kind.as_u16(), + Timestamp::min(), + Timestamp::max(), + )?; + + if let Some(result) = iter.next() { + let (_key, guard) = result?; + let id = guard.value(); + return self.get_event_by_id(txn, id); + } + + Ok(None) + } + + pub fn find_parameterized_replaceable_event<'a>( + &'a self, + txn: &ReadTransaction, + addr: &Coordinate, + ) -> Result>, Error> { + if !addr.kind.is_addressable() { + return Err(Error::WrongEventKind); + } + + let iter = self.atc_iter( + txn, + &addr.public_key.to_bytes(), + &SingleLetterTag::lowercase(Alphabet::D), + &addr.identifier, + &Timestamp::min(), + &Timestamp::max(), + )?; + + for result in iter { + let (_key, guard) = result?; + let id = guard.value(); + let event = self.get_event_by_id(txn, id)?.ok_or(Error::NotFound)?; + + // the atc index doesn't have kind, so we have to compare the kinds + if event.kind != addr.kind.as_u16() { + continue; + } + + return Ok(Some(event)); + } + + Ok(None) + } + + // Remove all replaceable events with the matching author-kind + // Kind must be a replaceable (not parameterized replaceable) event kind + pub fn remove_replaceable( + &self, + read_txn: &ReadTransaction, + txn: &WriteTransaction, + coordinate: &Coordinate, + until: Timestamp, + ) -> Result<(), Error> { + if !coordinate.kind.is_replaceable() { + return Err(Error::WrongEventKind); + } + + let iter = self.akc_iter( + read_txn, + &coordinate.public_key.to_bytes(), + 
coordinate.kind.as_u16(), + Timestamp::zero(), + until, + )?; + + for result in iter { + let (_key, guard) = result?; + + let id = guard.value(); + if let Some(event) = self.get_event_by_id(read_txn, id)? { + self.remove(txn, &event)?; + } + } + + Ok(()) + } + + // Remove all parameterized-replaceable events with the matching author-kind-d + // Kind must be a parameterized-replaceable event kind + pub fn remove_parameterized_replaceable( + &self, + read_txn: &ReadTransaction, + txn: &WriteTransaction, + coordinate: &Coordinate, + until: Timestamp, + ) -> Result<(), Error> { + if !coordinate.kind.is_addressable() { + return Err(Error::WrongEventKind); + } + + let iter = self.atc_iter( + read_txn, + &coordinate.public_key.to_bytes(), + &SingleLetterTag::lowercase(Alphabet::D), + &coordinate.identifier, + &Timestamp::min(), + &until, + )?; + + for result in iter { + let (_key, guard) = result?; + + // Our index doesn't have Kind embedded, so we have to check it + let id = guard.value(); + let event = self.get_event_by_id(read_txn, id)?.ok_or(Error::NotFound)?; + + if event.kind == coordinate.kind.as_u16() { + self.remove(txn, &event)?; + } + } + + Ok(()) + } + + #[inline] + pub(crate) fn is_deleted( + &self, + txn: &ReadTransaction, + event_id: &EventId, + ) -> Result { + let deleted_ids = txn.open_table(DELETED_IDS)?; + Ok(deleted_ids.get(event_id.as_bytes())?.is_some()) + } + + pub(crate) fn mark_deleted( + &self, + txn: &WriteTransaction, + event_id: &EventId, + ) -> Result<(), Error> { + let mut deleted_ids = txn.open_table(DELETED_IDS)?; + deleted_ids.insert(event_id.as_bytes(), &())?; + Ok(()) + } + + pub(crate) fn mark_coordinate_deleted( + &self, + txn: &WriteTransaction, + coordinate: &CoordinateBorrow, + when: Timestamp, + ) -> Result<(), Error> { + let key: Vec = index::make_coordinate_index_key(coordinate); + let mut deleted_coordinates = txn.open_table(DELETED_COORDINATES)?; + deleted_coordinates.insert(key.as_slice(), when.as_u64())?; + Ok(()) + } + + 
pub(crate) fn when_is_coordinate_deleted( + &self, + txn: &ReadTransaction, + coordinate: &CoordinateBorrow, + ) -> Result, Error> { + let key: Vec = index::make_coordinate_index_key(coordinate); + self.when_is_coordinate_deleted_by_key(txn, key) + } + + pub(crate) fn when_is_coordinate_deleted_by_key( + &self, + txn: &ReadTransaction, + coordinate_key: Vec, + ) -> Result, Error> { + let deleted_coordinates = txn.open_table(DELETED_COORDINATES)?; + Ok(deleted_coordinates + .get(coordinate_key.as_slice())? + .map(|guard| { + let secs: u64 = guard.value(); + Timestamp::from_secs(secs) + })) + } + + pub(crate) fn ci_iter<'a>( + &self, + txn: &'a ReadTransaction, + since: &Timestamp, + until: &Timestamp, + ) -> Result, Error> { + let start_prefix = index::make_ci_index_key(until, &EVENT_ID_ALL_ZEROS); + let end_prefix = index::make_ci_index_key(since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let ci_index = txn.open_table(CI_INDEX)?; + Ok(ci_index.range::<&[u8]>(range)?) + } + + pub(crate) fn tc_iter<'a>( + &self, + txn: &'a ReadTransaction, + tag_name: &SingleLetterTag, + tag_value: &str, + since: &Timestamp, + until: &Timestamp, + ) -> Result, Error> { + let start_prefix = index::make_tc_index_key( + tag_name, + tag_value, + until, // scan goes backwards in time + &EVENT_ID_ALL_ZEROS, + ); + let end_prefix = index::make_tc_index_key(tag_name, tag_value, since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let tc_index = txn.open_table(TC_INDEX)?; + Ok(tc_index.range::<&[u8]>(range)?) 
+ } + + pub(crate) fn ac_iter<'a>( + &self, + txn: &'a ReadTransaction, + author: &[u8; 32], + since: Timestamp, + until: Timestamp, + ) -> Result, Error> { + let start_prefix = index::make_ac_index_key(author, &until, &EVENT_ID_ALL_ZEROS); + let end_prefix = index::make_ac_index_key(author, &since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let ac_index = txn.open_table(AC_INDEX)?; + Ok(ac_index.range::<&[u8]>(range)?) + } + + pub(crate) fn akc_iter<'a>( + &self, + txn: &'a ReadTransaction, + author: &[u8; 32], + kind: u16, + since: Timestamp, + until: Timestamp, + ) -> Result, Error> { + let start_prefix = index::make_akc_index_key(author, kind, &until, &EVENT_ID_ALL_ZEROS); + let end_prefix = index::make_akc_index_key(author, kind, &since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let akc_index = txn.open_table(AKC_INDEX)?; + Ok(akc_index.range::<&[u8]>(range)?) + } + + pub(crate) fn atc_iter<'a>( + &self, + txn: &'a ReadTransaction, + author: &[u8; 32], + tag_name: &SingleLetterTag, + tag_value: &str, + since: &Timestamp, + until: &Timestamp, + ) -> Result, Error> { + let start_prefix: Vec = index::make_atc_index_key( + author, + tag_name, + tag_value, + until, // scan goes backwards in time + &EVENT_ID_ALL_ZEROS, + ); + let end_prefix: Vec = + index::make_atc_index_key(author, tag_name, tag_value, since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let atc_index = txn.open_table(ATC_INDEX)?; + Ok(atc_index.range::<&[u8]>(range)?) 
+ } + + pub(crate) fn ktc_iter<'a>( + &self, + txn: &'a ReadTransaction, + kind: u16, + tag_name: &SingleLetterTag, + tag_value: &str, + since: &Timestamp, + until: &Timestamp, + ) -> Result, Error> { + let start_prefix = index::make_ktc_index_key( + kind, + tag_name, + tag_value, + until, // scan goes backwards in time + &EVENT_ID_ALL_ZEROS, + ); + let end_prefix = + index::make_ktc_index_key(kind, tag_name, tag_value, since, &EVENT_ID_ALL_255); + let range = ( + Bound::Included(start_prefix.as_slice()), + Bound::Excluded(end_prefix.as_slice()), + ); + let ktc_index = txn.open_table(KTC_INDEX)?; + Ok(ktc_index.range::<&[u8]>(range)?) + } +} diff --git a/crates/nostr-redb/src/store/core/wasm.rs b/crates/nostr-redb/src/store/core/wasm.rs new file mode 100644 index 000000000..a48dbdf47 --- /dev/null +++ b/crates/nostr-redb/src/store/core/wasm.rs @@ -0,0 +1,279 @@ +// Copyright (c) 2024 Michael Dilger +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::collections::HashSet; +use std::fmt; +use std::future::IntoFuture; +use std::io::{self, ErrorKind}; +use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use futures::executor::block_on; +use indexed_db_futures::js_sys::JsString; +use indexed_db_futures::prelude::OpenDbRequest; +use indexed_db_futures::request::IdbOpenDbRequestLike; +use indexed_db_futures::web_sys::{DomException, IdbTransactionMode}; +use indexed_db_futures::{IdbDatabase, IdbQuerySource, IdbVersionChangeEvent}; +use nostr::util::hex; +use redb::StorageBackend; +use wasm_bindgen::{JsCast, JsValue}; + +const CURRENT_DB_VERSION: u32 = 3; +const STORE_NAME: &str = "rust-nostr-redb"; +const KEY_NAME: &str = "rust-nostr-redb-key"; + +/// Error +#[derive(Debug)] +pub enum Error { + Poison, + /// DOM error + DomException { + /// DomException code + code: u16, + /// Specific name of the DomException + name: String, + /// Message given to the 
DomException + message: String, + }, + Other(String), +} + +impl std::error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Poison => write!(f, "RwLock poisoned."), + Self::DomException { + name, + code, + message, + } => write!(f, "DomException {name} ({code}): {message}"), + Self::Other(msg) => write!(f, "{}", msg), + } + } +} + +impl From for Error { + fn from(frm: DomException) -> Self { + Self::DomException { + name: frm.name(), + message: frm.message(), + code: frm.code(), + } + } +} + +impl From for Error { + fn from(e: JsValue) -> Self { + Self::Other(format!("{e:?}")) + } +} + +fn into_io_err(e: Error) -> io::Error { + io::Error::new(ErrorKind::Other, e) +} + +/// Helper struct for upgrading the inner DB. +#[derive(Debug, Clone, Default)] +struct OngoingMigration { + /// Names of stores to drop. + drop_stores: HashSet<&'static str>, + /// Names of stores to create. + create_stores: HashSet<&'static str>, +} + +/// Acts as temporal in-memory database storage. +#[derive(Debug)] +pub struct IndexeddbBackend { + db: Arc, + buf: RwLock>, +} + +unsafe impl Send for IndexeddbBackend {} + +unsafe impl Sync for IndexeddbBackend {} + +impl IndexeddbBackend { + fn out_of_range() -> io::Error { + io::Error::new(ErrorKind::InvalidInput, "Index out-of-range.") + } +} + +impl IndexeddbBackend { + /// Creates a new, empty memory backend. 
+ pub async fn open(name: &str) -> Result { + let mut db_req: OpenDbRequest = IdbDatabase::open_u32(&name, CURRENT_DB_VERSION)?; + db_req.set_on_upgrade_needed(Some( + move |evt: &IdbVersionChangeEvent| -> Result<(), JsValue> { + let mut old_version: u32 = evt.old_version() as u32; + + tracing::debug!("Database version: {old_version}"); + + if old_version <= 1 { + let migration = OngoingMigration { + create_stores: HashSet::from([STORE_NAME]), + ..Default::default() + }; + Self::apply_migration(old_version, CURRENT_DB_VERSION, migration, evt)?; + old_version = CURRENT_DB_VERSION; + } + + if old_version < 3 { + let migration = OngoingMigration { + create_stores: HashSet::from([STORE_NAME]), + ..Default::default() + }; + Self::apply_migration(old_version, CURRENT_DB_VERSION, migration, evt)?; + //old_version = CURRENT_DB_VERSION; + } + + tracing::debug!("Migration completed."); + + Ok(()) + }, + )); + + let mut this = Self { + db: Arc::new(db_req.into_future().await?), + buf: RwLock::new(Vec::new()), + }; + + this.read_buf().await?; + + Ok(this) + } + + fn apply_migration( + old_version: u32, + version: u32, + migration: OngoingMigration, + evt: &IdbVersionChangeEvent, + ) -> Result<(), DomException> { + tracing::debug!("Migrating from v{old_version} to v{version}"); + + // Changing the format can only happen in the upgrade procedure + for store in migration.drop_stores.iter() { + evt.db().delete_object_store(store)?; + } + for store in migration.create_stores.iter() { + evt.db().create_object_store(store)?; + } + + Ok(()) + } + + async fn read_buf(&mut self) -> Result<(), Error> { + tracing::debug!("Reading buffer from database..."); + + let tx = self + .db + .transaction_on_one_with_mode(STORE_NAME, IdbTransactionMode::Readonly)?; + let store = tx.object_store(STORE_NAME)?; + + if let Some(jsvalue) = store.get(&JsValue::from_str(KEY_NAME))?.await? { + if let Some(encoded) = js_value_to_string(jsvalue) { + tracing::debug!("Found buffer in database. 
Decoding..."); + let buf = hex::decode(encoded) + .map_err(|_| Error::Other("Failed to decode hex string".to_string()))?; + self.buf = RwLock::new(buf); + tracing::debug!("Buffer decoded."); + } + } + + Ok(()) + } + + /// Gets a read guard for this backend. + fn read(&self) -> Result>, Error> { + self.buf.read().map_err(|_| Error::Poison) + } + + /// Gets a write guard for this backend. + fn write(&self) -> Result>, Error> { + self.buf.write().map_err(|_| Error::Poison) + } +} + +impl StorageBackend for IndexeddbBackend { + fn len(&self) -> Result { + Ok(self.read().map_err(into_io_err)?.len() as u64) + } + + fn read(&self, offset: u64, len: usize) -> Result, io::Error> { + let guard = self.read().map_err(into_io_err)?; + let offset = usize::try_from(offset).map_err(|_| Self::out_of_range())?; + if offset + len <= guard.len() { + Ok(guard[offset..offset + len].to_owned()) + } else { + Err(Self::out_of_range()) + } + } + + fn set_len(&self, len: u64) -> Result<(), io::Error> { + let mut guard = self.write().map_err(into_io_err)?; + let len = usize::try_from(len).map_err(|_| Self::out_of_range())?; + if guard.len() < len { + let additional = len - guard.len(); + guard.reserve(additional); + for _ in 0..additional { + guard.push(0); + } + } else { + guard.truncate(len); + } + + Ok(()) + } + + fn sync_data(&self, _: bool) -> Result<(), io::Error> { + let guard = self.read().map_err(into_io_err)?; + + let tx = self + .db + .transaction_on_one_with_mode(STORE_NAME, IdbTransactionMode::Readwrite) + .map_err(|e| { + let e = Error::from(e); + into_io_err(e) + })?; + let store = tx.object_store(STORE_NAME).map_err(|e| { + let e = Error::from(e); + into_io_err(e) + })?; + + // Hex encode + let encoded: String = hex::encode(guard.as_slice()); + + // Store + let key = JsValue::from_str(KEY_NAME); + let value = JsValue::from(encoded); + store.put_key_val(&key, &value).map_err(|e| { + let e = Error::from(e); + into_io_err(e) + })?; + + block_on(async { tx.await.into_result() 
}).map_err(|e| { + let e = Error::from(e); + into_io_err(e) + })?; + + Ok(()) + } + + fn write(&self, offset: u64, data: &[u8]) -> Result<(), io::Error> { + let mut guard = self.write().map_err(into_io_err)?; + let offset = usize::try_from(offset).map_err(|_| Self::out_of_range())?; + if offset + data.len() <= guard.len() { + guard[offset..offset + data.len()].copy_from_slice(data); + Ok(()) + } else { + Err(Self::out_of_range()) + } + } +} + +fn js_value_to_string(value: JsValue) -> Option { + let s: JsString = value.dyn_into().ok()?; + Some(s.into()) +} diff --git a/crates/nostr-redb/src/store/error.rs b/crates/nostr-redb/src/store/error.rs new file mode 100644 index 000000000..9b5aa3226 --- /dev/null +++ b/crates/nostr-redb/src/store/error.rs @@ -0,0 +1,133 @@ +// Copyright (c) 2024 Michael Dilger +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::{fmt, io}; + +use async_utility::task::Error as JoinError; +use nostr::{key, secp256k1}; +use nostr_database::flatbuffers; + +#[cfg(target_arch = "wasm32")] +use super::core::wasm::Error as WasmError; + +#[derive(Debug)] +pub enum Error { + /// An upstream I/O error + Io(io::Error), + /// An error from LMDB + Redb(redb::DatabaseError), + /// An error from LMDB + RedbTx(redb::TransactionError), + /// An error from LMDB + RedbTable(redb::TableError), + /// An error from LMDB + RedbStorage(redb::StorageError), + /// An error from LMDB + RedbCommit(redb::CommitError), + /// Flatbuffers error + FlatBuffers(flatbuffers::Error), + Thread(JoinError), + Key(key::Error), + Secp256k1(secp256k1::Error), + #[cfg(target_arch = "wasm32")] + Wasm(WasmError), + /// Mutex poisoned + MutexPoisoned, + /// The event kind is wrong + WrongEventKind, + /// Not found + NotFound, +} + +impl std::error::Error for Error {} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Io(e) => 
write!(f, "{e}"), + Self::Redb(e) => write!(f, "{e}"), + Self::RedbTx(e) => write!(f, "{e}"), + Self::RedbTable(e) => write!(f, "{e}"), + Self::RedbStorage(e) => write!(f, "{e}"), + Self::RedbCommit(e) => write!(f, "{e}"), + Self::FlatBuffers(e) => write!(f, "{e}"), + Self::Thread(e) => write!(f, "{e}"), + Self::Key(e) => write!(f, "{e}"), + Self::Secp256k1(e) => write!(f, "{e}"), + #[cfg(target_arch = "wasm32")] + Self::Wasm(e) => write!(f, "{e}"), + Self::MutexPoisoned => write!(f, "mutex poisoned"), + Self::NotFound => write!(f, "Not found"), + Self::WrongEventKind => write!(f, "Wrong event kind"), + } + } +} + +impl From for Error { + fn from(e: io::Error) -> Self { + Self::Io(e) + } +} + +impl From for Error { + fn from(e: redb::DatabaseError) -> Self { + Self::Redb(e) + } +} + +impl From for Error { + fn from(e: redb::TransactionError) -> Self { + Self::RedbTx(e) + } +} + +impl From for Error { + fn from(e: redb::TableError) -> Self { + Self::RedbTable(e) + } +} + +impl From for Error { + fn from(e: redb::StorageError) -> Self { + Self::RedbStorage(e) + } +} + +impl From for Error { + fn from(e: redb::CommitError) -> Self { + Self::RedbCommit(e) + } +} + +impl From for Error { + fn from(e: flatbuffers::Error) -> Self { + Self::FlatBuffers(e) + } +} + +impl From for Error { + fn from(e: JoinError) -> Self { + Self::Thread(e) + } +} + +impl From for Error { + fn from(e: key::Error) -> Self { + Self::Key(e) + } +} + +impl From for Error { + fn from(e: secp256k1::Error) -> Self { + Self::Secp256k1(e) + } +} + +#[cfg(target_arch = "wasm32")] +impl From for Error { + fn from(e: WasmError) -> Self { + Self::Wasm(e) + } +} diff --git a/crates/nostr-redb/src/store/mod.rs b/crates/nostr-redb/src/store/mod.rs new file mode 100644 index 000000000..929c51b1d --- /dev/null +++ b/crates/nostr-redb/src/store/mod.rs @@ -0,0 +1,358 @@ +// Copyright (c) 2024 Michael Dilger +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// 
Distributed under the MIT software license + +use std::collections::BTreeSet; +#[cfg(not(target_arch = "wasm32"))] +use std::path::Path; +use std::sync::{Arc, Mutex}; + +use async_utility::task; +use nostr_database::flatbuffers::FlatBufferDecodeBorrowed; +use nostr_database::prelude::*; +use redb::{ReadTransaction, WriteTransaction}; + +use crate::store::types::AccessGuardEvent; + +mod core; +mod error; +mod types; + +use self::core::Db; +use self::error::Error; + +type Fbb = Arc>>; + +#[derive(Debug)] +pub struct Store { + db: Db, + fbb: Fbb, + persistent: bool, +} + +impl Store { + #[cfg(not(target_arch = "wasm32"))] + pub fn persistent

(path: P) -> Result + where + P: AsRef, + { + Ok(Self { + db: Db::persistent(path)?, + fbb: Arc::new(Mutex::new(FlatBufferBuilder::with_capacity(70_000))), + persistent: true, + }) + } + + #[cfg(target_arch = "wasm32")] + pub(crate) async fn web(name: &str) -> Result { + Ok(Self { + db: Db::web(name).await?, + fbb: Arc::new(Mutex::new(FlatBufferBuilder::with_capacity(70_000))), + persistent: true, + }) + } + + pub fn in_memory() -> Self { + Self { + // SAFETY: newly created database, should never panic. + db: Db::in_memory().unwrap(), + fbb: Arc::new(Mutex::new(FlatBufferBuilder::with_capacity(70_000))), + persistent: false, + } + } + + #[inline] + pub fn is_persistent(&self) -> bool { + self.persistent + } + + #[inline] + async fn interact(&self, f: F) -> Result + where + F: FnOnce(Db) -> R + Send + 'static, + R: Send + 'static, + { + let db = self.db.clone(); + Ok(task::spawn_blocking(move || f(db)).join().await?) + } + + #[inline] + async fn interact_with_fbb(&self, f: F) -> Result + where + F: FnOnce(Db, Fbb) -> R + Send + 'static, + R: Send + 'static, + { + let db = self.db.clone(); + let fbb = self.fbb.clone(); + Ok(task::spawn_blocking(move || f(db, fbb)).join().await?) + } + + /// Store an event. + pub async fn save_event(&self, event: &Event) -> Result { + if event.kind.is_ephemeral() { + return Ok(SaveEventStatus::Rejected(RejectedReason::Ephemeral)); + } + + // TODO: avoid this clone + let event = event.clone(); + + self.interact_with_fbb(move |db, fbb| { + // Acquire read transaction + let read_txn = db.read_txn()?; + + // Already exists + if db.has_event(&read_txn, event.id.as_bytes())? { + return Ok(SaveEventStatus::Rejected(RejectedReason::Duplicate)); + } + + // Reject event if ID was deleted + if db.is_deleted(&read_txn, &event.id)? 
{ + return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted)); + } + + // Reject event if ADDR was deleted after it's created_at date + // (non-parameterized or parameterized) + if let Some(coordinate) = event.coordinate() { + if let Some(time) = db.when_is_coordinate_deleted(&read_txn, &coordinate)? { + if event.created_at <= time { + return Ok(SaveEventStatus::Rejected(RejectedReason::Deleted)); + } + } + } + + // Acquire write transaction + let txn = db.write_txn()?; + + // Remove replaceable events being replaced + if event.kind.is_replaceable() { + // Find replaceable event + if let Some(stored) = + db.find_replaceable_event(&read_txn, &event.pubkey, event.kind)? + { + if stored.created_at > event.created_at { + txn.abort()?; + return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced)); + } + + let coordinate: Coordinate = Coordinate::new(event.kind, event.pubkey); + db.remove_replaceable(&read_txn, &txn, &coordinate, event.created_at)?; + } + } + + // Remove parameterized replaceable events being replaced + if event.kind.is_addressable() { + if let Some(identifier) = event.tags.identifier() { + let coordinate: Coordinate = + Coordinate::new(event.kind, event.pubkey).identifier(identifier); + + // Find param replaceable event + if let Some(stored) = + db.find_parameterized_replaceable_event(&read_txn, &coordinate)? 
+ { + if stored.created_at > event.created_at { + txn.abort()?; + return Ok(SaveEventStatus::Rejected(RejectedReason::Replaced)); + } + + db.remove_parameterized_replaceable( + &read_txn, + &txn, + &coordinate, + Timestamp::max(), + )?; + } + } + } + + // Handle deletion events + if let Kind::EventDeletion = event.kind { + let invalid: bool = Self::handle_deletion_event(&db, &read_txn, &txn, &event)?; + + if invalid { + txn.abort()?; + return Ok(SaveEventStatus::Rejected(RejectedReason::InvalidDelete)); + } + } + + // Acquire lock + let mut fbb = fbb.lock().map_err(|_| Error::MutexPoisoned)?; + + // Store and index the event + db.store(&txn, &mut fbb, &event)?; + + // Immediately drop the lock + drop(fbb); + + // Commit + read_txn.close()?; + txn.commit()?; + + Ok(SaveEventStatus::Success) + }) + .await? + } + + fn handle_deletion_event( + db: &Db, + read_txn: &ReadTransaction, + txn: &WriteTransaction, + event: &Event, + ) -> Result { + for id in event.tags.event_ids() { + if let Some(target) = db.get_event_by_id(read_txn, id.as_bytes())? 
{ + let value = target.guard.value(); + let temp = EventBorrow::decode(value)?; + + // Author must match + if temp.pubkey != event.pubkey.as_bytes() { + return Ok(true); + } + + // Mark as deleted and remove event + db.mark_deleted(txn, id)?; + db.remove(txn, &target)?; + } + } + + for coordinate in event.tags.coordinates() { + // Author must match + if coordinate.public_key != event.pubkey { + return Ok(true); + } + + // Mark deleted + db.mark_coordinate_deleted(txn, &coordinate.borrow(), event.created_at)?; + + // Remove events (up to the created_at of the deletion event) + if coordinate.kind.is_replaceable() { + db.remove_replaceable(read_txn, txn, coordinate, event.created_at)?; + } else if coordinate.kind.is_addressable() { + db.remove_parameterized_replaceable(read_txn, txn, coordinate, event.created_at)?; + } + } + + Ok(false) + } + + /// Get an event by ID + pub async fn get_event_by_id(&self, id: &EventId) -> Result, Error> { + let bytes = id.to_bytes(); + self.interact(move |db| { + let txn = db.read_txn()?; + let event: Option = match db.get_event_by_id(&txn, &bytes)? { + Some(e) => Some(e.to_event()?), + None => None, + }; + txn.close()?; + Ok(event) + }) + .await? + } + + /// Do we have an event + pub async fn has_event(&self, id: &EventId) -> Result { + let bytes = id.to_bytes(); + self.interact(move |db| { + let txn = db.read_txn()?; + let has: bool = db.has_event(&txn, &bytes)?; + txn.close()?; + Ok(has) + }) + .await? + } + + /// Is the event deleted + pub async fn event_is_deleted(&self, id: EventId) -> Result { + self.interact(move |db| { + let txn = db.read_txn()?; + let deleted: bool = db.is_deleted(&txn, &id)?; + txn.close()?; + Ok(deleted) + }) + .await? 
+ } + + #[inline] + pub async fn when_is_coordinate_deleted<'a>( + &self, + coordinate: &'a CoordinateBorrow<'a>, + ) -> Result, Error> { + let key: Vec = core::index::make_coordinate_index_key(coordinate); + self.interact(move |db| { + let txn = db.read_txn()?; + let when = db.when_is_coordinate_deleted_by_key(&txn, key)?; + txn.close()?; + Ok(when) + }) + .await? + } + + pub async fn count(&self, filters: Vec) -> Result { + self.interact(move |db| { + let txn = db.read_txn()?; + let output = db.query(&txn, filters)?; + let len: usize = output.len(); + //txn.close()?; + Ok(len) + }) + .await? + } + + // Lookup ID: EVENT_ORD_IMPL + pub async fn query(&self, filters: Vec) -> Result { + self.interact(move |db| { + let mut events: Events = Events::new(&filters); + + let txn: ReadTransaction = db.read_txn()?; + let output: BTreeSet = db.query(&txn, filters)?; + events.extend(output.into_iter().filter_map(|e| e.to_event().ok())); + txn.close()?; + + Ok(events) + }) + .await? + } + + pub async fn negentropy_items( + &self, + filter: Filter, + ) -> Result, Error> { + self.interact(move |db| { + let txn = db.read_txn()?; + let events = db.query(&txn, vec![filter])?; + let items = events + .into_iter() + .map(|e| (EventId::from_byte_array(e.id), e.created_at)) + .collect(); + txn.close()?; + Ok(items) + }) + .await? + } + + pub async fn delete(&self, filter: Filter) -> Result<(), Error> { + self.interact(move |db| { + let read_txn = db.read_txn()?; + let txn = db.write_txn()?; + + db.delete(&read_txn, &txn, filter)?; + + read_txn.close()?; + txn.commit()?; + + Ok(()) + }) + .await? + } + + pub async fn wipe(&self) -> Result<(), Error> { + self.interact(move |db| { + let txn = db.write_txn()?; + db.wipe(&txn)?; + txn.commit()?; + Ok(()) + }) + .await? 
+ } +} diff --git a/crates/nostr-redb/src/store/types/event.rs b/crates/nostr-redb/src/store/types/event.rs new file mode 100644 index 000000000..e527c1165 --- /dev/null +++ b/crates/nostr-redb/src/store/types/event.rs @@ -0,0 +1,74 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::cmp::Ordering; + +use nostr::prelude::*; +use nostr_database::flatbuffers::FlatBufferDecodeBorrowed; +use redb::AccessGuard; + +use super::filter::DatabaseFilter; +use crate::store::Error; + +pub struct AccessGuardEvent<'a> { + pub guard: AccessGuard<'a, &'static [u8]>, + pub id: [u8; 32], + pub created_at: Timestamp, + pub kind: u16, +} + +impl PartialEq for AccessGuardEvent<'_> { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for AccessGuardEvent<'_> {} + +impl PartialOrd for AccessGuardEvent<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for AccessGuardEvent<'_> { + fn cmp(&self, other: &Self) -> Ordering { + if self.created_at != other.created_at { + // Descending order + // NOT EDIT, will break many things!! 
+ self.created_at.cmp(&other.created_at).reverse() + } else { + self.id.cmp(&other.id) + } + } +} + +impl<'a> AccessGuardEvent<'a> { + pub fn new(guard: AccessGuard<'a, &'static [u8]>) -> Result { + let value = guard.value(); + let event = EventBorrow::decode(value)?; + let id = *event.id; + let created_at = event.created_at; + let kind = event.kind; + Ok(Self { + guard, + id, + created_at, + kind, + }) + } + + pub fn match_filter(&self, filter: &DatabaseFilter) -> Result { + let value = self.guard.value(); + let event = EventBorrow::decode(value)?; + Ok(filter.match_event(&event)) + } + + #[allow(clippy::wrong_self_convention)] + pub fn to_event(self) -> Result { + let value = self.guard.value(); + let temp = EventBorrow::decode(value)?; + Ok(temp.into_owned()) + } +} diff --git a/crates/nostr-redb/src/store/types/filter.rs b/crates/nostr-redb/src/store/types/filter.rs new file mode 100644 index 000000000..29b1c5713 --- /dev/null +++ b/crates/nostr-redb/src/store/types/filter.rs @@ -0,0 +1,121 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use std::collections::{BTreeMap, BTreeSet, HashSet}; + +use nostr::event::borrow::EventBorrow; +use nostr::{Filter, SingleLetterTag, Timestamp}; + +pub struct DatabaseFilter { + pub ids: HashSet<[u8; 32]>, + pub authors: HashSet<[u8; 32]>, + pub kinds: HashSet, + /// Lowercase query + pub search: Option, + pub since: Option, + pub until: Option, + pub generic_tags: BTreeMap>, +} + +impl DatabaseFilter { + #[inline] + fn ids_match(&self, event: &EventBorrow) -> bool { + self.ids.is_empty() || self.ids.contains(event.id) + } + + #[inline] + fn authors_match(&self, event: &EventBorrow) -> bool { + self.authors.is_empty() || self.authors.contains(event.pubkey) + } + + #[inline] + fn tag_match(&self, event: &EventBorrow) -> bool { + if self.generic_tags.is_empty() { + return true; + } + + if event.tags.is_empty() { + return false; + } + 
+ // TODO: review this code + + // Match + self.generic_tags.iter().all(|(tag_name, set)| { + event.tags.iter().any(|tag| { + if let Some((first, content)) = tag.extract() { + if tag_name == &first { + return set.contains(content); + } + } + + false + }) + }) + } + + #[inline] + fn kind_match(&self, event: &EventBorrow) -> bool { + self.kinds.is_empty() || self.kinds.contains(&event.kind) + } + + #[inline] + fn search_match(&self, event: &EventBorrow) -> bool { + match &self.search { + Some(query) => { + // NOTE: `query` was already converted to lowercase + let query: &[u8] = query.as_bytes(); + event + .content + .as_bytes() + .windows(query.len()) + .any(|window| window.eq_ignore_ascii_case(query)) + } + None => true, + } + } + + #[inline] + pub fn match_event(&self, event: &EventBorrow) -> bool { + self.ids_match(event) + && self.authors_match(event) + && self.kind_match(event) + && self.since.map_or(true, |t| event.created_at >= t) + && self.until.map_or(true, |t| event.created_at <= t) + && self.tag_match(event) + && self.search_match(event) + } +} + +impl From for DatabaseFilter { + fn from(filter: Filter) -> Self { + Self { + ids: filter + .ids + .map(|ids| ids.into_iter().map(|id| id.to_bytes()).collect()) + .unwrap_or_default(), + authors: filter + .authors + .map(|authors| { + authors + .into_iter() + .map(|pubkey| pubkey.to_bytes()) + .collect() + }) + .unwrap_or_default(), + kinds: filter + .kinds + .map(|kinds| kinds.into_iter().map(|id| id.as_u16()).collect()) + .unwrap_or_default(), + search: filter.search.map(|mut s| { + // Convert to lowercase + s.make_ascii_lowercase(); + s + }), + since: filter.since, + until: filter.until, + generic_tags: filter.generic_tags, + } + } +} diff --git a/crates/nostr-redb/src/store/types/mod.rs b/crates/nostr-redb/src/store/types/mod.rs new file mode 100644 index 000000000..29365dcae --- /dev/null +++ b/crates/nostr-redb/src/store/types/mod.rs @@ -0,0 +1,9 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright 
(c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +mod event; +mod filter; + +pub use self::event::AccessGuardEvent; +pub use self::filter::DatabaseFilter; diff --git a/crates/nostr-relay-builder/Cargo.toml b/crates/nostr-relay-builder/Cargo.toml index 2e050b21a..143cd0256 100644 --- a/crates/nostr-relay-builder/Cargo.toml +++ b/crates/nostr-relay-builder/Cargo.toml @@ -22,6 +22,7 @@ atomic-destructor.workspace = true negentropy = { workspace = true, features = ["std"] } nostr = { workspace = true, default-features = false, features = ["std"] } nostr-database.workspace = true +nostr-redb.workspace = true tokio = { workspace = true, features = ["macros", "net", "sync"] } tracing.workspace = true diff --git a/crates/nostr-relay-builder/src/builder.rs b/crates/nostr-relay-builder/src/builder.rs index b297a4b47..d016b459c 100644 --- a/crates/nostr-relay-builder/src/builder.rs +++ b/crates/nostr-relay-builder/src/builder.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::time::Duration; use nostr_database::prelude::*; +use nostr_redb::NostrRedb; /// Rate limit #[derive(Debug, Clone)] @@ -199,10 +200,7 @@ impl Default for RelayBuilder { Self { addr: None, port: None, - database: Arc::new(MemoryDatabase::with_opts(MemoryDatabaseOptions { - events: true, - max_events: Some(75_000), - })), + database: Arc::new(NostrRedb::in_memory()), mode: RelayBuilderMode::default(), rate_limit: RateLimit::default(), nip42: None, diff --git a/crates/nostr-sdk/Cargo.toml b/crates/nostr-sdk/Cargo.toml index 05b5611f3..b310c6128 100644 --- a/crates/nostr-sdk/Cargo.toml +++ b/crates/nostr-sdk/Cargo.toml @@ -53,6 +53,7 @@ nostr-indexeddb = { workspace = true, optional = true } [dev-dependencies] nostr-connect.workspace = true +nostr-redb.workspace = true tokio = { workspace = true, features = ["macros"] } tracing-subscriber = { workspace = true, features = ["env-filter"] } @@ -88,6 +89,9 @@ required-features = ["all-nips"] name = "nostrdb" required-features = 
["ndb"] +[[example]] +name = "redb" + [[example]] name = "stream-events" diff --git a/crates/nostr-sdk/examples/redb.rs b/crates/nostr-sdk/examples/redb.rs new file mode 100644 index 000000000..3874f75a7 --- /dev/null +++ b/crates/nostr-sdk/examples/redb.rs @@ -0,0 +1,63 @@ +// Copyright (c) 2022-2023 Yuki Kishimoto +// Copyright (c) 2023-2024 Rust Nostr Developers +// Distributed under the MIT software license + +use nostr_redb::NostrRedb; +use nostr_sdk::prelude::*; + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + let keys = Keys::parse("nsec1ufnus6pju578ste3v90xd5m2decpuzpql2295m3sknqcjzyys9ls0qlc85")?; + + let database = NostrRedb::persistent("./db/nostr-redb")?; + let client: Client = ClientBuilder::default() + .signer(keys.clone()) + .database(database) + .build(); + + client.add_relay("wss://relay.damus.io").await?; + client.add_relay("wss://nostr.wine").await?; + client.add_relay("wss://nostr.oxtr.dev").await?; + + client.connect().await; + + // Publish a text note + let builder = EventBuilder::text_note("Hello world"); + client.send_event_builder(builder).await?; + + // Negentropy sync + let filter = Filter::new().author(keys.public_key()); + let (tx, mut rx) = SyncProgress::channel(); + let opts = SyncOptions::default().progress(tx); + + tokio::spawn(async move { + while rx.changed().await.is_ok() { + let progress = *rx.borrow_and_update(); + if progress.total > 0 { + println!("{:.2}%", progress.percentage() * 100.0); + } + } + }); + let output = client.sync(filter, &opts).await?; + + println!("Local: {}", output.local.len()); + println!("Remote: {}", output.remote.len()); + println!("Sent: {}", output.sent.len()); + println!("Received: {}", output.received.len()); + println!("Failures:"); + for (url, map) in output.send_failures.iter() { + println!("* '{url}':"); + for (id, e) in map.iter() { + println!(" - {id}: {e}"); + } + } + + // Query events from database + let filter = 
Filter::new().author(keys.public_key()).limit(10); + let events = client.database().query(vec![filter]).await?; + println!("Events: {events:?}"); + + Ok(()) +}