From 46900a5b45450cbaa79c714c8848465ca422b8d0 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Wed, 29 Oct 2025 18:02:10 +0100 Subject: [PATCH 01/10] dedup some Workload code --- crates/core/src/db/relational_db.rs | 30 +++++++++-------------- crates/core/src/host/module_host.rs | 21 ++++------------ crates/datastore/src/execution_context.rs | 13 ++++++++++ 3 files changed, 30 insertions(+), 34 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4b7a533c008..f8097dec8e1 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -2865,43 +2865,37 @@ mod tests { let stdb = TestDB::durable().expect("failed to create TestDB"); let timestamp = Timestamp::now(); - let ctx = ReducerContext { - name: "abstract_concrete_proxy_factory_impl".into(), - caller_identity: Identity::__dummy(), - caller_connection_id: ConnectionId::ZERO, - timestamp, - arg_bsatn: Bytes::new(), + let workload = |name: &str| { + Workload::Reducer(ReducerContext { + name: name.into(), + caller_identity: Identity::__dummy(), + caller_connection_id: ConnectionId::ZERO, + timestamp, + arg_bsatn: Bytes::new(), + }) }; + let workload1 = workload("abstract_concrete_proxy_factory_impl"); let row_ty = ProductType::from([("le_boeuf", AlgebraicType::I32)]); let schema = table("test_table", row_ty.clone(), |builder| builder); // Create an empty transaction { - let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx.clone())); + let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1.clone()); stdb.commit_tx(tx).expect("failed to commit empty transaction"); } // Create an empty transaction pretending to be an // `__identity_connected__` call. { - let tx = stdb.begin_mut_tx( - IsolationLevel::Serializable, - Workload::Reducer(ReducerContext { - name: "__identity_connected__".into(), - caller_identity: Identity::__dummy(), - caller_connection_id: ConnectionId::ZERO, - timestamp, - arg_bsatn: Bytes::new(), - }), - ); + let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload("__identity_connected__")); stdb.commit_tx(tx) .expect("failed to commit empty __identity_connected__ transaction"); } // Create a non-empty transaction including reducer info let table_id = { - let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx)); + let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1); let table_id = stdb.create_table(&mut tx, schema).expect("failed to create table"); insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(0))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1fde862906f..3aef2f70c5b 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -25,7 +25,6 @@ use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; -use bytes::Bytes; use derive_more::From; use futures::lock::Mutex; use indexmap::IndexSet; @@ -933,13 +932,11 @@ impl ModuleHost { self.call("call_identity_connected", move |inst| { let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect); let stdb = &me.module.replica_ctx().relational_db; - let workload = Workload::Reducer(ReducerContext { - name: "call_identity_connected".to_owned(), - caller_identity: caller_auth.claims.identity, + let workload = Workload::reducer_no_args( + "call_identity_connected", + caller_auth.claims.identity, caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: Bytes::new(), - }); + ); let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload); let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| { // If we crash before committing, we need to ensure that the transaction is rolled back. @@ -1033,15 +1030,7 @@ impl ModuleHost { let is_client_exist = |mut_tx: &MutTxId| mut_tx.st_client_row(caller_identity, caller_connection_id).is_some(); - let workload = || { - Workload::Reducer(ReducerContext { - name: reducer_name.to_owned(), - caller_identity, - caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: Bytes::new(), - }) - }; + let workload = || Workload::reducer_no_args(reducer_name, caller_identity, caller_connection_id); let me = self.clone(); let stdb = me.module.replica_ctx().relational_db.clone(); diff --git a/crates/datastore/src/execution_context.rs b/crates/datastore/src/execution_context.rs index ef72cbbdf25..22b843a99a3 100644 --- a/crates/datastore/src/execution_context.rs +++ b/crates/datastore/src/execution_context.rs @@ -105,6 +105,19 @@ pub enum Workload { } impl Workload { + /// Returns a reducer workload with no arguments to the reducer + /// and the current timestamp. + pub fn reducer_no_args(name: &str, id: Identity, conn_id: ConnectionId) -> Self { + Self::Reducer(ReducerContext { + name: name.into(), + caller_identity: id, + caller_connection_id: conn_id, + timestamp: Timestamp::now(), + arg_bsatn: Bytes::new(), + }) + } + + /// Returns the workload's type/kind, without any of the data. pub fn workload_type(&self) -> WorkloadType { match self { #[cfg(any(test, feature = "test"))] From abcdeba4813489ed01be12dca7f510b82b6b2ce9 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 30 Oct 2025 13:10:50 +0100 Subject: [PATCH 02/10] move logic clear_all_clients to Instance + use a single reply channel for v8 worker --- crates/core/src/db/relational_db.rs | 13 +- crates/core/src/host/module_host.rs | 27 ++-- crates/core/src/host/v8/mod.rs | 117 ++++++++++-------- .../src/host/wasm_common/module_host_actor.rs | 9 ++ .../subscription/module_subscription_actor.rs | 7 ++ 5 files changed, 108 insertions(+), 65 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index f8097dec8e1..a56aa00ff00 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -1397,12 +1397,21 @@ impl RelationalDB { self.inner.delete_by_rel_mut_tx(tx, table_id, relation) } - /// Clear all rows from a table without dropping it. + /// Clears all rows from a table without dropping it. pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result { let rows_deleted = tx.clear_table(table_id)?; Ok(rows_deleted) } + /// Empties the system tables tracking clients. + pub fn clear_all_clients(&self) -> Result<(), DBError> { + self.with_auto_commit(Workload::Internal, |mut_tx| { + self.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; + self.clear_table(mut_tx, ST_CLIENT_ID)?; + Ok(()) + }) + } + pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 3aef2f70c5b..0a87e31fbff 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -37,7 +37,6 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::PipelinedProject; @@ -288,6 +287,10 @@ impl ModuleInfo { metrics, }) } + + pub fn relational_db(&self) -> &Arc { + self.subscriptions.relational_db() + } } /// A bidirectional map between `Identifiers` (reducer names) and `ReducerId`s. @@ -402,6 +405,13 @@ impl Instance { } } + fn clear_all_clients(&self) -> anyhow::Result<()> { + match self { + Instance::Wasm(inst) => inst.clear_all_clients(), + Instance::Js(inst) => inst.clear_all_clients(), + } + } + async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { match self { Instance::Wasm(inst) => inst.call_procedure(params).await, @@ -1064,7 +1074,6 @@ impl ModuleHost { }; if let Some((reducer_id, reducer_def)) = reducer_lookup { - let stdb = me.module.replica_ctx().relational_db.clone(); let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); if !is_client_exist(&mut_tx) { @@ -1150,18 +1159,8 @@ impl ModuleHost { /// Empty the system tables tracking clients without running any lifecycle reducers. pub async fn clear_all_clients(&self) -> anyhow::Result<()> { - let me = self.clone(); - self.call("clear_all_clients", move |_| { - let stdb = &me.module.replica_ctx().relational_db; - let workload = Workload::Internal; - stdb.with_auto_commit(workload, |mut_tx| { - stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; - stdb.clear_table(mut_tx, ST_CLIENT_ID)?; - Ok::<(), DBError>(()) - }) - }) - .await? - .map_err(anyhow::Error::from) + self.call("clear_all_clients", move |inst| inst.clear_all_clients()) + .await? } async fn call_reducer_inner( diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 73b1ad9d76a..d62cb6a377a 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -18,7 +18,9 @@ use crate::host::{ReducerCallResult, Scheduler}; use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited}; use crate::replica_context::ReplicaContext; use crate::util::asyncify; +use core::any::type_name; use core::str; +use enum_as_inner::EnumAsInner; use itertools::Either; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; @@ -246,8 +248,7 @@ impl JsInstanceEnv { /// and cleanup the isolate and friends. pub struct JsInstance { request_tx: SyncSender, - update_response_rx: Receiver>, - call_reducer_response_rx: Receiver<(ReducerCallResult, bool)>, + reply_rx: Receiver, trapped: bool, } @@ -256,46 +257,58 @@ impl JsInstance { self.trapped } - pub fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { + /// Send a request to the worker and wait for a reply. + fn send_recv( + &self, + extract: impl FnOnce(JsWorkerReply) -> Result, + request: JsWorkerRequest, + ) -> T { // Send the request. - let request = JsWorkerRequest::UpdateDatabase { - program, - old_module_info, - policy, - }; self.request_tx .send(request) .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); // Wait for the response. - self.update_response_rx + let reply = self + .reply_rx .recv() - .expect("worker's `update_response_tx` should be live as `JsInstance::drop` hasn't happened") - } + .expect("worker's `reply_tx` should be live as `JsInstance::drop` hasn't happened"); - pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { - // Send the request. - let request = JsWorkerRequest::CallReducer { tx, params }; - self.request_tx - .send(request) - .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); + match extract(reply) { + Err(err) => unreachable!("should have received {} but got {err:?}", type_name::()), + Ok(reply) => reply, + } + } - // Wait for the response. - let (response, trapped) = self - .call_reducer_response_rx - .recv() - .expect("worker's `call_reducer_response_tx` should be live as `JsInstance::drop` hasn't happened"); + pub fn update_database( + &mut self, + program: Program, + old_module_info: Arc, + policy: MigrationPolicy, + ) -> anyhow::Result { + self.send_recv( + JsWorkerReply::into_update_database, + JsWorkerRequest::UpdateDatabase { + program, + old_module_info, + policy, + }, + ) + } + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { + let (response, trapped) = self.send_recv( + JsWorkerReply::into_call_reducer, + JsWorkerRequest::CallReducer { tx, params }, + ); self.trapped = trapped; - response } + pub fn clear_all_clients(&self) -> anyhow::Result<()> { + self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) + } + pub async fn call_procedure( &mut self, _params: CallProcedureParams, @@ -304,6 +317,14 @@ impl JsInstance { } } +/// A reply from the worker in [`spawn_instance_worker`]. +#[derive(EnumAsInner, Debug)] +enum JsWorkerReply { + UpdateDatabase(anyhow::Result), + CallReducer((ReducerCallResult, bool)), + ClearAllClients(anyhow::Result<()>), +} + /// A request for the worker in [`spawn_instance_worker`]. // We care about optimizing for `CallReducer` as it happens frequently, // so we don't want to box anything in it. @@ -320,6 +341,8 @@ enum JsWorkerRequest { tx: Option, params: CallReducerParams, }, + /// See [`JsInstance::clear_all_clients`]. + ClearAllClients, } /// Performs some of the startup work of [`spawn_instance_worker`]. @@ -377,10 +400,8 @@ fn spawn_instance_worker( // where each `.send` blocks until it's received. // The Instance --Request-> Worker channel: let (request_tx, request_rx) = mpsc::sync_channel(0); - // The Worker --UpdateResponse-> Instance channel: - let (update_response_tx, update_response_rx) = mpsc::sync_channel(0); - // The Worker --ReducerCallResult-> Instance channel: - let (call_reducer_response_tx, call_reducer_response_rx) = mpsc::sync_channel(0); + // The Worker --Reply-> Instance channel: + let (reply_tx, reply_rx) = mpsc::sync_channel(0); // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); @@ -422,6 +443,13 @@ fn spawn_instance_worker( // // The loop is terminated when a `JsInstance` is dropped. // This will cause channels, scopes, and the isolate to be cleaned up. + let reply = |ctx: &str, reply: JsWorkerReply| { + if let Err(e) = reply_tx.send(reply) { + // This should never happen as `JsInstance::$function` immediately + // does `.recv` on the other end of the channel. + unreachable!("should have receiver for `{ctx}` response, {e}"); + } + }; for request in request_rx.iter() { match request { JsWorkerRequest::UpdateDatabase { @@ -429,15 +457,9 @@ fn spawn_instance_worker( old_module_info, policy, } => { - // Update the database. + // Update the database and reply to `JsInstance::update_database`. let res = instance_common.update_database(replica_ctx, program, old_module_info, policy); - - // Reply to `JsInstance::update_database`. - if let Err(e) = update_response_tx.send(res) { - // This should never happen as `JsInstance::update_database` immediately - // does `.recv` on the other end of the channel. - unreachable!("should have receiver for `update_database` response, {e}"); - } + reply("update_database", JsWorkerReply::UpdateDatabase(res)); } JsWorkerRequest::CallReducer { tx, params } => { // Call the reducer. @@ -446,13 +468,11 @@ fn spawn_instance_worker( // which will cause `JsInstance` to be dropped, // which in turn results in the loop being terminated. let res = call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); - - // Reply to `JsInstance::call_reducer`. - if let Err(e) = call_reducer_response_tx.send(res) { - // This should never happen as `JsInstance::call_reducer` immediately - // does `.recv` on the other end of the channel. - unreachable!("should have receiver for `call_reducer` response, {e}"); - } + reply("call_reducer", JsWorkerReply::CallReducer(res)); + } + JsWorkerRequest::ClearAllClients => { + let res = instance_common.clear_all_clients(); + reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); } } } @@ -463,8 +483,7 @@ fn spawn_instance_worker( res.map(|opt_mc| { let inst = JsInstance { request_tx, - update_response_rx, - call_reducer_response_rx, + reply_rx, trapped: false, }; (opt_mc, inst) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 6bf94421f2a..07e7f52083f 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -274,6 +274,10 @@ impl WasmModuleInstance { crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) } + pub fn clear_all_clients(&self) -> anyhow::Result<()> { + self.common.clear_all_clients() + } + pub async fn call_procedure( &mut self, params: CallProcedureParams, @@ -668,6 +672,11 @@ impl InstanceCommon { (res, outer_error) } + + /// Empty the system tables tracking clients without running any lifecycle reducers. + pub(crate) fn clear_all_clients(&self) -> anyhow::Result<()> { + self.info.relational_db().clear_all_clients().map_err(Into::into) + } } /// VM-related metrics for reducer execution. diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 1c1f492b285..f89a9f2620a 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -213,6 +213,13 @@ impl ModuleSubscriptions { ) } + /// Returns the [`RelationalDB`] of this [`ModuleSubscriptions`]. + /// + /// This is used by [`ModuleInfo`] and in turn by `InstanceCommon`. + pub fn relational_db(&self) -> &Arc { + &self.relational_db + } + // Recompute gauges to update metrics. pub fn update_gauges(&self) { let num_queries = self.subscriptions.read().calculate_gauge_stats(); From 03b3275fca614fb174065f5d11e1ec4e3ad3fef6 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 30 Oct 2025 13:23:18 +0100 Subject: [PATCH 03/10] polish init_database --- crates/core/src/host/module_host.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 0a87e31fbff..9a072f3b946 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -456,22 +456,25 @@ fn init_database( let timestamp = Timestamp::now(); let stdb = &*replica_ctx.relational_db; let logger = replica_ctx.logger.system_logger(); + let owner_identity = replica_ctx.database.owner_identity; + let host_type = replica_ctx.host_type; let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); + let auth_ctx = AuthCtx::for_current(owner_identity); let (tx, ()) = stdb .with_auto_rollback(tx, |tx| { + // Create all in-memory tables defined by the module, + // with IDs ordered lexicographically by the table names. let mut table_defs: Vec<_> = module_def.tables().collect(); - table_defs.sort_by(|a, b| a.name.cmp(&b.name)); - + table_defs.sort_by_key(|x| &x.name); for def in table_defs { logger.info(&format!("Creating table `{}`", &def.name)); create_table_from_def(stdb, tx, module_def, def)?; } + // Create all in-memory views defined by the module. let mut view_defs: Vec<_> = module_def.views().collect(); - view_defs.sort_by(|a, b| a.name.cmp(&b.name)); - + view_defs.sort_by_key(|x| &x.name); for def in view_defs { logger.info(&format!("Creating table for view `{}`", &def.name)); create_table_from_view_def(stdb, tx, module_def, def)?; @@ -489,7 +492,7 @@ fn init_database( .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?; } - stdb.set_initialized(tx, replica_ctx.host_type, program)?; + stdb.set_initialized(tx, host_type, program)?; anyhow::Ok(()) }) @@ -505,12 +508,11 @@ fn init_database( Some((reducer_id, _)) => { logger.info("Invoking `init` reducer"); - let caller_identity = replica_ctx.database.owner_identity; Some(inst.call_reducer( Some(tx), CallReducerParams { timestamp, - caller_identity, + caller_identity: owner_identity, caller_connection_id: ConnectionId::ZERO, client: None, request_id: None, From 9ad7e60fd96215bfe18fa52a5a2807ab45e18336 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 30 Oct 2025 13:29:45 +0100 Subject: [PATCH 04/10] add 'CallReducerParams::from_system' --- crates/core/src/host/module_host.rs | 37 +++++++++++++++++++---------- crates/core/src/host/scheduler.rs | 22 ++++++----------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 9a072f3b946..7f08cc1b0a2 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -508,19 +508,8 @@ fn init_database( Some((reducer_id, _)) => { logger.info("Invoking `init` reducer"); - Some(inst.call_reducer( - Some(tx), - CallReducerParams { - timestamp, - caller_identity: owner_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: ArgsTuple::nullary(), - }, - )) + let params = CallReducerParams::from_system(timestamp, owner_identity, reducer_id, ArgsTuple::nullary()); + Some(inst.call_reducer(Some(tx), params)) } }; @@ -539,6 +528,28 @@ pub struct CallReducerParams { pub args: ArgsTuple, } +impl CallReducerParams { + /// Returns a set of parameters for a call that came from within + /// and without a client/caller/request_id. + pub fn from_system( + timestamp: Timestamp, + caller_identity: Identity, + reducer_id: ReducerId, + args: ArgsTuple, + ) -> Self { + Self { + timestamp, + caller_identity, + caller_connection_id: ConnectionId::ZERO, + client: None, + request_id: None, + timer: None, + reducer_id, + args, + } + } +} + pub struct CallProcedureParams { pub timestamp: Timestamp, pub caller_identity: Identity, diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index cc6ee632154..32b403dbc1b 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -355,16 +355,12 @@ impl SchedulerActor { // at least the timestamp it was scheduled to run at. let timestamp = at.max(Timestamp::now()); - Ok(Some(CallReducerParams { + Ok(Some(CallReducerParams::from_system( timestamp, caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, reducer_id, - args: reducer_args, - })) + reducer_args, + ))) } QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { let (reducer_id, reducer_seed) = module_info @@ -373,16 +369,12 @@ impl SchedulerActor { .ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?; let reducer_args = args.into_tuple(reducer_seed)?; - Ok(Some(CallReducerParams { - timestamp: Timestamp::now(), + Ok(Some(CallReducerParams::from_system( + Timestamp::now(), caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, reducer_id, - args: reducer_args, - })) + reducer_args, + ))) } }; From d90874f7c9c028699af5a136fb9e117828b07269 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Thu, 30 Oct 2025 14:39:56 +0100 Subject: [PATCH 05/10] move init_database work into Instance --- crates/core/src/host/module_host.rs | 43 ++++++++++++----- crates/core/src/host/scheduler.rs | 1 - crates/core/src/host/v8/mod.rs | 48 ++++++++++++++----- .../src/host/wasm_common/module_host_actor.rs | 30 ++++++++---- 4 files changed, 88 insertions(+), 34 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 7f08cc1b0a2..62a4555f953 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -418,6 +418,13 @@ impl Instance { Instance::Js(inst) => inst.call_procedure(params).await, } } + + fn init_database(&mut self, program: Program) -> anyhow::Result> { + match self { + Instance::Wasm(inst) => inst.init_database(program), + Instance::Js(inst) => inst.init_database(program), + } + } } /// Creates the table for `table_def` in `stdb`. @@ -445,13 +452,26 @@ pub fn create_table_from_view_def( Ok(()) } -/// If the module instance's replica_ctx is uninitialized, initialize it. -fn init_database( +/// If the module instance's `replica_ctx` is uninitialized, initialize it. +pub(crate) fn init_database( replica_ctx: &ReplicaContext, module_def: &ModuleDef, - inst: &mut Instance, program: Program, -) -> anyhow::Result> { + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> (anyhow::Result>, bool) { + let res = init_database_inner(replica_ctx, module_def, program, call_reducer); + match res { + Ok((x, y)) => (Ok(x), y), + Err(x) => (Err(x), false), + } +} + +fn init_database_inner( + replica_ctx: &ReplicaContext, + module_def: &ModuleDef, + program: Program, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> anyhow::Result<(Option, bool)> { log::debug!("init database"); let timestamp = Timestamp::now(); let stdb = &*replica_ctx.relational_db; @@ -503,13 +523,14 @@ fn init_database( if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } - None + (None, false) } Some((reducer_id, _)) => { logger.info("Invoking `init` reducer"); let params = CallReducerParams::from_system(timestamp, owner_identity, reducer_id, ArgsTuple::nullary()); - Some(inst.call_reducer(Some(tx), params)) + let (res, trapped) = call_reducer(Some(tx), params); + (Some(res), trapped) } }; @@ -1407,13 +1428,9 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - let replica_ctx = self.module.replica_ctx().clone(); - let info = self.info.clone(); - self.call("", move |inst| { - init_database(&replica_ctx, &info.module_def, inst, program) - }) - .await? - .map_err(InitDatabaseError::Other) + self.call("", move |inst| inst.init_database(program)) + .await? + .map_err(InitDatabaseError::Other) } pub async fn update_database( diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 32b403dbc1b..283561a5edd 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -6,7 +6,6 @@ use futures::StreamExt; use rustc_hash::FxHashMap; use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_lib::scheduler::ScheduleAt; -use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue}; diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index d62cb6a377a..08d34296484 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -10,7 +10,7 @@ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCo use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::instance_env::{ChunkPool, InstanceEnv}; -use crate::host::module_host::Instance; +use crate::host::module_host::{init_database, Instance}; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon}; use crate::host::wasm_common::{RowIters, TimingSpanSet}; @@ -280,8 +280,15 @@ impl JsInstance { } } + /// Performs `work` that may trap the worker. + fn can_trap(&mut self, work: impl FnOnce(&mut Self) -> (T, bool)) -> T { + let (x, trapped) = work(self); + self.trapped = trapped; + x + } + pub fn update_database( - &mut self, + &self, program: Program, old_module_info: Arc, policy: MigrationPolicy, @@ -296,19 +303,28 @@ impl JsInstance { ) } - pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { - let (response, trapped) = self.send_recv( - JsWorkerReply::into_call_reducer, - JsWorkerRequest::CallReducer { tx, params }, - ); - self.trapped = trapped; - response + pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { + self.can_trap(|this| { + this.send_recv( + JsWorkerReply::into_call_reducer, + JsWorkerRequest::CallReducer { tx, params }, + ) + }) } pub fn clear_all_clients(&self) -> anyhow::Result<()> { self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) } + pub fn init_database(&mut self, program: Program) -> anyhow::Result> { + self.can_trap(|this| { + this.send_recv( + JsWorkerReply::into_init_database, + JsWorkerRequest::InitDatabase(program), + ) + }) + } + pub async fn call_procedure( &mut self, _params: CallProcedureParams, @@ -323,6 +339,7 @@ enum JsWorkerReply { UpdateDatabase(anyhow::Result), CallReducer((ReducerCallResult, bool)), ClearAllClients(anyhow::Result<()>), + InitDatabase((anyhow::Result>, bool)), } /// A request for the worker in [`spawn_instance_worker`]. @@ -343,6 +360,8 @@ enum JsWorkerRequest { }, /// See [`JsInstance::clear_all_clients`]. ClearAllClients, + /// See [`JsInstance::init_database`]. + InitDatabase(Program), } /// Performs some of the startup work of [`spawn_instance_worker`]. @@ -474,12 +493,19 @@ fn spawn_instance_worker( let res = instance_common.clear_all_clients(); reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); } + JsWorkerRequest::InitDatabase(program) => { + let call_reducer = |tx, params| { + call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) + }; + let res = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); + reply("init_database", JsWorkerReply::InitDatabase(res)); + } } } }); // Get the module, if any, and get any setup errors from the worker. - let res = result_rx.blocking_recv().expect("should have a sender"); + let res: Result = result_rx.blocking_recv().expect("should have a sender"); res.map(|opt_mc| { let inst = JsInstance { request_tx, @@ -600,7 +626,7 @@ fn call_reducer<'scope>( fun: Local<'scope, Function>, tx: Option, params: CallReducerParams, -) -> (super::ReducerCallResult, bool) { +) -> (ReducerCallResult, bool) { let mut trapped = false; let (res, _) = instance_common.call_reducer_with_tx( diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 07e7f52083f..d77d61cdbdc 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -19,8 +19,8 @@ use crate::host::module_host::{ CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; use crate::host::{ - ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, - UpdateDatabaseResult, + init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, + Scheduler, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -271,13 +271,24 @@ impl WasmModuleInstance { } pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) + let (res, trapped) = self.call_reducer_inner(tx, params); + self.trapped = trapped; + res } pub fn clear_all_clients(&self) -> anyhow::Result<()> { self.common.clear_all_clients() } + pub fn init_database(&mut self, program: Program) -> anyhow::Result> { + let module_def = &self.common.info.clone().module_def; + let replica_ctx = &self.instance.instance_env().replica_ctx.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let (res, trapped) = init_database(replica_ctx, module_def, program, call_reducer); + self.trapped = trapped; + res + } + pub async fn call_procedure( &mut self, params: CallProcedureParams, @@ -293,14 +304,17 @@ impl WasmModuleInstance { if res.is_err() { self.trapped = true; } - res } } impl WasmModuleInstance { + fn call_reducer_inner(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) + } + #[tracing::instrument(level = "trace", skip_all)] - fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let (res, trapped) = self.common.call_reducer_with_tx( + fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + self.common.call_reducer_with_tx( &self.instance.instance_env().replica_ctx.clone(), tx, params, @@ -312,9 +326,7 @@ impl WasmModuleInstance { .clone() .set(tx, || self.instance.call_reducer(op, budget)) }, - ); - self.trapped = trapped; - res + ) } } From efb330dbd407439a404ef38e7a6580fba187292c Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Mon, 3 Nov 2025 14:05:54 +0100 Subject: [PATCH 06/10] move call_scheduled_reducer to Instance --- crates/core/src/host/module_host.rs | 141 ++++++++++-------- crates/core/src/host/scheduler.rs | 114 +++++++------- crates/core/src/host/v8/mod.rs | 25 +++- .../src/host/wasm_common/module_host_actor.rs | 13 +- 4 files changed, 176 insertions(+), 117 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 62a4555f953..f345882e998 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,6 +10,7 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; +use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem}; use crate::host::InvalidFunctionArguments; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; @@ -419,6 +420,13 @@ impl Instance { } } + fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { + match self { + Instance::Wasm(inst) => inst.call_scheduled_reducer(item), + Instance::Js(inst) => inst.call_scheduled_reducer(item), + } + } + fn init_database(&mut self, program: Program) -> anyhow::Result> { match self { Instance::Wasm(inst) => inst.init_database(program), @@ -452,6 +460,14 @@ pub fn create_table_from_view_def( Ok(()) } +/// Moves out the `trapped: bool` from `res`. +fn extract_trapped(res: Result<(T, bool), E>) -> (Result, bool) { + match res { + Ok((x, t)) => (Ok(x), t), + Err(x) => (Err(x), false), + } +} + /// If the module instance's `replica_ctx` is uninitialized, initialize it. pub(crate) fn init_database( replica_ctx: &ReplicaContext, @@ -459,11 +475,7 @@ pub(crate) fn init_database( program: Program, call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), ) -> (anyhow::Result>, bool) { - let res = init_database_inner(replica_ctx, module_def, program, call_reducer); - match res { - Ok((x, y)) => (Ok(x), y), - Err(x) => (Err(x), false), - } + extract_trapped(init_database_inner(replica_ctx, module_def, program, call_reducer)) } fn init_database_inner( @@ -538,6 +550,57 @@ fn init_database_inner( Ok(rcr) } +// Only for logging purposes. +const SCHEDULED_REDUCER: &str = "scheduled_reducer"; + +pub(crate) fn call_scheduled_reducer( + module: &ModuleInfo, + queue_item: QueueItem, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> (Result, bool) { + extract_trapped(call_scheduled_reducer_inner(module, queue_item, call_reducer)) +} + +fn call_scheduled_reducer_inner( + module: &ModuleInfo, + item: QueueItem, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> Result<(ReducerCallResult, bool), ReducerCallError> { + let db = &module.relational_db(); + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + match handle_queued_call_reducer_params(&tx, module, db, item) { + Ok(Some(params)) => { + // Is necessary to patch the context with the actual calling reducer + let reducer_def = module + .module_def + .get_reducer_by_id(params.reducer_id) + .ok_or(ReducerCallError::ScheduleReducerNotFound)?; + let reducer = &*reducer_def.name; + + tx.ctx = ExecutionContext::with_workload( + tx.ctx.database_identity(), + Workload::Reducer(ReducerContext { + name: reducer.into(), + caller_identity: params.caller_identity, + caller_connection_id: params.caller_connection_id, + timestamp: Timestamp::now(), + arg_bsatn: params.args.get_bsatn().clone(), + }), + ); + + Ok(call_reducer(Some(tx), params)) + } + Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound), + Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments( + InvalidFunctionArguments { + err, + function_name: SCHEDULED_REDUCER.into(), + }, + ))), + } +} + pub struct CallReducerParams { pub timestamp: Timestamp, pub caller_identity: Identity, @@ -1211,22 +1274,20 @@ impl ModuleHost { let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def)); let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?; let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + let call_reducer_params = CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }; Ok(self .call(&reducer_def.name, move |inst| { - inst.call_reducer( - None, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - ) + inst.call_reducer(None, call_reducer_params) }) .await?) } @@ -1378,47 +1439,9 @@ impl ModuleHost { // Scheduled reducers require a different function here to call their reducer // because their reducer arguments are stored in the database and need to be fetched // within the same transaction as the reducer call. - pub async fn call_scheduled_reducer( - &self, - call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result> + Send + 'static, - ) -> Result { - let db = self.module.replica_ctx().relational_db.clone(); - // scheduled reducer name not fetched yet, anyway this is only for logging purpose - const REDUCER: &str = "scheduled_reducer"; - let module = self.info.clone(); - self.call(REDUCER, move |inst: &mut Instance| { - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - - match call_reducer_params(&mut tx) { - Ok(Some(params)) => { - // Is necessary to patch the context with the actual calling reducer - let reducer_def = module - .module_def - .get_reducer_by_id(params.reducer_id) - .ok_or(ReducerCallError::ScheduleReducerNotFound)?; - let reducer = &*reducer_def.name; - - tx.ctx = ExecutionContext::with_workload( - tx.ctx.database_identity(), - Workload::Reducer(ReducerContext { - name: reducer.into(), - caller_identity: params.caller_identity, - caller_connection_id: params.caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: params.args.get_bsatn().clone(), - }), - ); - - Ok(inst.call_reducer(Some(tx), params)) - } - Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound), - Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments( - InvalidFunctionArguments { - err, - function_name: REDUCER.into(), - }, - ))), - } + pub(crate) async fn call_scheduled_reducer(&self, item: QueueItem) -> Result { + self.call(SCHEDULED_REDUCER, move |inst: &mut Instance| { + inst.call_scheduled_reducer(item) }) .await? } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 283561a5edd..f99ee1d7aab 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -15,6 +15,7 @@ use tokio::time::Instant; use tokio_util::time::delay_queue::{self, DelayQueue, Expired}; use crate::db::relational_db::RelationalDB; +use crate::host::module_host::ModuleInfo; use super::module_host::ModuleEvent; use super::module_host::ModuleFunctionCall; @@ -264,7 +265,7 @@ struct SchedulerActor { module_host: WeakModuleHost, } -enum QueueItem { +pub(crate) enum QueueItem { Id { id: ScheduledReducerId, at: Timestamp }, VolatileNonatomicImmediate { reducer_name: String, args: FunctionArgs }, } @@ -325,62 +326,10 @@ impl SchedulerActor { let Some(module_host) = self.module_host.upgrade() else { return; }; - let db = module_host.replica_ctx().relational_db.clone(); - let caller_identity = module_host.info().database_identity; - let module_info = module_host.info.clone(); - - let call_reducer_params = move |tx: &MutTxId| match item { - QueueItem::Id { id, at } => { - let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else { - // if the row is not found, it means the schedule is cancelled by the user - log::debug!( - "table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}", - id.table_id, - id.schedule_id - ); - return Ok(None); - }; - - let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, &db, id.table_id, &schedule_row)?; - - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer[..]) - .ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?; - - let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; - - // the timestamp we tell the reducer it's running at will be - // at least the timestamp it was scheduled to run at. - let timestamp = at.max(Timestamp::now()); - - Ok(Some(CallReducerParams::from_system( - timestamp, - caller_identity, - reducer_id, - reducer_args, - ))) - } - QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer_name[..]) - .ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?; - let reducer_args = args.into_tuple(reducer_seed)?; - - Ok(Some(CallReducerParams::from_system( - Timestamp::now(), - caller_identity, - reducer_id, - reducer_args, - ))) - } - }; - let db = module_host.replica_ctx().relational_db.clone(); let module_host_clone = module_host.clone(); - let res = tokio::spawn(async move { module_host.call_scheduled_reducer(call_reducer_params).await }).await; + let res = tokio::spawn(async move { module_host.call_scheduled_reducer(item).await }).await; match res { // if we didn't actually call the reducer because the module exited or it was already deleted, leave @@ -458,6 +407,63 @@ impl SchedulerActor { } } +pub(crate) fn handle_queued_call_reducer_params( + tx: &MutTxId, + module_info: &ModuleInfo, + db: &RelationalDB, + item: QueueItem, +) -> anyhow::Result> { + let caller_identity = module_info.database_identity; + + match item { + QueueItem::Id { id, at } => { + let Ok(schedule_row) = get_schedule_row_mut(tx, db, id) else { + // if the row is not found, it means the schedule is cancelled by the user + log::debug!( + "table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}", + id.table_id, + id.schedule_id + ); + return Ok(None); + }; + + let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?; + + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer[..]) + .ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?; + + let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; + + // the timestamp we tell the reducer it's running at will be + // at least the timestamp it was scheduled to run at. + let timestamp = at.max(Timestamp::now()); + + Ok(Some(CallReducerParams::from_system( + timestamp, + caller_identity, + reducer_id, + reducer_args, + ))) + } + QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer_name[..]) + .ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?; + let reducer_args = args.into_tuple(reducer_seed)?; + + Ok(Some(CallReducerParams::from_system( + Timestamp::now(), + caller_identity, + reducer_id, + reducer_args, + ))) + } + } +} + fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) { let caller_identity = module_host.info().database_identity; diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 08d34296484..c20755e17cd 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -10,11 +10,12 @@ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCo use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::instance_env::{ChunkPool, InstanceEnv}; -use crate::host::module_host::{init_database, Instance}; +use crate::host::module_host::{call_scheduled_reducer, init_database, Instance}; +use crate::host::scheduler::QueueItem; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon}; use crate::host::wasm_common::{RowIters, TimingSpanSet}; -use crate::host::{ReducerCallResult, Scheduler}; +use crate::host::{ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited}; use crate::replica_context::ReplicaContext; use crate::util::asyncify; @@ -316,6 +317,15 @@ impl JsInstance { self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { + self.can_trap(move |this| { + this.send_recv( + JsWorkerReply::into_call_scheduled_reducer, + JsWorkerRequest::CallScheduledReducer(item), + ) + }) + } + pub fn init_database(&mut self, program: Program) -> anyhow::Result> { self.can_trap(|this| { this.send_recv( @@ -339,6 +349,7 @@ enum JsWorkerReply { UpdateDatabase(anyhow::Result), CallReducer((ReducerCallResult, bool)), ClearAllClients(anyhow::Result<()>), + CallScheduledReducer((Result, bool)), InitDatabase((anyhow::Result>, bool)), } @@ -360,6 +371,8 @@ enum JsWorkerRequest { }, /// See [`JsInstance::clear_all_clients`]. ClearAllClients, + /// See [`JsInstance::call_scheduled_reducer`]. + CallScheduledReducer(QueueItem), /// See [`JsInstance::init_database`]. InitDatabase(Program), } @@ -493,6 +506,14 @@ fn spawn_instance_worker( let res = instance_common.clear_all_clients(); reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); } + JsWorkerRequest::CallScheduledReducer(queue_item) => { + let call_reducer = |tx, params| { + call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) + }; + let module = module_common.info(); + let res = call_scheduled_reducer(&module, queue_item, call_reducer); + reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res)); + } JsWorkerRequest::InitDatabase(program) => { let call_reducer = |tx, params| { call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index d77d61cdbdc..696118c1298 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -18,9 +18,10 @@ use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; +use crate::host::scheduler::QueueItem; use crate::host::{ - init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, - Scheduler, + call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, + ReducerId, ReducerOutcome, Scheduler, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -280,6 +281,14 @@ impl WasmModuleInstance { self.common.clear_all_clients() } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let (res, trapped) = call_scheduled_reducer(module, item, call_reducer); + self.trapped = trapped; + res + } + pub fn init_database(&mut self, program: Program) -> anyhow::Result> { let module_def = &self.common.info.clone().module_def; let replica_ctx = &self.instance.instance_env().replica_ctx.clone(); From c8ed4d41eec47cad2934bf3952f5c18a9eace550 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 4 Nov 2025 10:58:49 +0100 Subject: [PATCH 07/10] move call_identity_connected to Instance --- crates/core/src/host/module_host.rs | 211 +++++++++++------- crates/core/src/host/v8/mod.rs | 37 ++- .../src/host/wasm_common/module_host_actor.rs | 18 +- 3 files changed, 179 insertions(+), 87 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index f345882e998..2665793999b 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -413,10 +413,14 @@ impl Instance { } } - async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { + fn call_identity_connected( + &mut self, + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + ) -> Result<(), ClientConnectedError> { match self { - Instance::Wasm(inst) => inst.call_procedure(params).await, - Instance::Js(inst) => inst.call_procedure(params).await, + Instance::Wasm(inst) => inst.call_identity_connected(caller_auth, caller_connection_id), + Instance::Js(inst) => inst.call_identity_connected(caller_auth, caller_connection_id), } } @@ -433,6 +437,13 @@ impl Instance { Instance::Js(inst) => inst.init_database(program), } } + + async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { + match self { + Instance::Wasm(inst) => inst.call_procedure(params).await, + Instance::Js(inst) => inst.call_procedure(params).await, + } + } } /// Creates the table for `table_def` in `stdb`. @@ -550,6 +561,95 @@ fn init_database_inner( Ok(rcr) } +pub fn call_identity_connected( + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + module: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, +) -> Result<(), ClientConnectedError> { + let reducer_lookup = module.module_def.lifecycle_reducer(Lifecycle::OnConnect); + let stdb = module.relational_db(); + let workload = Workload::reducer_no_args( + "call_identity_connected", + caller_auth.claims.identity, + caller_connection_id, + ); + let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload); + let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| { + // If we crash before committing, we need to ensure that the transaction is rolled back. + // This is necessary to avoid leaving the database in an inconsistent state. + log::debug!("call_identity_connected: rolling back transaction"); + let (_, metrics, reducer_name) = mut_tx.rollback(); + stdb.report_mut_tx_metrics(reducer_name, metrics, None); + }); + + mut_tx + .insert_st_client( + caller_auth.claims.identity, + caller_connection_id, + &caller_auth.jwt_payload, + ) + .map_err(DBError::from)?; + + if let Some((reducer_id, reducer_def)) = reducer_lookup { + // The module defined a lifecycle reducer to handle new connections. + // Call this reducer. + // If the call fails (as in, something unexpectedly goes wrong with guest execution), + // abort the connection: we can't really recover. + let tx = Some(ScopeGuard::into_inner(mut_tx)); + let params = ModuleHost::call_reducer_params( + module, + caller_auth.claims.identity, + Some(caller_connection_id), + None, + None, + None, + reducer_id, + reducer_def, + FunctionArgs::Nullary, + ) + .map_err(ReducerCallError::from)?; + let (reducer_outcome, trapped) = call_reducer(tx, params); + *trapped_slot = trapped; + + match reducer_outcome.outcome { + // If the reducer committed successfully, we're done. + // `WasmModuleInstance::call_reducer_with_tx` has already ensured + // that `st_client` is updated appropriately. + // + // It's necessary to spread out the responsibility for updating `st_client` in this way + // because it's important that `call_identity_connected` commit at most one transaction. + // A naive implementation of this method would just run the reducer first, + // then insert into `st_client`, + // but if we crashed in between, we'd be left in an inconsistent state + // where the reducer had run but `st_client` was not yet updated. + ReducerOutcome::Committed => Ok(()), + + // If the reducer returned an error or couldn't run due to insufficient energy, + // abort the connection: the module code has decided it doesn't want this client. + ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)), + ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy), + } + } else { + // The module doesn't define a client_connected reducer. + // We need to commit the transaction to update st_clients and st_connection_credentials. + // + // This is necessary to be able to disconnect clients after a server crash. + + // TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions + // not allowed? + // I (jsdt) don't think it was being broadcast previously. See: + // https://github.com/clockworklabs/SpacetimeDB/issues/3130 + stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(())) + .map_err(|e: DBError| { + log::error!("`call_identity_connected`: finish transaction failed: {e:#?}"); + ClientConnectedError::DBError(e) + })?; + Ok(()) + } +} + // Only for logging purposes. const SCHEDULED_REDUCER: &str = "scheduled_reducer"; @@ -1035,85 +1135,8 @@ impl ModuleHost { caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { - let me = self.clone(); self.call("call_identity_connected", move |inst| { - let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect); - let stdb = &me.module.replica_ctx().relational_db; - let workload = Workload::reducer_no_args( - "call_identity_connected", - caller_auth.claims.identity, - caller_connection_id, - ); - let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload); - let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| { - // If we crash before committing, we need to ensure that the transaction is rolled back. - // This is necessary to avoid leaving the database in an inconsistent state. - log::debug!("call_identity_connected: rolling back transaction"); - let (_, metrics, reducer_name) = mut_tx.rollback(); - stdb.report_mut_tx_metrics(reducer_name, metrics, None); - }); - - mut_tx - .insert_st_client( - caller_auth.claims.identity, - caller_connection_id, - &caller_auth.jwt_payload, - ) - .map_err(DBError::from)?; - - if let Some((reducer_id, reducer_def)) = reducer_lookup { - // The module defined a lifecycle reducer to handle new connections. - // Call this reducer. - // If the call fails (as in, something unexpectedly goes wrong with guest execution), - // abort the connection: we can't really recover. - let reducer_outcome = me.call_reducer_inner_with_inst( - Some(ScopeGuard::into_inner(mut_tx)), - caller_auth.claims.identity, - Some(caller_connection_id), - None, - None, - None, - reducer_id, - reducer_def, - FunctionArgs::Nullary, - inst, - )?; - - match reducer_outcome.outcome { - // If the reducer committed successfully, we're done. - // `WasmModuleInstance::call_reducer_with_tx` has already ensured - // that `st_client` is updated appropriately. - // - // It's necessary to spread out the responsibility for updating `st_client` in this way - // because it's important that `call_identity_connected` commit at most one transaction. - // A naive implementation of this method would just run the reducer first, - // then insert into `st_client`, - // but if we crashed in between, we'd be left in an inconsistent state - // where the reducer had run but `st_client` was not yet updated. - ReducerOutcome::Committed => Ok(()), - - // If the reducer returned an error or couldn't run due to insufficient energy, - // abort the connection: the module code has decided it doesn't want this client. - ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)), - ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy), - } - } else { - // The module doesn't define a client_connected reducer. - // We need to commit the transaction to update st_clients and st_connection_credentials. - // - // This is necessary to be able to disconnect clients after a server crash. - - // TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions - // not allowed? - // I (jsdt) don't think it was being broadcast previously. See: - // https://github.com/clockworklabs/SpacetimeDB/issues/3130 - stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(())) - .map_err(|e: DBError| { - log::error!("`call_identity_connected`: finish transaction failed: {e:#?}"); - ClientConnectedError::DBError(e) - })?; - Ok(()) - } + inst.call_identity_connected(caller_auth, caller_connection_id) }) .await .map_err(ReducerCallError::from)? @@ -1260,6 +1283,32 @@ impl ModuleHost { .await? } + fn call_reducer_params( + module: &ModuleInfo, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_id: ReducerId, + reducer_def: &ReducerDef, + args: FunctionArgs, + ) -> Result { + let reducer_seed = ArgsSeed(module.module_def.typespace().with_type(reducer_def)); + let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + Ok(CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }) + } + async fn call_reducer_inner( &self, caller_identity: Identity, diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index c20755e17cd..bf2614f9b37 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -10,7 +10,9 @@ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCo use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; use crate::host::instance_env::{ChunkPool, InstanceEnv}; -use crate::host::module_host::{call_scheduled_reducer, init_database, Instance}; +use crate::host::module_host::{ + call_identity_connected, call_scheduled_reducer, init_database, ClientConnectedError, Instance, +}; use crate::host::scheduler::QueueItem; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon}; @@ -23,9 +25,10 @@ use core::any::type_name; use core::str; use enum_as_inner::EnumAsInner; use itertools::Either; +use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; -use spacetimedb_lib::{RawModuleDef, Timestamp}; +use spacetimedb_lib::{ConnectionId, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; use std::sync::mpsc::{Receiver, SyncSender}; use std::sync::{mpsc, Arc, LazyLock}; @@ -317,6 +320,19 @@ impl JsInstance { self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) } + pub fn call_identity_connected( + &mut self, + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + ) -> Result<(), ClientConnectedError> { + self.can_trap(move |this| { + this.send_recv( + JsWorkerReply::into_call_identity_connected, + JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id), + ) + }) + } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { self.can_trap(move |this| { this.send_recv( @@ -349,6 +365,7 @@ enum JsWorkerReply { UpdateDatabase(anyhow::Result), CallReducer((ReducerCallResult, bool)), ClearAllClients(anyhow::Result<()>), + CallIdentityConnected((Result<(), ClientConnectedError>, bool)), CallScheduledReducer((Result, bool)), InitDatabase((anyhow::Result>, bool)), } @@ -371,6 +388,8 @@ enum JsWorkerRequest { }, /// See [`JsInstance::clear_all_clients`]. ClearAllClients, + /// See [`JsInstance::call_identity_connected`]. + CallIdentityConnected(ConnectionAuthCtx, ConnectionId), /// See [`JsInstance::call_scheduled_reducer`]. CallScheduledReducer(QueueItem), /// See [`JsInstance::init_database`]. @@ -465,6 +484,7 @@ fn spawn_instance_worker( }; // Setup the instance common and environment. + let info = &module_common.info(); let mut instance_common = InstanceCommon::new(&module_common); let replica_ctx: &Arc = module_common.replica_ctx(); let scheduler = module_common.scheduler().clone(); @@ -506,12 +526,21 @@ fn spawn_instance_worker( let res = instance_common.clear_all_clients(); reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); } + JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => { + let call_reducer = |tx, params| { + call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) + }; + let mut trapped = false; + let res = + call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); + let res = (res, trapped); + reply("call_identity_connected", JsWorkerReply::CallIdentityConnected(res)); + } JsWorkerRequest::CallScheduledReducer(queue_item) => { let call_reducer = |tx, params| { call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) }; - let module = module_common.info(); - let res = call_scheduled_reducer(&module, queue_item, call_reducer); + let res = call_scheduled_reducer(info, queue_item, call_reducer); reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res)); } JsWorkerRequest::InitDatabase(program) => { diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 696118c1298..e0cbfbbcb47 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use prometheus::{Histogram, IntCounter, IntGauge}; +use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_primitives::ProcedureId; @@ -20,8 +21,8 @@ use crate::host::module_host::{ }; use crate::host::scheduler::QueueItem; use crate::host::{ - call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, - ReducerId, ReducerOutcome, Scheduler, + call_identity_connected, call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, + ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -281,6 +282,19 @@ impl WasmModuleInstance { self.common.clear_all_clients() } + pub fn call_identity_connected( + &mut self, + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + ) -> Result<(), ClientConnectedError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = call_identity_connected(caller_auth, caller_connection_id, module, call_reducer, &mut trapped); + self.trapped = trapped; + res + } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { let module = &self.common.info.clone(); let call_reducer = |tx, params| self.call_reducer_inner(tx, params); From 48f2e1f8b8bbfa62fb043c78a840e7a4e756b9a1 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 4 Nov 2025 12:17:48 +0100 Subject: [PATCH 08/10] move call_identity_disconnected + disconnect_clien to Instance --- crates/core/src/host/module_host.rs | 105 +++++++++--------- crates/core/src/host/v8/mod.rs | 68 ++++++++++-- .../src/host/wasm_common/module_host_actor.rs | 30 ++++- 3 files changed, 139 insertions(+), 64 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 2665793999b..178a8488380 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -424,6 +424,24 @@ impl Instance { } } + fn call_identity_disconnected( + &mut self, + caller_identity: Identity, + caller_connection_id: ConnectionId, + ) -> Result<(), ReducerCallError> { + match self { + Instance::Wasm(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id), + Instance::Js(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id), + } + } + + fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { + match self { + Instance::Wasm(inst) => inst.disconnect_client(client_id), + Instance::Js(inst) => inst.disconnect_client(client_id), + } + } + fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { match self { Instance::Wasm(inst) => inst.call_scheduled_reducer(item), @@ -1103,20 +1121,32 @@ impl ModuleHost { pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); - let this = self.clone(); if let Err(e) = self - .call("disconnect_client", move |inst| { - // Call the `client_disconnected` reducer, if it exists. - // This is a no-op if the module doesn't define such a reducer. - this.subscriptions().remove_subscriber(client_id); - this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst) - }) + .call("disconnect_client", move |inst| inst.disconnect_client(client_id)) .await { log::error!("Error from client_disconnected transaction: {e}"); } } + pub fn disconnect_client_inner( + client_id: ClientActorId, + info: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, + ) -> Result<(), ReducerCallError> { + // Call the `client_disconnected` reducer, if it exists. + // This is a no-op if the module doesn't define such a reducer. + info.subscriptions.remove_subscriber(client_id); + Self::call_identity_disconnected_inner( + client_id.identity, + client_id.connection_id, + info, + call_reducer, + trapped_slot, + ) + } + /// Invoke the module's `client_connected` reducer, if it has one, /// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`. /// @@ -1147,12 +1177,15 @@ impl ModuleHost { /// If the reducer fails, the rows are still deleted. /// Calling this on an already-disconnected client is a no-op. pub fn call_identity_disconnected_inner( - &self, caller_identity: Identity, caller_connection_id: ConnectionId, - inst: &mut Instance, + info: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, ) -> Result<(), ReducerCallError> { - let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let stdb = info.relational_db(); + + let reducer_lookup = info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); let reducer_name = reducer_lookup .as_ref() .map(|(_, def)| &*def.name) @@ -1162,12 +1195,9 @@ impl ModuleHost { let workload = || Workload::reducer_no_args(reducer_name, caller_identity, caller_connection_id); - let me = self.clone(); - let stdb = me.module.replica_ctx().relational_db.clone(); - // A fallback transaction that deletes the client from `st_client`. + let database_identity = stdb.database_identity(); let fallback = || { - let database_identity = me.info.database_identity; stdb.with_auto_commit(workload(), |mut_tx| { if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. @@ -1207,8 +1237,9 @@ impl ModuleHost { // The module defined a lifecycle reducer to handle disconnects. Call it. // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured // that `st_client` is updated appropriately. - let result = me.call_reducer_inner_with_inst( - Some(mut_tx), + let tx = Some(mut_tx); + let result = Self::call_reducer_params( + info, caller_identity, Some(caller_connection_id), None, @@ -1217,8 +1248,12 @@ impl ModuleHost { reducer_id, reducer_def, FunctionArgs::Nullary, - inst, - ); + ) + .map(|params| { + let (res, trapped) = call_reducer(tx, params); + *trapped_slot = trapped; + res + }); // If it failed, we still need to update `st_client`: the client's not coming back. // Commit a separate transaction that just updates `st_client`. @@ -1270,9 +1305,8 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { - let me = self.clone(); self.call("call_identity_disconnected", move |inst| { - me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) + inst.call_identity_disconnected(caller_identity, caller_connection_id) }) .await? } @@ -1340,37 +1374,6 @@ impl ModuleHost { }) .await?) } - fn call_reducer_inner_with_inst( - &self, - tx: Option, - caller_identity: Identity, - caller_connection_id: Option, - client: Option>, - request_id: Option, - timer: Option, - reducer_id: ReducerId, - reducer_def: &ReducerDef, - args: FunctionArgs, - module_instance: &mut Instance, - ) -> Result { - let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def)); - let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?; - let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); - - Ok(module_instance.call_reducer( - tx, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - )) - } pub async fn call_reducer( &self, diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index bf2614f9b37..b76ee8ff350 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -9,6 +9,7 @@ use self::syscall::{call_call_reducer, call_describe_module, call_reducer_fun, r use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; +use crate::client::ClientActorId; use crate::host::instance_env::{ChunkPool, InstanceEnv}; use crate::host::module_host::{ call_identity_connected, call_scheduled_reducer, init_database, ClientConnectedError, Instance, @@ -17,7 +18,7 @@ use crate::host::scheduler::QueueItem; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon}; use crate::host::wasm_common::{RowIters, TimingSpanSet}; -use crate::host::{ReducerCallError, ReducerCallResult, Scheduler}; +use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited}; use crate::replica_context::ReplicaContext; use crate::util::asyncify; @@ -28,7 +29,7 @@ use itertools::Either; use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; -use spacetimedb_lib::{ConnectionId, RawModuleDef, Timestamp}; +use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; use std::sync::mpsc::{Receiver, SyncSender}; use std::sync::{mpsc, Arc, LazyLock}; @@ -333,6 +334,28 @@ impl JsInstance { }) } + pub fn call_identity_disconnected( + &mut self, + caller_identity: Identity, + caller_connection_id: ConnectionId, + ) -> Result<(), ReducerCallError> { + self.can_trap(move |this| { + this.send_recv( + JsWorkerReply::into_call_identity_disconnected, + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id), + ) + }) + } + + pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { + self.can_trap(move |this| { + this.send_recv( + JsWorkerReply::into_disconnect_client, + JsWorkerRequest::DisconnectClient(client_id), + ) + }) + } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { self.can_trap(move |this| { this.send_recv( @@ -366,6 +389,8 @@ enum JsWorkerReply { CallReducer((ReducerCallResult, bool)), ClearAllClients(anyhow::Result<()>), CallIdentityConnected((Result<(), ClientConnectedError>, bool)), + CallIdentityDisconnected((Result<(), ReducerCallError>, bool)), + DisconnectClient((Result<(), ReducerCallError>, bool)), CallScheduledReducer((Result, bool)), InitDatabase((anyhow::Result>, bool)), } @@ -390,6 +415,10 @@ enum JsWorkerRequest { ClearAllClients, /// See [`JsInstance::call_identity_connected`]. CallIdentityConnected(ConnectionAuthCtx, ConnectionId), + /// See [`JsInstance::call_identity_disconnected`]. + CallIdentityDisconnected(Identity, ConnectionId), + /// See [`JsInstance::disconnect_client`]. + DisconnectClient(ClientActorId), /// See [`JsInstance::call_scheduled_reducer`]. CallScheduledReducer(QueueItem), /// See [`JsInstance::init_database`]. @@ -503,6 +532,9 @@ fn spawn_instance_worker( } }; for request in request_rx.iter() { + let mut call_reducer = + |tx, params| call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); + match request { JsWorkerRequest::UpdateDatabase { program, @@ -519,7 +551,7 @@ fn spawn_instance_worker( // but rather let this happen by `return_instance` using `JsInstance::trapped` // which will cause `JsInstance` to be dropped, // which in turn results in the loop being terminated. - let res = call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); + let res = call_reducer(tx, params); reply("call_reducer", JsWorkerReply::CallReducer(res)); } JsWorkerRequest::ClearAllClients => { @@ -527,26 +559,38 @@ fn spawn_instance_worker( reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); } JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => { - let call_reducer = |tx, params| { - call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) - }; let mut trapped = false; let res = call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); let res = (res, trapped); reply("call_identity_connected", JsWorkerReply::CallIdentityConnected(res)); } + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => { + let mut trapped = false; + let res = ModuleHost::call_identity_disconnected_inner( + caller_identity, + caller_connection_id, + info, + call_reducer, + &mut trapped, + ); + let res = (res, trapped); + reply( + "call_identity_disconnected", + JsWorkerReply::CallIdentityDisconnected(res), + ); + } + JsWorkerRequest::DisconnectClient(client_id) => { + let mut trapped = false; + let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); + let res = (res, trapped); + reply("disconnect_client", JsWorkerReply::DisconnectClient(res)); + } JsWorkerRequest::CallScheduledReducer(queue_item) => { - let call_reducer = |tx, params| { - call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) - }; let res = call_scheduled_reducer(info, queue_item, call_reducer); reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res)); } JsWorkerRequest::InitDatabase(program) => { - let call_reducer = |tx, params| { - call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params) - }; let res = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); reply("init_database", JsWorkerReply::InitDatabase(res)); } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e0cbfbbcb47..e93bb9f0749 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -11,7 +11,7 @@ use std::time::Duration; use tracing::span::EnteredSpan; use super::instrumentation::CallTimes; -use crate::client::ClientConnectionSender; +use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger; use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; @@ -295,6 +295,34 @@ impl WasmModuleInstance { res } + pub fn call_identity_disconnected( + &mut self, + caller_identity: Identity, + caller_connection_id: ConnectionId, + ) -> Result<(), ReducerCallError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = ModuleHost::call_identity_disconnected_inner( + caller_identity, + caller_connection_id, + module, + call_reducer, + &mut trapped, + ); + self.trapped = trapped; + res + } + + pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = ModuleHost::disconnect_client_inner(client_id, module, call_reducer, &mut trapped); + self.trapped = trapped; + res + } + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { let module = &self.common.info.clone(); let call_reducer = |tx, params| self.call_reducer_inner(tx, params); From 81b862a6b19244b6ccb6399424799dd9e5acab27 Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Tue, 4 Nov 2025 22:10:50 +0100 Subject: [PATCH 09/10] fix rebase fallout --- .../src/host/wasm_common/module_host_actor.rs | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index e93bb9f0749..ea7fa7e9c32 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,15 +1,3 @@ -use bytes::Bytes; -use prometheus::{Histogram, IntCounter, IntGauge}; -use spacetimedb_auth::identity::ConnectionAuthCtx; -use spacetimedb_lib::db::raw_def::v9::Lifecycle; -use spacetimedb_lib::de::DeserializeSeed; -use spacetimedb_primitives::ProcedureId; -use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; -use std::future::Future; -use std::sync::Arc; -use std::time::Duration; -use tracing::span::EnteredSpan; - use super::instrumentation::CallTimes; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger; @@ -17,12 +5,13 @@ use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ - CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, + call_identity_connected, call_scheduled_reducer, init_database, CallProcedureParams, CallReducerParams, + ClientConnectedError, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; use crate::host::scheduler::QueueItem; use crate::host::{ - call_identity_connected, call_scheduled_reducer, init_database, ArgsTuple, ProcedureCallError, ProcedureCallResult, - ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, + ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId, + ReducerOutcome, Scheduler, UpdateDatabaseResult, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -31,13 +20,24 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::WriteConflict; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; +use bytes::Bytes; +use prometheus::{Histogram, IntCounter, IntGauge}; +use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::{IsolationLevel, Program}; use spacetimedb_lib::buffer::DecodeError; +use spacetimedb_lib::db::raw_def::v9::Lifecycle; +use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp}; +use spacetimedb_primitives::ProcedureId; +use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; +use tracing::span::EnteredSpan; use super::*; @@ -355,6 +355,7 @@ impl WasmModuleInstance { if res.is_err() { self.trapped = true; } + res } } From ce19fd54f27ca85e54beae82708728ed5838cd1e Mon Sep 17 00:00:00 2001 From: Mazdak Farrokhzad Date: Wed, 5 Nov 2025 00:29:14 +0100 Subject: [PATCH 10/10] make JsInstance methods async --- crates/core/src/host/module_host.rs | 273 ++++++++++++++-------------- crates/core/src/host/v8/mod.rs | 191 +++++++++---------- 2 files changed, 232 insertions(+), 232 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 178a8488380..1cc044f2c6b 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -11,6 +11,8 @@ use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem}; +use crate::host::v8::JsInstance; +use crate::host::wasmtime::ModuleInstance; use crate::host::InvalidFunctionArguments; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; @@ -386,76 +388,6 @@ impl Instance { } } - /// Update the module instance's database to match the schema of the module instance. - fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { - match self { - Instance::Wasm(inst) => inst.update_database(program, old_module_info, policy), - Instance::Js(inst) => inst.update_database(program, old_module_info, policy), - } - } - - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - match self { - Instance::Wasm(inst) => inst.call_reducer(tx, params), - Instance::Js(inst) => inst.call_reducer(tx, params), - } - } - - fn clear_all_clients(&self) -> anyhow::Result<()> { - match self { - Instance::Wasm(inst) => inst.clear_all_clients(), - Instance::Js(inst) => inst.clear_all_clients(), - } - } - - fn call_identity_connected( - &mut self, - caller_auth: ConnectionAuthCtx, - caller_connection_id: ConnectionId, - ) -> Result<(), ClientConnectedError> { - match self { - Instance::Wasm(inst) => inst.call_identity_connected(caller_auth, caller_connection_id), - Instance::Js(inst) => inst.call_identity_connected(caller_auth, caller_connection_id), - } - } - - fn call_identity_disconnected( - &mut self, - caller_identity: Identity, - caller_connection_id: ConnectionId, - ) -> Result<(), ReducerCallError> { - match self { - Instance::Wasm(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id), - Instance::Js(inst) => inst.call_identity_disconnected(caller_identity, caller_connection_id), - } - } - - fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { - match self { - Instance::Wasm(inst) => inst.disconnect_client(client_id), - Instance::Js(inst) => inst.disconnect_client(client_id), - } - } - - fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { - match self { - Instance::Wasm(inst) => inst.call_scheduled_reducer(item), - Instance::Js(inst) => inst.call_scheduled_reducer(item), - } - } - - fn init_database(&mut self, program: Program) -> anyhow::Result> { - match self { - Instance::Wasm(inst) => inst.init_database(program), - Instance::Js(inst) => inst.init_database(program), - } - } - async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { match self { Instance::Wasm(inst) => inst.call_procedure(params).await, @@ -1050,79 +982,119 @@ impl ModuleHost { }) } - async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result + /// Run a function for this module which has access to the module instance. + async fn with_instance<'a, Guard, R, F>( + &'a self, + kind: &str, + label: &str, + timer: impl FnOnce(&str) -> Guard, + work: impl FnOnce(Guard, &'a SingleCoreExecutor, Instance) -> F, + ) -> Result where - Fun: (FnOnce(Instance) -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - R: Send + 'static, + F: Future, { self.guard_closed()?; - let timer_guard = self.start_call_timer(label); + let timer_guard = timer(label); + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + + // If a function call panics, we **must** ensure to call `self.on_panic` + // so that the module is discarded by the host controller. scopeguard::defer_on_unwind!({ - log::warn!("procedure {label} panicked"); + log::warn!("{kind} {label} panicked"); (self.on_panic)(); }); // TODO: should we be calling and/or `await`-ing `get_instance` within the below `run_job`? // Unclear how much overhead this call can have. - let instance = self.instance_manager.lock().await.get_instance().await; + let inst = self.instance_manager.lock().await.get_instance().await; - let (res, instance) = self - .executor - .run_job(async move { - drop(timer_guard); - f(instance).await - }) - .await; + let (res, inst) = work(timer_guard, &self.executor, inst).await; - self.instance_manager.lock().await.return_instance(instance); + self.instance_manager.lock().await.return_instance(inst); Ok(res) } - /// Run a function on the JobThread for this module which has access to the module instance. - async fn call(&self, label: &str, f: F) -> Result + async fn call_async_with_instance(&self, label: &str, work: Fun) -> Result where - F: FnOnce(&mut Instance) -> R + Send + 'static, + Fun: (FnOnce(Instance) -> Fut) + Send + 'static, + Fut: Future + Send + 'static, R: Send + 'static, { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - - // If a reducer call panics, we **must** ensure to call `self.on_panic` - // so that the module is discarded by the host controller. - scopeguard::defer_on_unwind!({ - log::warn!("reducer {label} panicked"); - (self.on_panic)(); - }); - - let mut instance = self.instance_manager.lock().await.get_instance().await; - - let (res, instance) = self - .executor - .run_sync_job(move || { - drop(timer_guard); - let res = f(&mut instance); - (res, instance) - }) - .await; - - self.instance_manager.lock().await.return_instance(instance); + self.with_instance( + "procedure", + label, + |l| self.start_call_timer(l), + |timer_guard, executor, inst| { + executor.run_job(async move { + drop(timer_guard); + work(inst).await + }) + }, + ) + .await + } - Ok(res) + /// Run a function for this module which has access to the module instance. + /// + /// For WASM, the function is run on the module's JobThread. + /// For V8/JS, the function is run in the current task. + async fn call( + &self, + label: &str, + arg: A, + wasm: impl FnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl FnOnce(A, Box) -> JF, + ) -> Result + where + JF: Future)>, + R: Send + 'static, + A: Send + 'static, + { + self.with_instance( + "reducer", + label, + |l| self.start_call_timer(l), + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking a blocking lock. + // So, we run `work` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + |timer_guard, executor, inst| async move { + match inst { + Instance::Wasm(mut inst) => { + executor + .run_sync_job(move || { + drop(timer_guard); + (wasm(arg, &mut inst), Instance::Wasm(inst)) + }) + .await + } + Instance::Js(inst) => { + drop(timer_guard); + let (res, inst) = js(arg, inst).await; + (res, Instance::Js(inst)) + } + } + }, + ) + .await } pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); if let Err(e) = self - .call("disconnect_client", move |inst| inst.disconnect_client(client_id)) + .call( + "disconnect_client", + client_id, + |client_id, inst| inst.disconnect_client(client_id), + |client_id, inst| inst.disconnect_client(client_id), + ) .await { log::error!("Error from client_disconnected transaction: {e}"); @@ -1165,9 +1137,12 @@ impl ModuleHost { caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { - self.call("call_identity_connected", move |inst| { - inst.call_identity_connected(caller_auth, caller_connection_id) - }) + self.call( + "call_identity_connected", + (caller_auth, caller_connection_id), + |(a, b), inst| inst.call_identity_connected(a, b), + |(a, b), inst| inst.call_identity_connected(a, b), + ) .await .map_err(ReducerCallError::from)? } @@ -1305,16 +1280,24 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { - self.call("call_identity_disconnected", move |inst| { - inst.call_identity_disconnected(caller_identity, caller_connection_id) - }) + self.call( + "call_identity_disconnected", + (caller_identity, caller_connection_id), + |(a, b), inst| inst.call_identity_disconnected(a, b), + |(a, b), inst| inst.call_identity_disconnected(a, b), + ) .await? } /// Empty the system tables tracking clients without running any lifecycle reducers. pub async fn clear_all_clients(&self) -> anyhow::Result<()> { - self.call("clear_all_clients", move |inst| inst.clear_all_clients()) - .await? + self.call( + "clear_all_clients", + (), + |_, inst| inst.clear_all_clients(), + |_, inst| inst.clear_all_clients(), + ) + .await? } fn call_reducer_params( @@ -1369,9 +1352,12 @@ impl ModuleHost { }; Ok(self - .call(&reducer_def.name, move |inst| { - inst.call_reducer(None, call_reducer_params) - }) + .call( + &reducer_def.name, + call_reducer_params, + |p, inst| inst.call_reducer(None, p), + |p, inst| inst.call_reducer(None, p), + ) .await?) } @@ -1492,9 +1478,12 @@ impl ModuleHost { // because their reducer arguments are stored in the database and need to be fetched // within the same transaction as the reducer call. pub(crate) async fn call_scheduled_reducer(&self, item: QueueItem) -> Result { - self.call(SCHEDULED_REDUCER, move |inst: &mut Instance| { - inst.call_scheduled_reducer(item) - }) + self.call( + SCHEDULED_REDUCER, + item, + |item, inst| inst.call_scheduled_reducer(item), + |item, inst| inst.call_scheduled_reducer(item), + ) .await? } @@ -1503,9 +1492,14 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - self.call("", move |inst| inst.init_database(program)) - .await? - .map_err(InitDatabaseError::Other) + self.call( + "", + program, + |p, inst| inst.init_database(p), + |p, inst| inst.init_database(p), + ) + .await? + .map_err(InitDatabaseError::Other) } pub async fn update_database( @@ -1514,9 +1508,12 @@ impl ModuleHost { old_module_info: Arc, policy: MigrationPolicy, ) -> Result { - self.call("", move |inst| { - inst.update_database(program, old_module_info, policy) - }) + self.call( + "", + (program, old_module_info, policy), + |(a, b, c), inst| inst.update_database(a, b, c), + |(a, b, c), inst| inst.update_database(a, b, c), + ) .await? } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index b76ee8ff350..fdf8fd51492 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -31,9 +31,9 @@ use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; -use std::sync::mpsc::{Receiver, SyncSender}; -use std::sync::{mpsc, Arc, LazyLock}; +use std::sync::{Arc, LazyLock}; use std::time::Instant; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot; use v8::script_compiler::{compile_module, Source}; use v8::{ @@ -252,8 +252,8 @@ impl JsInstanceEnv { /// which will cause the worker's loop to terminate /// and cleanup the isolate and friends. pub struct JsInstance { - request_tx: SyncSender, - reply_rx: Receiver, + request_tx: Sender, + reply_rx: Receiver<(JsWorkerReply, bool)>, trapped: bool, } @@ -263,41 +263,38 @@ impl JsInstance { } /// Send a request to the worker and wait for a reply. - fn send_recv( - &self, + async fn send_recv( + mut self: Box, extract: impl FnOnce(JsWorkerReply) -> Result, request: JsWorkerRequest, - ) -> T { + ) -> (T, Box) { // Send the request. self.request_tx .send(request) + .await .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); // Wait for the response. - let reply = self + let (reply, trapped) = self .reply_rx .recv() + .await .expect("worker's `reply_tx` should be live as `JsInstance::drop` hasn't happened"); + self.trapped = trapped; + match extract(reply) { Err(err) => unreachable!("should have received {} but got {err:?}", type_name::()), - Ok(reply) => reply, + Ok(reply) => (reply, self), } } - /// Performs `work` that may trap the worker. - fn can_trap(&mut self, work: impl FnOnce(&mut Self) -> (T, bool)) -> T { - let (x, trapped) = work(self); - self.trapped = trapped; - x - } - - pub fn update_database( - &self, + pub async fn update_database( + self: Box, program: Program, old_module_info: Arc, policy: MigrationPolicy, - ) -> anyhow::Result { + ) -> (anyhow::Result, Box) { self.send_recv( JsWorkerReply::into_update_database, JsWorkerRequest::UpdateDatabase { @@ -306,72 +303,81 @@ impl JsInstance { policy, }, ) + .await } - pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - self.can_trap(|this| { - this.send_recv( - JsWorkerReply::into_call_reducer, - JsWorkerRequest::CallReducer { tx, params }, - ) - }) + pub async fn call_reducer( + self: Box, + tx: Option, + params: CallReducerParams, + ) -> (ReducerCallResult, Box) { + self.send_recv( + JsWorkerReply::into_call_reducer, + JsWorkerRequest::CallReducer { tx, params }, + ) + .await } - pub fn clear_all_clients(&self) -> anyhow::Result<()> { + pub async fn clear_all_clients(self: Box) -> (anyhow::Result<()>, Box) { self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) + .await } - pub fn call_identity_connected( - &mut self, + pub async fn call_identity_connected( + self: Box, caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, - ) -> Result<(), ClientConnectedError> { - self.can_trap(move |this| { - this.send_recv( - JsWorkerReply::into_call_identity_connected, - JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id), - ) - }) + ) -> (Result<(), ClientConnectedError>, Box) { + self.send_recv( + JsWorkerReply::into_call_identity_connected, + JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id), + ) + .await } - pub fn call_identity_disconnected( - &mut self, + pub async fn call_identity_disconnected( + self: Box, caller_identity: Identity, caller_connection_id: ConnectionId, - ) -> Result<(), ReducerCallError> { - self.can_trap(move |this| { - this.send_recv( - JsWorkerReply::into_call_identity_disconnected, - JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id), - ) - }) + ) -> (Result<(), ReducerCallError>, Box) { + self.send_recv( + JsWorkerReply::into_call_identity_disconnected, + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id), + ) + .await } - pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { - self.can_trap(move |this| { - this.send_recv( - JsWorkerReply::into_disconnect_client, - JsWorkerRequest::DisconnectClient(client_id), - ) - }) + pub async fn disconnect_client( + self: Box, + client_id: ClientActorId, + ) -> (Result<(), ReducerCallError>, Box) { + self.send_recv( + JsWorkerReply::into_disconnect_client, + JsWorkerRequest::DisconnectClient(client_id), + ) + .await } - pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { - self.can_trap(move |this| { - this.send_recv( - JsWorkerReply::into_call_scheduled_reducer, - JsWorkerRequest::CallScheduledReducer(item), - ) - }) + pub(crate) async fn call_scheduled_reducer( + self: Box, + item: QueueItem, + ) -> (Result, Box) { + self.send_recv( + JsWorkerReply::into_call_scheduled_reducer, + JsWorkerRequest::CallScheduledReducer(item), + ) + .await } - pub fn init_database(&mut self, program: Program) -> anyhow::Result> { - self.can_trap(|this| { - this.send_recv( - JsWorkerReply::into_init_database, - JsWorkerRequest::InitDatabase(program), - ) - }) + pub async fn init_database( + self: Box, + program: Program, + ) -> (anyhow::Result>, Box) { + self.send_recv( + JsWorkerReply::into_init_database, + JsWorkerRequest::InitDatabase(program), + ) + .await } pub async fn call_procedure( @@ -386,13 +392,13 @@ impl JsInstance { #[derive(EnumAsInner, Debug)] enum JsWorkerReply { UpdateDatabase(anyhow::Result), - CallReducer((ReducerCallResult, bool)), + CallReducer(ReducerCallResult), ClearAllClients(anyhow::Result<()>), - CallIdentityConnected((Result<(), ClientConnectedError>, bool)), - CallIdentityDisconnected((Result<(), ReducerCallError>, bool)), - DisconnectClient((Result<(), ReducerCallError>, bool)), - CallScheduledReducer((Result, bool)), - InitDatabase((anyhow::Result>, bool)), + CallIdentityConnected(Result<(), ClientConnectedError>), + CallIdentityDisconnected(Result<(), ReducerCallError>), + DisconnectClient(Result<(), ReducerCallError>), + CallScheduledReducer(Result), + InitDatabase(anyhow::Result>), } /// A request for the worker in [`spawn_instance_worker`]. @@ -479,9 +485,9 @@ fn spawn_instance_worker( // The use-case is SPSC and all channels are rendezvous channels // where each `.send` blocks until it's received. // The Instance --Request-> Worker channel: - let (request_tx, request_rx) = mpsc::sync_channel(0); + let (request_tx, mut request_rx) = channel(1); // The Worker --Reply-> Instance channel: - let (reply_tx, reply_rx) = mpsc::sync_channel(0); + let (reply_tx, reply_rx) = channel(1); // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); @@ -524,17 +530,18 @@ fn spawn_instance_worker( // // The loop is terminated when a `JsInstance` is dropped. // This will cause channels, scopes, and the isolate to be cleaned up. - let reply = |ctx: &str, reply: JsWorkerReply| { - if let Err(e) = reply_tx.send(reply) { + let reply = |ctx: &str, reply: JsWorkerReply, trapped| { + if let Err(e) = reply_tx.blocking_send((reply, trapped)) { // This should never happen as `JsInstance::$function` immediately // does `.recv` on the other end of the channel. unreachable!("should have receiver for `{ctx}` response, {e}"); } }; - for request in request_rx.iter() { + while let Some(request) = request_rx.blocking_recv() { let mut call_reducer = |tx, params| call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); + use JsWorkerReply::*; match request { JsWorkerRequest::UpdateDatabase { program, @@ -543,7 +550,7 @@ fn spawn_instance_worker( } => { // Update the database and reply to `JsInstance::update_database`. let res = instance_common.update_database(replica_ctx, program, old_module_info, policy); - reply("update_database", JsWorkerReply::UpdateDatabase(res)); + reply("update_database", UpdateDatabase(res), false); } JsWorkerRequest::CallReducer { tx, params } => { // Call the reducer. @@ -551,19 +558,18 @@ fn spawn_instance_worker( // but rather let this happen by `return_instance` using `JsInstance::trapped` // which will cause `JsInstance` to be dropped, // which in turn results in the loop being terminated. - let res = call_reducer(tx, params); - reply("call_reducer", JsWorkerReply::CallReducer(res)); + let (res, trapped) = call_reducer(tx, params); + reply("call_reducer", CallReducer(res), trapped); } JsWorkerRequest::ClearAllClients => { let res = instance_common.clear_all_clients(); - reply("clear_all_clients", JsWorkerReply::ClearAllClients(res)); + reply("clear_all_clients", ClearAllClients(res), false); } JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => { let mut trapped = false; let res = call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); - let res = (res, trapped); - reply("call_identity_connected", JsWorkerReply::CallIdentityConnected(res)); + reply("call_identity_connected", CallIdentityConnected(res), trapped); } JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => { let mut trapped = false; @@ -574,25 +580,22 @@ fn spawn_instance_worker( call_reducer, &mut trapped, ); - let res = (res, trapped); - reply( - "call_identity_disconnected", - JsWorkerReply::CallIdentityDisconnected(res), - ); + reply("call_identity_disconnected", CallIdentityDisconnected(res), trapped); } JsWorkerRequest::DisconnectClient(client_id) => { let mut trapped = false; let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); - let res = (res, trapped); - reply("disconnect_client", JsWorkerReply::DisconnectClient(res)); + reply("disconnect_client", DisconnectClient(res), trapped); } JsWorkerRequest::CallScheduledReducer(queue_item) => { - let res = call_scheduled_reducer(info, queue_item, call_reducer); - reply("call_scheduled_reducer", JsWorkerReply::CallScheduledReducer(res)); + let (res, trapped): (Result, bool) = + call_scheduled_reducer(info, queue_item, call_reducer); + reply("call_scheduled_reducer", CallScheduledReducer(res), trapped); } JsWorkerRequest::InitDatabase(program) => { - let res = init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); - reply("init_database", JsWorkerReply::InitDatabase(res)); + let (res, trapped): (Result, anyhow::Error>, bool) = + init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); + reply("init_database", InitDatabase(res), trapped); } } }