diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4b7a533c008..a56aa00ff00 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -18,7 +18,7 @@ use spacetimedb_datastore::locking_tx_datastore::state_view::{ IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, IterTx, StateView, }; use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; -use spacetimedb_datastore::system_tables::{system_tables, StModuleRow}; +use spacetimedb_datastore::system_tables::{system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; use spacetimedb_datastore::system_tables::{StFields, StVarFields, StVarName, StVarRow, ST_MODULE_ID, ST_VAR_ID}; use spacetimedb_datastore::traits::{ InsertFlags, IsolationLevel, Metadata, MutTx as _, MutTxDatastore, Program, RowTypeForTable, Tx as _, TxDatastore, @@ -1397,12 +1397,21 @@ impl RelationalDB { self.inner.delete_by_rel_mut_tx(tx, table_id, relation) } - /// Clear all rows from a table without dropping it. + /// Clears all rows from a table without dropping it. pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result { let rows_deleted = tx.clear_table(table_id)?; Ok(rows_deleted) } + /// Empties the system tables tracking clients. + pub fn clear_all_clients(&self) -> Result<(), DBError> { + self.with_auto_commit(Workload::Internal, |mut_tx| { + self.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; + self.clear_table(mut_tx, ST_CLIENT_ID)?; + Ok(()) + }) + } + pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result { Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?) } @@ -2865,43 +2874,37 @@ mod tests { let stdb = TestDB::durable().expect("failed to create TestDB"); let timestamp = Timestamp::now(); - let ctx = ReducerContext { - name: "abstract_concrete_proxy_factory_impl".into(), - caller_identity: Identity::__dummy(), - caller_connection_id: ConnectionId::ZERO, - timestamp, - arg_bsatn: Bytes::new(), + let workload = |name: &str| { + Workload::Reducer(ReducerContext { + name: name.into(), + caller_identity: Identity::__dummy(), + caller_connection_id: ConnectionId::ZERO, + timestamp, + arg_bsatn: Bytes::new(), + }) }; + let workload1 = workload("abstract_concrete_proxy_factory_impl"); let row_ty = ProductType::from([("le_boeuf", AlgebraicType::I32)]); let schema = table("test_table", row_ty.clone(), |builder| builder); // Create an empty transaction { - let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx.clone())); + let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1.clone()); stdb.commit_tx(tx).expect("failed to commit empty transaction"); } // Create an empty transaction pretending to be an // `__identity_connected__` call. 
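The new `RelationalDB::clear_all_clients` above leans on `with_auto_commit` so that both table clears land in one transaction. A minimal sketch of that closure-based auto-commit pattern, with stand-in types (`Db`, `Tx` are illustrative, not the actual SpacetimeDB types):

```rust
struct Db;
struct Tx;

impl Db {
    fn begin(&self) -> Tx {
        Tx
    }
    fn commit(&self, _tx: Tx) { /* flush and release locks */
    }
    fn rollback(&self, _tx: Tx) { /* discard pending writes */
    }

    /// Run `f` in a fresh transaction; commit on `Ok`, roll back on `Err`.
    fn with_auto_commit<T, E>(&self, f: impl FnOnce(&mut Tx) -> Result<T, E>) -> Result<T, E> {
        let mut tx = self.begin();
        match f(&mut tx) {
            Ok(v) => {
                self.commit(tx);
                Ok(v)
            }
            Err(e) => {
                self.rollback(tx);
                Err(e)
            }
        }
    }
}

fn main() {
    let db = Db;
    // Both clears happen in one transaction, so a crash can never leave
    // `st_client` and `st_connection_credentials` out of sync with each other.
    let res: Result<(), ()> = db.with_auto_commit(|_tx| {
        // clear st_connection_credentials, then st_client
        Ok(())
    });
    assert!(res.is_ok());
}
```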
{ - let tx = stdb.begin_mut_tx( - IsolationLevel::Serializable, - Workload::Reducer(ReducerContext { - name: "__identity_connected__".into(), - caller_identity: Identity::__dummy(), - caller_connection_id: ConnectionId::ZERO, - timestamp, - arg_bsatn: Bytes::new(), - }), - ); + let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload("__identity_connected__")); stdb.commit_tx(tx) .expect("failed to commit empty __identity_connected__ transaction"); } // Create a non-empty transaction including reducer info let table_id = { - let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Reducer(ctx)); + let mut tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload1); let table_id = stdb.create_table(&mut tx, schema).expect("failed to create table"); insert(&stdb, &mut tx, table_id, &product!(AlgebraicValue::I32(0))).expect("failed to insert row"); stdb.commit_tx(tx).expect("failed to commit tx"); diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 1fde862906f..1cc044f2c6b 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -10,6 +10,9 @@ use crate::energy::EnergyQuanta; use crate::error::DBError; use crate::estimation::estimate_rows_scanned; use crate::hash::Hash; +use crate::host::scheduler::{handle_queued_call_reducer_params, QueueItem}; +use crate::host::v8::JsInstance; +use crate::host::wasmtime::ModuleInstance; use crate::host::InvalidFunctionArguments; use crate::identity::Identity; use crate::messages::control_db::{Database, HostType}; @@ -25,7 +28,6 @@ use crate::util::jobs::{SingleCoreExecutor, WeakSingleCoreExecutor}; use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use anyhow::Context; -use bytes::Bytes; use derive_more::From; use futures::lock::Mutex; use indexmap::IndexSet; @@ -38,7 +40,6 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::PipelinedProject; @@ -289,6 +290,10 @@ impl ModuleInfo { metrics, }) } + + pub fn relational_db(&self) -> &Arc { + self.subscriptions.relational_db() + } } /// A bidirectional map between `Identifiers` (reducer names) and `ReducerId`s. @@ -383,26 +388,6 @@ impl Instance { } } - /// Update the module instance's database to match the schema of the module instance. 
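The `update_database` and `call_reducer` wrappers deleted below were plain enum dispatch over the two backends; callers now reach the backend-specific entry points through the reworked `ModuleHost::call` instead. For reference, a minimal sketch of the shape being removed:

```rust
struct WasmInst;
struct JsInst;

impl WasmInst {
    fn ping(&self) -> &'static str {
        "wasm"
    }
}
impl JsInst {
    fn ping(&self) -> &'static str {
        "js"
    }
}

enum Instance {
    Wasm(WasmInst),
    Js(JsInst),
}

impl Instance {
    // One wrapper per operation, each matching on the backend.
    fn ping(&self) -> &'static str {
        match self {
            Instance::Wasm(i) => i.ping(),
            Instance::Js(i) => i.ping(),
        }
    }
}

fn main() {
    assert_eq!(Instance::Wasm(WasmInst).ping(), "wasm");
    assert_eq!(Instance::Js(JsInst).ping(), "js");
}
```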
- fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { - match self { - Instance::Wasm(inst) => inst.update_database(program, old_module_info, policy), - Instance::Js(inst) => inst.update_database(program, old_module_info, policy), - } - } - - fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - match self { - Instance::Wasm(inst) => inst.call_reducer(tx, params), - Instance::Js(inst) => inst.call_reducer(tx, params), - } - } - async fn call_procedure(&mut self, params: CallProcedureParams) -> Result { match self { Instance::Wasm(inst) => inst.call_procedure(params).await, @@ -436,33 +421,53 @@ pub fn create_table_from_view_def( Ok(()) } -/// If the module instance's replica_ctx is uninitialized, initialize it. -fn init_database( +/// Moves out the `trapped: bool` from `res`. +fn extract_trapped(res: Result<(T, bool), E>) -> (Result, bool) { + match res { + Ok((x, t)) => (Ok(x), t), + Err(x) => (Err(x), false), + } +} + +/// If the module instance's `replica_ctx` is uninitialized, initialize it. +pub(crate) fn init_database( replica_ctx: &ReplicaContext, module_def: &ModuleDef, - inst: &mut Instance, program: Program, -) -> anyhow::Result> { + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> (anyhow::Result>, bool) { + extract_trapped(init_database_inner(replica_ctx, module_def, program, call_reducer)) +} + +fn init_database_inner( + replica_ctx: &ReplicaContext, + module_def: &ModuleDef, + program: Program, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> anyhow::Result<(Option, bool)> { log::debug!("init database"); let timestamp = Timestamp::now(); let stdb = &*replica_ctx.relational_db; let logger = replica_ctx.logger.system_logger(); + let owner_identity = replica_ctx.database.owner_identity; + let host_type = replica_ctx.host_type; let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); + let auth_ctx = AuthCtx::for_current(owner_identity); let (tx, ()) = stdb .with_auto_rollback(tx, |tx| { + // Create all in-memory tables defined by the module, + // with IDs ordered lexicographically by the table names. let mut table_defs: Vec<_> = module_def.tables().collect(); - table_defs.sort_by(|a, b| a.name.cmp(&b.name)); - + table_defs.sort_by_key(|x| &x.name); for def in table_defs { logger.info(&format!("Creating table `{}`", &def.name)); create_table_from_def(stdb, tx, module_def, def)?; } + // Create all in-memory views defined by the module. let mut view_defs: Vec<_> = module_def.views().collect(); - view_defs.sort_by(|a, b| a.name.cmp(&b.name)); - + view_defs.sort_by_key(|x| &x.name); for def in view_defs { logger.info(&format!("Creating table for view `{}`", &def.name)); create_table_from_view_def(stdb, tx, module_def, def)?; @@ -480,7 +485,7 @@ fn init_database( .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?; } - stdb.set_initialized(tx, replica_ctx.host_type, program)?; + stdb.set_initialized(tx, host_type, program)?; anyhow::Ok(()) }) @@ -491,25 +496,14 @@ fn init_database( if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? 
{ stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } - None + (None, false) } Some((reducer_id, _)) => { logger.info("Invoking `init` reducer"); - let caller_identity = replica_ctx.database.owner_identity; - Some(inst.call_reducer( - Some(tx), - CallReducerParams { - timestamp, - caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: ArgsTuple::nullary(), - }, - )) + let params = CallReducerParams::from_system(timestamp, owner_identity, reducer_id, ArgsTuple::nullary()); + let (res, trapped) = call_reducer(Some(tx), params); + (Some(res), trapped) } }; @@ -517,6 +511,146 @@ fn init_database( Ok(rcr) } +pub fn call_identity_connected( + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + module: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, +) -> Result<(), ClientConnectedError> { + let reducer_lookup = module.module_def.lifecycle_reducer(Lifecycle::OnConnect); + let stdb = module.relational_db(); + let workload = Workload::reducer_no_args( + "call_identity_connected", + caller_auth.claims.identity, + caller_connection_id, + ); + let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload); + let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| { + // If we crash before committing, we need to ensure that the transaction is rolled back. + // This is necessary to avoid leaving the database in an inconsistent state. + log::debug!("call_identity_connected: rolling back transaction"); + let (_, metrics, reducer_name) = mut_tx.rollback(); + stdb.report_mut_tx_metrics(reducer_name, metrics, None); + }); + + mut_tx + .insert_st_client( + caller_auth.claims.identity, + caller_connection_id, + &caller_auth.jwt_payload, + ) + .map_err(DBError::from)?; + + if let Some((reducer_id, reducer_def)) = reducer_lookup { + // The module defined a lifecycle reducer to handle new connections. + // Call this reducer. + // If the call fails (as in, something unexpectedly goes wrong with guest execution), + // abort the connection: we can't really recover. + let tx = Some(ScopeGuard::into_inner(mut_tx)); + let params = ModuleHost::call_reducer_params( + module, + caller_auth.claims.identity, + Some(caller_connection_id), + None, + None, + None, + reducer_id, + reducer_def, + FunctionArgs::Nullary, + ) + .map_err(ReducerCallError::from)?; + let (reducer_outcome, trapped) = call_reducer(tx, params); + *trapped_slot = trapped; + + match reducer_outcome.outcome { + // If the reducer committed successfully, we're done. + // `WasmModuleInstance::call_reducer_with_tx` has already ensured + // that `st_client` is updated appropriately. + // + // It's necessary to spread out the responsibility for updating `st_client` in this way + // because it's important that `call_identity_connected` commit at most one transaction. + // A naive implementation of this method would just run the reducer first, + // then insert into `st_client`, + // but if we crashed in between, we'd be left in an inconsistent state + // where the reducer had run but `st_client` was not yet updated. + ReducerOutcome::Committed => Ok(()), + + // If the reducer returned an error or couldn't run due to insufficient energy, + // abort the connection: the module code has decided it doesn't want this client. 
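The diff rendering above has stripped the angle-bracketed generic parameters from `extract_trapped` and its neighbors. A self-contained version, assuming the signature is `fn extract_trapped<T, E>(res: Result<(T, bool), E>) -> (Result<T, E>, bool)` as the body suggests:

```rust
/// Moves the `trapped: bool` out of `res`, defaulting to `false` on error
/// (an error produced before any guest code ran cannot have trapped).
fn extract_trapped<T, E>(res: Result<(T, bool), E>) -> (Result<T, E>, bool) {
    match res {
        Ok((x, trapped)) => (Ok(x), trapped),
        Err(e) => (Err(e), false),
    }
}

fn main() {
    assert_eq!(extract_trapped::<_, ()>(Ok((1, true))), (Ok(1), true));
    assert_eq!(extract_trapped::<i32, _>(Err("boom")), (Err("boom"), false));
}
```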
+ ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)), + ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy), + } + } else { + // The module doesn't define a client_connected reducer. + // We need to commit the transaction to update st_clients and st_connection_credentials. + // + // This is necessary to be able to disconnect clients after a server crash. + + // TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions + // not allowed? + // I (jsdt) don't think it was being broadcast previously. See: + // https://github.com/clockworklabs/SpacetimeDB/issues/3130 + stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(())) + .map_err(|e: DBError| { + log::error!("`call_identity_connected`: finish transaction failed: {e:#?}"); + ClientConnectedError::DBError(e) + })?; + Ok(()) + } +} + +// Only for logging purposes. +const SCHEDULED_REDUCER: &str = "scheduled_reducer"; + +pub(crate) fn call_scheduled_reducer( + module: &ModuleInfo, + queue_item: QueueItem, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> (Result, bool) { + extract_trapped(call_scheduled_reducer_inner(module, queue_item, call_reducer)) +} + +fn call_scheduled_reducer_inner( + module: &ModuleInfo, + item: QueueItem, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), +) -> Result<(ReducerCallResult, bool), ReducerCallError> { + let db = &module.relational_db(); + let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); + + match handle_queued_call_reducer_params(&tx, module, db, item) { + Ok(Some(params)) => { + // Is necessary to patch the context with the actual calling reducer + let reducer_def = module + .module_def + .get_reducer_by_id(params.reducer_id) + .ok_or(ReducerCallError::ScheduleReducerNotFound)?; + let reducer = &*reducer_def.name; + + tx.ctx = ExecutionContext::with_workload( + tx.ctx.database_identity(), + Workload::Reducer(ReducerContext { + name: reducer.into(), + caller_identity: params.caller_identity, + caller_connection_id: params.caller_connection_id, + timestamp: Timestamp::now(), + arg_bsatn: params.args.get_bsatn().clone(), + }), + ); + + Ok(call_reducer(Some(tx), params)) + } + Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound), + Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments( + InvalidFunctionArguments { + err, + function_name: SCHEDULED_REDUCER.into(), + }, + ))), + } +} + pub struct CallReducerParams { pub timestamp: Timestamp, pub caller_identity: Identity, @@ -528,6 +662,28 @@ pub struct CallReducerParams { pub args: ArgsTuple, } +impl CallReducerParams { + /// Returns a set of parameters for a call that came from within + /// and without a client/caller/request_id. + pub fn from_system( + timestamp: Timestamp, + caller_identity: Identity, + reducer_id: ReducerId, + args: ArgsTuple, + ) -> Self { + Self { + timestamp, + caller_identity, + caller_connection_id: ConnectionId::ZERO, + client: None, + request_id: None, + timer: None, + reducer_id, + args, + } + } +} + pub struct CallProcedureParams { pub timestamp: Timestamp, pub caller_identity: Identity, @@ -826,91 +982,143 @@ impl ModuleHost { }) } - async fn call_async_with_instance(&self, label: &str, f: Fun) -> Result + /// Run a function for this module which has access to the module instance. 
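`CallReducerParams::from_system` (introduced further down in this file) is the usual defaulted-constructor pattern: fix the fields every system-initiated call shares and take only the ones that vary. A sketch with simplified stand-in field types, not the real ones:

```rust
struct Params {
    timestamp: u64,
    caller_connection_id: u64, // 0 plays the role of ConnectionId::ZERO here
    client: Option<String>,
    request_id: Option<u32>,
    reducer_id: u32,
}

impl Params {
    /// A call that came from within: no client, no request id, zero connection.
    fn from_system(timestamp: u64, reducer_id: u32) -> Self {
        Self {
            timestamp,
            caller_connection_id: 0,
            client: None,
            request_id: None,
            reducer_id,
        }
    }
}

fn main() {
    let p = Params::from_system(1_700_000_000, 7);
    assert_eq!(p.caller_connection_id, 0);
    assert!(p.client.is_none());
}
```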
+ async fn with_instance<'a, Guard, R, F>( + &'a self, + kind: &str, + label: &str, + timer: impl FnOnce(&str) -> Guard, + work: impl FnOnce(Guard, &'a SingleCoreExecutor, Instance) -> F, + ) -> Result where - Fun: (FnOnce(Instance) -> Fut) + Send + 'static, - Fut: Future + Send + 'static, - R: Send + 'static, + F: Future, { self.guard_closed()?; - let timer_guard = self.start_call_timer(label); + let timer_guard = timer(label); + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + + // If a function call panics, we **must** ensure to call `self.on_panic` + // so that the module is discarded by the host controller. scopeguard::defer_on_unwind!({ - log::warn!("procedure {label} panicked"); + log::warn!("{kind} {label} panicked"); (self.on_panic)(); }); // TODO: should we be calling and/or `await`-ing `get_instance` within the below `run_job`? // Unclear how much overhead this call can have. - let instance = self.instance_manager.lock().await.get_instance().await; + let inst = self.instance_manager.lock().await.get_instance().await; - let (res, instance) = self - .executor - .run_job(async move { - drop(timer_guard); - f(instance).await - }) - .await; + let (res, inst) = work(timer_guard, &self.executor, inst).await; - self.instance_manager.lock().await.return_instance(instance); + self.instance_manager.lock().await.return_instance(inst); Ok(res) } - /// Run a function on the JobThread for this module which has access to the module instance. - async fn call(&self, label: &str, f: F) -> Result + async fn call_async_with_instance(&self, label: &str, work: Fun) -> Result where - F: FnOnce(&mut Instance) -> R + Send + 'static, + Fun: (FnOnce(Instance) -> Fut) + Send + 'static, + Fut: Future + Send + 'static, R: Send + 'static, { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - - // If a reducer call panics, we **must** ensure to call `self.on_panic` - // so that the module is discarded by the host controller. - scopeguard::defer_on_unwind!({ - log::warn!("reducer {label} panicked"); - (self.on_panic)(); - }); - - let mut instance = self.instance_manager.lock().await.get_instance().await; - - let (res, instance) = self - .executor - .run_sync_job(move || { - drop(timer_guard); - let res = f(&mut instance); - (res, instance) - }) - .await; - - self.instance_manager.lock().await.return_instance(instance); + self.with_instance( + "procedure", + label, + |l| self.start_call_timer(l), + |timer_guard, executor, inst| { + executor.run_job(async move { + drop(timer_guard); + work(inst).await + }) + }, + ) + .await + } - Ok(res) + /// Run a function for this module which has access to the module instance. + /// + /// For WASM, the function is run on the module's JobThread. + /// For V8/JS, the function is run in the current task. 
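The reworked `call` below dispatches by passing one closure per backend: the wasm closure runs synchronously on the job thread, the js closure returns a future awaited in the current task. A simplified sketch of that two-closure dispatch (no executor hop, no instance hand-back; `block_on` is from the `futures` crate):

```rust
use std::future::Future;

enum Inst {
    Wasm(i32),
    Js(String),
}

async fn call<A, R, F>(
    inst: Inst,
    arg: A,
    wasm: impl FnOnce(A, &mut i32) -> R,
    js: impl FnOnce(A, String) -> F,
) -> R
where
    F: Future<Output = R>,
{
    match inst {
        Inst::Wasm(mut state) => wasm(arg, &mut state),
        Inst::Js(state) => js(arg, state).await,
    }
}

fn main() {
    let out = futures::executor::block_on(call(
        Inst::Wasm(2),
        20,
        |a, state| a + *state,
        |a, _state| async move { a },
    ));
    assert_eq!(out, 22);
}
```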
+ async fn call( + &self, + label: &str, + arg: A, + wasm: impl FnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl FnOnce(A, Box) -> JF, + ) -> Result + where + JF: Future)>, + R: Send + 'static, + A: Send + 'static, + { + self.with_instance( + "reducer", + label, + |l| self.start_call_timer(l), + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking a blocking lock. + // So, we run `work` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + |timer_guard, executor, inst| async move { + match inst { + Instance::Wasm(mut inst) => { + executor + .run_sync_job(move || { + drop(timer_guard); + (wasm(arg, &mut inst), Instance::Wasm(inst)) + }) + .await + } + Instance::Js(inst) => { + drop(timer_guard); + let (res, inst) = js(arg, inst).await; + (res, Instance::Js(inst)) + } + } + }, + ) + .await } pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); - let this = self.clone(); if let Err(e) = self - .call("disconnect_client", move |inst| { - // Call the `client_disconnected` reducer, if it exists. - // This is a no-op if the module doesn't define such a reducer. - this.subscriptions().remove_subscriber(client_id); - this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst) - }) + .call( + "disconnect_client", + client_id, + |client_id, inst| inst.disconnect_client(client_id), + |client_id, inst| inst.disconnect_client(client_id), + ) .await { log::error!("Error from client_disconnected transaction: {e}"); } } + pub fn disconnect_client_inner( + client_id: ClientActorId, + info: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, + ) -> Result<(), ReducerCallError> { + // Call the `client_disconnected` reducer, if it exists. + // This is a no-op if the module doesn't define such a reducer. + info.subscriptions.remove_subscriber(client_id); + Self::call_identity_disconnected_inner( + client_id.identity, + client_id.connection_id, + info, + call_reducer, + trapped_slot, + ) + } + /// Invoke the module's `client_connected` reducer, if it has one, /// and insert a new row into `st_client` for `(caller_identity, caller_connection_id)`. /// @@ -929,88 +1137,12 @@ impl ModuleHost { caller_auth: ConnectionAuthCtx, caller_connection_id: ConnectionId, ) -> Result<(), ClientConnectedError> { - let me = self.clone(); - self.call("call_identity_connected", move |inst| { - let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect); - let stdb = &me.module.replica_ctx().relational_db; - let workload = Workload::Reducer(ReducerContext { - name: "call_identity_connected".to_owned(), - caller_identity: caller_auth.claims.identity, - caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: Bytes::new(), - }); - let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload); - let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| { - // If we crash before committing, we need to ensure that the transaction is rolled back. - // This is necessary to avoid leaving the database in an inconsistent state. 
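The rollback-on-unwind pattern used in `call_identity_connected` (armed with `scopeguard::guard`, defused with `ScopeGuard::into_inner` on the happy path) in isolation, using the `scopeguard` crate (`scopeguard = "1"` in Cargo.toml):

```rust
use scopeguard::ScopeGuard;

struct Tx;
impl Tx {
    fn rollback(self) {
        println!("rolled back");
    }
    fn commit(self) {
        println!("committed");
    }
}

fn do_work(fail: bool) {
    // Arm the guard: if we return early or panic before defusing it,
    // the closure runs and the transaction is rolled back.
    let tx = scopeguard::guard(Tx, |tx| tx.rollback());
    if fail {
        return; // guard fires here -> rollback
    }
    // Happy path: defuse the guard and take the transaction back out.
    let tx = ScopeGuard::into_inner(tx);
    tx.commit();
}

fn main() {
    do_work(true); // prints "rolled back"
    do_work(false); // prints "committed"
}
```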
- log::debug!("call_identity_connected: rolling back transaction"); - let (_, metrics, reducer_name) = mut_tx.rollback(); - stdb.report_mut_tx_metrics(reducer_name, metrics, None); - }); - - mut_tx - .insert_st_client( - caller_auth.claims.identity, - caller_connection_id, - &caller_auth.jwt_payload, - ) - .map_err(DBError::from)?; - - if let Some((reducer_id, reducer_def)) = reducer_lookup { - // The module defined a lifecycle reducer to handle new connections. - // Call this reducer. - // If the call fails (as in, something unexpectedly goes wrong with guest execution), - // abort the connection: we can't really recover. - let reducer_outcome = me.call_reducer_inner_with_inst( - Some(ScopeGuard::into_inner(mut_tx)), - caller_auth.claims.identity, - Some(caller_connection_id), - None, - None, - None, - reducer_id, - reducer_def, - FunctionArgs::Nullary, - inst, - )?; - - match reducer_outcome.outcome { - // If the reducer committed successfully, we're done. - // `WasmModuleInstance::call_reducer_with_tx` has already ensured - // that `st_client` is updated appropriately. - // - // It's necessary to spread out the responsibility for updating `st_client` in this way - // because it's important that `call_identity_connected` commit at most one transaction. - // A naive implementation of this method would just run the reducer first, - // then insert into `st_client`, - // but if we crashed in between, we'd be left in an inconsistent state - // where the reducer had run but `st_client` was not yet updated. - ReducerOutcome::Committed => Ok(()), - - // If the reducer returned an error or couldn't run due to insufficient energy, - // abort the connection: the module code has decided it doesn't want this client. - ReducerOutcome::Failed(message) => Err(ClientConnectedError::Rejected(message)), - ReducerOutcome::BudgetExceeded => Err(ClientConnectedError::OutOfEnergy), - } - } else { - // The module doesn't define a client_connected reducer. - // We need to commit the transaction to update st_clients and st_connection_credentials. - // - // This is necessary to be able to disconnect clients after a server crash. - - // TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions - // not allowed? - // I (jsdt) don't think it was being broadcast previously. See: - // https://github.com/clockworklabs/SpacetimeDB/issues/3130 - stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(())) - .map_err(|e: DBError| { - log::error!("`call_identity_connected`: finish transaction failed: {e:#?}"); - ClientConnectedError::DBError(e) - })?; - Ok(()) - } - }) + self.call( + "call_identity_connected", + (caller_auth, caller_connection_id), + |(a, b), inst| inst.call_identity_connected(a, b), + |(a, b), inst| inst.call_identity_connected(a, b), + ) .await .map_err(ReducerCallError::from)? } @@ -1020,12 +1152,15 @@ impl ModuleHost { /// If the reducer fails, the rows are still deleted. /// Calling this on an already-disconnected client is a no-op. 
pub fn call_identity_disconnected_inner( - &self, caller_identity: Identity, caller_connection_id: ConnectionId, - inst: &mut Instance, + info: &ModuleInfo, + call_reducer: impl FnOnce(Option, CallReducerParams) -> (ReducerCallResult, bool), + trapped_slot: &mut bool, ) -> Result<(), ReducerCallError> { - let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); + let stdb = info.relational_db(); + + let reducer_lookup = info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect); let reducer_name = reducer_lookup .as_ref() .map(|(_, def)| &*def.name) @@ -1033,22 +1168,11 @@ impl ModuleHost { let is_client_exist = |mut_tx: &MutTxId| mut_tx.st_client_row(caller_identity, caller_connection_id).is_some(); - let workload = || { - Workload::Reducer(ReducerContext { - name: reducer_name.to_owned(), - caller_identity, - caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: Bytes::new(), - }) - }; - - let me = self.clone(); - let stdb = me.module.replica_ctx().relational_db.clone(); + let workload = || Workload::reducer_no_args(reducer_name, caller_identity, caller_connection_id); // A fallback transaction that deletes the client from `st_client`. + let database_identity = stdb.database_identity(); let fallback = || { - let database_identity = me.info.database_identity; stdb.with_auto_commit(workload(), |mut_tx| { if !is_client_exist(mut_tx) { // The client is already gone. Nothing to do. @@ -1075,7 +1199,6 @@ impl ModuleHost { }; if let Some((reducer_id, reducer_def)) = reducer_lookup { - let stdb = me.module.replica_ctx().relational_db.clone(); let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload()); if !is_client_exist(&mut_tx) { @@ -1089,8 +1212,9 @@ impl ModuleHost { // The module defined a lifecycle reducer to handle disconnects. Call it. // If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured // that `st_client` is updated appropriately. - let result = me.call_reducer_inner_with_inst( - Some(mut_tx), + let tx = Some(mut_tx); + let result = Self::call_reducer_params( + info, caller_identity, Some(caller_connection_id), None, @@ -1099,8 +1223,12 @@ impl ModuleHost { reducer_id, reducer_def, FunctionArgs::Nullary, - inst, - ); + ) + .map(|params| { + let (res, trapped) = call_reducer(tx, params); + *trapped_slot = trapped; + res + }); // If it failed, we still need to update `st_client`: the client's not coming back. // Commit a separate transaction that just updates `st_client`. @@ -1152,31 +1280,28 @@ impl ModuleHost { caller_identity: Identity, caller_connection_id: ConnectionId, ) -> Result<(), ReducerCallError> { - let me = self.clone(); - self.call("call_identity_disconnected", move |inst| { - me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst) - }) + self.call( + "call_identity_disconnected", + (caller_identity, caller_connection_id), + |(a, b), inst| inst.call_identity_disconnected(a, b), + |(a, b), inst| inst.call_identity_disconnected(a, b), + ) .await? } /// Empty the system tables tracking clients without running any lifecycle reducers. 
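The `trapped_slot: &mut bool` parameter threaded through these signatures is an out-parameter: the shared host logic keeps its normal `Result` return while still reporting "did the guest trap?" to the caller through the slot. A minimal illustration (names are mine, not from the diff):

```rust
fn run_guest(trapped_slot: &mut bool) -> Result<&'static str, &'static str> {
    // Pretend the guest trapped partway through execution.
    *trapped_slot = true;
    Err("guest trapped")
}

fn main() {
    let mut trapped = false;
    let res = run_guest(&mut trapped);
    assert!(res.is_err());
    assert!(trapped); // caller can now discard the instance instead of reusing it
}
```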
pub async fn clear_all_clients(&self) -> anyhow::Result<()> { - let me = self.clone(); - self.call("clear_all_clients", move |_| { - let stdb = &me.module.replica_ctx().relational_db; - let workload = Workload::Internal; - stdb.with_auto_commit(workload, |mut_tx| { - stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?; - stdb.clear_table(mut_tx, ST_CLIENT_ID)?; - Ok::<(), DBError>(()) - }) - }) + self.call( + "clear_all_clients", + (), + |_, inst| inst.clear_all_clients(), + |_, inst| inst.clear_all_clients(), + ) .await? - .map_err(anyhow::Error::from) } - async fn call_reducer_inner( - &self, + fn call_reducer_params( + module: &ModuleInfo, caller_identity: Identity, caller_connection_id: Option, client: Option>, @@ -1185,32 +1310,24 @@ impl ModuleHost { reducer_id: ReducerId, reducer_def: &ReducerDef, args: FunctionArgs, - ) -> Result { - let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def)); + ) -> Result { + let reducer_seed = ArgsSeed(module.module_def.typespace().with_type(reducer_def)); let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?; let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); - - Ok(self - .call(&reducer_def.name, move |inst| { - inst.call_reducer( - None, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - ) - }) - .await?) + Ok(CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }) } - fn call_reducer_inner_with_inst( + + async fn call_reducer_inner( &self, - tx: Option, caller_identity: Identity, caller_connection_id: Option, client: Option>, @@ -1219,25 +1336,29 @@ impl ModuleHost { reducer_id: ReducerId, reducer_def: &ReducerDef, args: FunctionArgs, - module_instance: &mut Instance, ) -> Result { let reducer_seed = ArgsSeed(self.info.module_def.typespace().with_type(reducer_def)); let args = args.into_tuple(reducer_seed).map_err(InvalidReducerArguments)?; let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + let call_reducer_params = CallReducerParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + args, + }; - Ok(module_instance.call_reducer( - tx, - CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id, - client, - request_id, - timer, - reducer_id, - args, - }, - )) + Ok(self + .call( + &reducer_def.name, + call_reducer_params, + |p, inst| inst.call_reducer(None, p), + |p, inst| inst.call_reducer(None, p), + ) + .await?) } pub async fn call_reducer( @@ -1356,48 +1477,13 @@ impl ModuleHost { // Scheduled reducers require a different function here to call their reducer // because their reducer arguments are stored in the database and need to be fetched // within the same transaction as the reducer call. 
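To make the comment above concrete: a scheduled reducer's arguments live in a database row, so they have to be read under the same transaction the reducer then runs in, otherwise the row could be cancelled or re-fired between the read and the call. A sketch with stand-in types (`Tx` plays the role of `MutTxId`):

```rust
struct Tx; // a consistent snapshot plus write locks
struct Db;

impl Db {
    fn begin(&self) -> Tx {
        Tx
    }
    /// Read the scheduled row's BSATN-encoded args; `None` if the user
    /// cancelled the schedule before it fired.
    fn read_scheduled_args(&self, _tx: &Tx, _id: u64) -> Option<Vec<u8>> {
        Some(vec![1, 2, 3])
    }
}

fn run_reducer_in(_tx: Tx, args: &[u8]) {
    println!("calling reducer with {} arg bytes", args.len());
}

fn call_scheduled(db: &Db, id: u64) -> Result<(), &'static str> {
    let tx = db.begin();
    // Fetch the args under `tx`...
    let args = db.read_scheduled_args(&tx, id).ok_or("schedule row gone")?;
    // ...then hand the *same* transaction to the reducer call, so the row
    // cannot change between the read and the call.
    run_reducer_in(tx, &args);
    Ok(())
}

fn main() {
    call_scheduled(&Db, 42).unwrap();
}
```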
- pub async fn call_scheduled_reducer( - &self, - call_reducer_params: impl FnOnce(&MutTxId) -> anyhow::Result> + Send + 'static, - ) -> Result { - let db = self.module.replica_ctx().relational_db.clone(); - // scheduled reducer name not fetched yet, anyway this is only for logging purpose - const REDUCER: &str = "scheduled_reducer"; - let module = self.info.clone(); - self.call(REDUCER, move |inst: &mut Instance| { - let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - - match call_reducer_params(&mut tx) { - Ok(Some(params)) => { - // Is necessary to patch the context with the actual calling reducer - let reducer_def = module - .module_def - .get_reducer_by_id(params.reducer_id) - .ok_or(ReducerCallError::ScheduleReducerNotFound)?; - let reducer = &*reducer_def.name; - - tx.ctx = ExecutionContext::with_workload( - tx.ctx.database_identity(), - Workload::Reducer(ReducerContext { - name: reducer.into(), - caller_identity: params.caller_identity, - caller_connection_id: params.caller_connection_id, - timestamp: Timestamp::now(), - arg_bsatn: params.args.get_bsatn().clone(), - }), - ); - - Ok(inst.call_reducer(Some(tx), params)) - } - Ok(None) => Err(ReducerCallError::ScheduleReducerNotFound), - Err(err) => Err(ReducerCallError::Args(InvalidReducerArguments( - InvalidFunctionArguments { - err, - function_name: REDUCER.into(), - }, - ))), - } - }) + pub(crate) async fn call_scheduled_reducer(&self, item: QueueItem) -> Result { + self.call( + SCHEDULED_REDUCER, + item, + |item, inst| inst.call_scheduled_reducer(item), + |item, inst| inst.call_scheduled_reducer(item), + ) .await? } @@ -1406,11 +1492,12 @@ impl ModuleHost { } pub async fn init_database(&self, program: Program) -> Result, InitDatabaseError> { - let replica_ctx = self.module.replica_ctx().clone(); - let info = self.info.clone(); - self.call("", move |inst| { - init_database(&replica_ctx, &info.module_def, inst, program) - }) + self.call( + "", + program, + |p, inst| inst.init_database(p), + |p, inst| inst.init_database(p), + ) .await? .map_err(InitDatabaseError::Other) } @@ -1421,9 +1508,12 @@ impl ModuleHost { old_module_info: Arc, policy: MigrationPolicy, ) -> Result { - self.call("", move |inst| { - inst.update_database(program, old_module_info, policy) - }) + self.call( + "", + (program, old_module_info, policy), + |(a, b, c), inst| inst.update_database(a, b, c), + |(a, b, c), inst| inst.update_database(a, b, c), + ) .await? 
} diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index cc6ee632154..f99ee1d7aab 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -6,7 +6,6 @@ use futures::StreamExt; use rustc_hash::FxHashMap; use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_lib::scheduler::ScheduleAt; -use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::{bsatn::ToBsatn as _, AlgebraicValue}; @@ -16,6 +15,7 @@ use tokio::time::Instant; use tokio_util::time::delay_queue::{self, DelayQueue, Expired}; use crate::db::relational_db::RelationalDB; +use crate::host::module_host::ModuleInfo; use super::module_host::ModuleEvent; use super::module_host::ModuleFunctionCall; @@ -265,7 +265,7 @@ struct SchedulerActor { module_host: WeakModuleHost, } -enum QueueItem { +pub(crate) enum QueueItem { Id { id: ScheduledReducerId, at: Timestamp }, VolatileNonatomicImmediate { reducer_name: String, args: FunctionArgs }, } @@ -326,70 +326,10 @@ impl SchedulerActor { let Some(module_host) = self.module_host.upgrade() else { return; }; - let db = module_host.replica_ctx().relational_db.clone(); - let caller_identity = module_host.info().database_identity; - let module_info = module_host.info.clone(); - - let call_reducer_params = move |tx: &MutTxId| match item { - QueueItem::Id { id, at } => { - let Ok(schedule_row) = get_schedule_row_mut(tx, &db, id) else { - // if the row is not found, it means the schedule is cancelled by the user - log::debug!( - "table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}", - id.table_id, - id.schedule_id - ); - return Ok(None); - }; - - let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, &db, id.table_id, &schedule_row)?; - - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer[..]) - .ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?; - - let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; - - // the timestamp we tell the reducer it's running at will be - // at least the timestamp it was scheduled to run at. 
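For context on the scheduler actor this hunk refactors: it drains a `tokio_util` `DelayQueue`, which yields entries as their deadlines expire. A standalone sketch of that loop (assumes `tokio`, `tokio-util` with the `time` feature, and `futures` as dependencies):

```rust
use futures::StreamExt;
use std::time::Duration;
use tokio_util::time::DelayQueue;

#[tokio::main]
async fn main() {
    let mut queue: DelayQueue<&'static str> = DelayQueue::new();
    queue.insert("reducer_a", Duration::from_millis(50));
    queue.insert("reducer_b", Duration::from_millis(10));

    // The queue is a `Stream` that yields entries as their deadlines
    // expire, so `reducer_b` comes out first; it ends once empty.
    while let Some(expired) = queue.next().await {
        println!("due: {}", expired.into_inner());
    }
}
```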
- let timestamp = at.max(Timestamp::now()); - - Ok(Some(CallReducerParams { - timestamp, - caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: reducer_args, - })) - } - QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { - let (reducer_id, reducer_seed) = module_info - .module_def - .reducer_arg_deserialize_seed(&reducer_name[..]) - .ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?; - let reducer_args = args.into_tuple(reducer_seed)?; - - Ok(Some(CallReducerParams { - timestamp: Timestamp::now(), - caller_identity, - caller_connection_id: ConnectionId::ZERO, - client: None, - request_id: None, - timer: None, - reducer_id, - args: reducer_args, - })) - } - }; - let db = module_host.replica_ctx().relational_db.clone(); let module_host_clone = module_host.clone(); - let res = tokio::spawn(async move { module_host.call_scheduled_reducer(call_reducer_params).await }).await; + let res = tokio::spawn(async move { module_host.call_scheduled_reducer(item).await }).await; match res { // if we didn't actually call the reducer because the module exited or it was already deleted, leave @@ -467,6 +407,63 @@ impl SchedulerActor { } } +pub(crate) fn handle_queued_call_reducer_params( + tx: &MutTxId, + module_info: &ModuleInfo, + db: &RelationalDB, + item: QueueItem, +) -> anyhow::Result> { + let caller_identity = module_info.database_identity; + + match item { + QueueItem::Id { id, at } => { + let Ok(schedule_row) = get_schedule_row_mut(tx, db, id) else { + // if the row is not found, it means the schedule is cancelled by the user + log::debug!( + "table row corresponding to yield scheduler id not found: tableid {}, schedulerId {}", + id.table_id, + id.schedule_id + ); + return Ok(None); + }; + + let ScheduledReducer { reducer, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?; + + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer[..]) + .ok_or_else(|| anyhow!("Reducer not found: {reducer}"))?; + + let reducer_args = FunctionArgs::Bsatn(bsatn_args.into()).into_tuple(reducer_seed)?; + + // the timestamp we tell the reducer it's running at will be + // at least the timestamp it was scheduled to run at. 
+ let timestamp = at.max(Timestamp::now()); + + Ok(Some(CallReducerParams::from_system( + timestamp, + caller_identity, + reducer_id, + reducer_args, + ))) + } + QueueItem::VolatileNonatomicImmediate { reducer_name, args } => { + let (reducer_id, reducer_seed) = module_info + .module_def + .reducer_arg_deserialize_seed(&reducer_name[..]) + .ok_or_else(|| anyhow!("Reducer not found: {reducer_name}"))?; + let reducer_args = args.into_tuple(reducer_seed)?; + + Ok(Some(CallReducerParams::from_system( + Timestamp::now(), + caller_identity, + reducer_id, + reducer_args, + ))) + } + } +} + fn commit_and_broadcast_deletion_event(tx: MutTxId, module_host: ModuleHost) { let caller_identity = module_host.info().database_identity; diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 73b1ad9d76a..fdf8fd51492 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -9,24 +9,31 @@ use self::syscall::{call_call_reducer, call_describe_module, call_reducer_fun, r use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; +use crate::client::ClientActorId; use crate::host::instance_env::{ChunkPool, InstanceEnv}; -use crate::host::module_host::Instance; +use crate::host::module_host::{ + call_identity_connected, call_scheduled_reducer, init_database, ClientConnectedError, Instance, +}; +use crate::host::scheduler::QueueItem; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{DescribeError, ExecuteResult, ExecutionTimings, InstanceCommon}; use crate::host::wasm_common::{RowIters, TimingSpanSet}; -use crate::host::{ReducerCallResult, Scheduler}; +use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::module_host_context::{ModuleCreationContext, ModuleCreationContextLimited}; use crate::replica_context::ReplicaContext; use crate::util::asyncify; +use core::any::type_name; use core::str; +use enum_as_inner::EnumAsInner; use itertools::Either; +use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::Program; -use spacetimedb_lib::{RawModuleDef, Timestamp}; +use spacetimedb_lib::{ConnectionId, Identity, RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; -use std::sync::mpsc::{Receiver, SyncSender}; -use std::sync::{mpsc, Arc, LazyLock}; +use std::sync::{Arc, LazyLock}; use std::time::Instant; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::oneshot; use v8::script_compiler::{compile_module, Source}; use v8::{ @@ -245,9 +252,8 @@ impl JsInstanceEnv { /// which will cause the worker's loop to terminate /// and cleanup the isolate and friends. pub struct JsInstance { - request_tx: SyncSender, - update_response_rx: Receiver>, - call_reducer_response_rx: Receiver<(ReducerCallResult, bool)>, + request_tx: Sender, + reply_rx: Receiver<(JsWorkerReply, bool)>, trapped: bool, } @@ -256,44 +262,122 @@ impl JsInstance { self.trapped } - pub fn update_database( - &mut self, - program: Program, - old_module_info: Arc, - policy: MigrationPolicy, - ) -> anyhow::Result { + /// Send a request to the worker and wait for a reply. + async fn send_recv( + mut self: Box, + extract: impl FnOnce(JsWorkerReply) -> Result, + request: JsWorkerRequest, + ) -> (T, Box) { // Send the request. 
- let request = JsWorkerRequest::UpdateDatabase { - program, - old_module_info, - policy, - }; self.request_tx .send(request) + .await .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); // Wait for the response. - self.update_response_rx + let (reply, trapped) = self + .reply_rx .recv() - .expect("worker's `update_response_tx` should be live as `JsInstance::drop` hasn't happened") + .await + .expect("worker's `reply_tx` should be live as `JsInstance::drop` hasn't happened"); + + self.trapped = trapped; + + match extract(reply) { + Err(err) => unreachable!("should have received {} but got {err:?}", type_name::()), + Ok(reply) => (reply, self), + } } - pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> super::ReducerCallResult { - // Send the request. - let request = JsWorkerRequest::CallReducer { tx, params }; - self.request_tx - .send(request) - .expect("worker's `request_rx` should be live as `JsInstance::drop` hasn't happened"); + pub async fn update_database( + self: Box, + program: Program, + old_module_info: Arc, + policy: MigrationPolicy, + ) -> (anyhow::Result, Box) { + self.send_recv( + JsWorkerReply::into_update_database, + JsWorkerRequest::UpdateDatabase { + program, + old_module_info, + policy, + }, + ) + .await + } - // Wait for the response. - let (response, trapped) = self - .call_reducer_response_rx - .recv() - .expect("worker's `call_reducer_response_tx` should be live as `JsInstance::drop` hasn't happened"); + pub async fn call_reducer( + self: Box, + tx: Option, + params: CallReducerParams, + ) -> (ReducerCallResult, Box) { + self.send_recv( + JsWorkerReply::into_call_reducer, + JsWorkerRequest::CallReducer { tx, params }, + ) + .await + } - self.trapped = trapped; + pub async fn clear_all_clients(self: Box) -> (anyhow::Result<()>, Box) { + self.send_recv(JsWorkerReply::into_clear_all_clients, JsWorkerRequest::ClearAllClients) + .await + } - response + pub async fn call_identity_connected( + self: Box, + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + ) -> (Result<(), ClientConnectedError>, Box) { + self.send_recv( + JsWorkerReply::into_call_identity_connected, + JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id), + ) + .await + } + + pub async fn call_identity_disconnected( + self: Box, + caller_identity: Identity, + caller_connection_id: ConnectionId, + ) -> (Result<(), ReducerCallError>, Box) { + self.send_recv( + JsWorkerReply::into_call_identity_disconnected, + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id), + ) + .await + } + + pub async fn disconnect_client( + self: Box, + client_id: ClientActorId, + ) -> (Result<(), ReducerCallError>, Box) { + self.send_recv( + JsWorkerReply::into_disconnect_client, + JsWorkerRequest::DisconnectClient(client_id), + ) + .await + } + + pub(crate) async fn call_scheduled_reducer( + self: Box, + item: QueueItem, + ) -> (Result, Box) { + self.send_recv( + JsWorkerReply::into_call_scheduled_reducer, + JsWorkerRequest::CallScheduledReducer(item), + ) + .await + } + + pub async fn init_database( + self: Box, + program: Program, + ) -> (anyhow::Result>, Box) { + self.send_recv( + JsWorkerReply::into_init_database, + JsWorkerRequest::InitDatabase(program), + ) + .await } pub async fn call_procedure( @@ -304,6 +388,19 @@ impl JsInstance { } } +/// A reply from the worker in [`spawn_instance_worker`]. 
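The `extract` functions passed to `send_recv` above (e.g. `JsWorkerReply::into_update_database`) come from the `EnumAsInner` derive on `JsWorkerReply` below: the `enum-as-inner` crate generates an `into_<variant>` method per variant, returning `Result<Inner, Self>`. In isolation:

```rust
use enum_as_inner::EnumAsInner;

#[derive(EnumAsInner, Debug)]
enum Reply {
    Number(i64),
    Text(String),
}

fn main() {
    // The matching extractor gives back the payload...
    let r = Reply::Number(7);
    assert_eq!(r.into_number().unwrap(), 7);

    // ...while the wrong extractor returns the whole enum as the error,
    // which `send_recv` treats as unreachable.
    let r = Reply::Text("hi".into());
    assert!(r.into_number().is_err());
}
```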
+#[derive(EnumAsInner, Debug)] +enum JsWorkerReply { + UpdateDatabase(anyhow::Result), + CallReducer(ReducerCallResult), + ClearAllClients(anyhow::Result<()>), + CallIdentityConnected(Result<(), ClientConnectedError>), + CallIdentityDisconnected(Result<(), ReducerCallError>), + DisconnectClient(Result<(), ReducerCallError>), + CallScheduledReducer(Result), + InitDatabase(anyhow::Result>), +} + /// A request for the worker in [`spawn_instance_worker`]. // We care about optimizing for `CallReducer` as it happens frequently, // so we don't want to box anything in it. @@ -320,6 +417,18 @@ enum JsWorkerRequest { tx: Option, params: CallReducerParams, }, + /// See [`JsInstance::clear_all_clients`]. + ClearAllClients, + /// See [`JsInstance::call_identity_connected`]. + CallIdentityConnected(ConnectionAuthCtx, ConnectionId), + /// See [`JsInstance::call_identity_disconnected`]. + CallIdentityDisconnected(Identity, ConnectionId), + /// See [`JsInstance::disconnect_client`]. + DisconnectClient(ClientActorId), + /// See [`JsInstance::call_scheduled_reducer`]. + CallScheduledReducer(QueueItem), + /// See [`JsInstance::init_database`]. + InitDatabase(Program), } /// Performs some of the startup work of [`spawn_instance_worker`]. @@ -376,11 +485,9 @@ fn spawn_instance_worker( // The use-case is SPSC and all channels are rendezvous channels // where each `.send` blocks until it's received. // The Instance --Request-> Worker channel: - let (request_tx, request_rx) = mpsc::sync_channel(0); - // The Worker --UpdateResponse-> Instance channel: - let (update_response_tx, update_response_rx) = mpsc::sync_channel(0); - // The Worker --ReducerCallResult-> Instance channel: - let (call_reducer_response_tx, call_reducer_response_rx) = mpsc::sync_channel(0); + let (request_tx, mut request_rx) = channel(1); + // The Worker --Reply-> Instance channel: + let (reply_tx, reply_rx) = channel(1); // This one-shot channel is used for initial startup error handling within the thread. let (result_tx, result_rx) = oneshot::channel(); @@ -412,6 +519,7 @@ fn spawn_instance_worker( }; // Setup the instance common and environment. + let info = &module_common.info(); let mut instance_common = InstanceCommon::new(&module_common); let replica_ctx: &Arc = module_common.replica_ctx(); let scheduler = module_common.scheduler().clone(); @@ -422,22 +530,27 @@ fn spawn_instance_worker( // // The loop is terminated when a `JsInstance` is dropped. // This will cause channels, scopes, and the isolate to be cleaned up. - for request in request_rx.iter() { + let reply = |ctx: &str, reply: JsWorkerReply, trapped| { + if let Err(e) = reply_tx.blocking_send((reply, trapped)) { + // This should never happen as `JsInstance::$function` immediately + // does `.recv` on the other end of the channel. + unreachable!("should have receiver for `{ctx}` response, {e}"); + } + }; + while let Some(request) = request_rx.blocking_recv() { + let mut call_reducer = + |tx, params| call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); + + use JsWorkerReply::*; match request { JsWorkerRequest::UpdateDatabase { program, old_module_info, policy, } => { - // Update the database. + // Update the database and reply to `JsInstance::update_database`. let res = instance_common.update_database(replica_ctx, program, old_module_info, policy); - - // Reply to `JsInstance::update_database`. 
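This hunk swaps std's rendezvous `sync_channel(0)` pair for tokio bounded channels, with the worker thread using the `blocking_send`/`blocking_recv` APIs and the async side awaiting as usual. A self-contained sketch of that std-thread/async handshake:

```rust
use tokio::sync::mpsc::channel;

#[tokio::main]
async fn main() {
    let (req_tx, mut req_rx) = channel::<u32>(1);
    let (reply_tx, mut reply_rx) = channel::<u32>(1);

    // The worker owns non-Send state (like a V8 isolate), so it lives on
    // a plain OS thread and uses the blocking channel APIs.
    let worker = std::thread::spawn(move || {
        while let Some(n) = req_rx.blocking_recv() {
            reply_tx.blocking_send(n * 2).unwrap();
        }
    });

    // The async side awaits sends and receives as usual.
    req_tx.send(21).await.unwrap();
    assert_eq!(reply_rx.recv().await, Some(42));

    drop(req_tx); // closing the request channel ends the worker loop
    worker.join().unwrap();
}
```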
- if let Err(e) = update_response_tx.send(res) { - // This should never happen as `JsInstance::update_database` immediately - // does `.recv` on the other end of the channel. - unreachable!("should have receiver for `update_database` response, {e}"); - } + reply("update_database", UpdateDatabase(res), false); } JsWorkerRequest::CallReducer { tx, params } => { // Call the reducer. @@ -445,26 +558,55 @@ fn spawn_instance_worker( // but rather let this happen by `return_instance` using `JsInstance::trapped` // which will cause `JsInstance` to be dropped, // which in turn results in the loop being terminated. - let res = call_reducer(&mut instance_common, replica_ctx, scope, call_reducer_fun, tx, params); - - // Reply to `JsInstance::call_reducer`. - if let Err(e) = call_reducer_response_tx.send(res) { - // This should never happen as `JsInstance::call_reducer` immediately - // does `.recv` on the other end of the channel. - unreachable!("should have receiver for `call_reducer` response, {e}"); - } + let (res, trapped) = call_reducer(tx, params); + reply("call_reducer", CallReducer(res), trapped); + } + JsWorkerRequest::ClearAllClients => { + let res = instance_common.clear_all_clients(); + reply("clear_all_clients", ClearAllClients(res), false); + } + JsWorkerRequest::CallIdentityConnected(caller_auth, caller_connection_id) => { + let mut trapped = false; + let res = + call_identity_connected(caller_auth, caller_connection_id, info, call_reducer, &mut trapped); + reply("call_identity_connected", CallIdentityConnected(res), trapped); + } + JsWorkerRequest::CallIdentityDisconnected(caller_identity, caller_connection_id) => { + let mut trapped = false; + let res = ModuleHost::call_identity_disconnected_inner( + caller_identity, + caller_connection_id, + info, + call_reducer, + &mut trapped, + ); + reply("call_identity_disconnected", CallIdentityDisconnected(res), trapped); + } + JsWorkerRequest::DisconnectClient(client_id) => { + let mut trapped = false; + let res = ModuleHost::disconnect_client_inner(client_id, info, call_reducer, &mut trapped); + reply("disconnect_client", DisconnectClient(res), trapped); + } + JsWorkerRequest::CallScheduledReducer(queue_item) => { + let (res, trapped): (Result, bool) = + call_scheduled_reducer(info, queue_item, call_reducer); + reply("call_scheduled_reducer", CallScheduledReducer(res), trapped); + } + JsWorkerRequest::InitDatabase(program) => { + let (res, trapped): (Result, anyhow::Error>, bool) = + init_database(replica_ctx, &module_common.info().module_def, program, call_reducer); + reply("init_database", InitDatabase(res), trapped); } } } }); // Get the module, if any, and get any setup errors from the worker. 
- let res = result_rx.blocking_recv().expect("should have a sender"); + let res: Result = result_rx.blocking_recv().expect("should have a sender"); res.map(|opt_mc| { let inst = JsInstance { request_tx, - update_response_rx, - call_reducer_response_rx, + reply_rx, trapped: false, }; (opt_mc, inst) @@ -581,7 +723,7 @@ fn call_reducer<'scope>( fun: Local<'scope, Function>, tx: Option, params: CallReducerParams, -) -> (super::ReducerCallResult, bool) { +) -> (ReducerCallResult, bool) { let mut trapped = false; let (res, _) = instance_common.call_reducer_with_tx( diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 6bf94421f2a..ea7fa7e9c32 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,26 +1,17 @@ -use bytes::Bytes; -use prometheus::{Histogram, IntCounter, IntGauge}; -use spacetimedb_lib::db::raw_def::v9::Lifecycle; -use spacetimedb_lib::de::DeserializeSeed; -use spacetimedb_primitives::ProcedureId; -use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; -use std::future::Future; -use std::sync::Arc; -use std::time::Duration; -use tracing::span::EnteredSpan; - use super::instrumentation::CallTimes; -use crate::client::ClientConnectionSender; +use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger; use crate::energy::{EnergyMonitor, ReducerBudget, ReducerFingerprint}; use crate::host::instance_env::InstanceEnv; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::{ - CallProcedureParams, CallReducerParams, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, + call_identity_connected, call_scheduled_reducer, init_database, CallProcedureParams, CallReducerParams, + ClientConnectedError, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall, ModuleInfo, }; +use crate::host::scheduler::QueueItem; use crate::host::{ - ArgsTuple, ProcedureCallError, ProcedureCallResult, ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, - UpdateDatabaseResult, + ArgsTuple, ModuleHost, ProcedureCallError, ProcedureCallResult, ReducerCallError, ReducerCallResult, ReducerId, + ReducerOutcome, Scheduler, UpdateDatabaseResult, }; use crate::identity::Identity; use crate::messages::control_db::HostType; @@ -29,13 +20,24 @@ use crate::replica_context::ReplicaContext; use crate::subscription::module_subscription_actor::WriteConflict; use crate::util::prometheus_handle::{HistogramExt, TimerGuard}; use crate::worker_metrics::WORKER_METRICS; +use bytes::Bytes; +use prometheus::{Histogram, IntCounter, IntGauge}; +use spacetimedb_auth::identity::ConnectionAuthCtx; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{self, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::{IsolationLevel, Program}; use spacetimedb_lib::buffer::DecodeError; +use spacetimedb_lib::db::raw_def::v9::Lifecycle; +use spacetimedb_lib::de::DeserializeSeed; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{bsatn, ConnectionId, RawModuleDef, Timestamp}; +use spacetimedb_primitives::ProcedureId; +use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; +use std::future::Future; +use std::sync::Arc; +use std::time::Duration; +use tracing::span::EnteredSpan; use super::*; @@ -271,7 
+273,71 @@ impl WasmModuleInstance { } pub fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) + let (res, trapped) = self.call_reducer_inner(tx, params); + self.trapped = trapped; + res + } + + pub fn clear_all_clients(&self) -> anyhow::Result<()> { + self.common.clear_all_clients() + } + + pub fn call_identity_connected( + &mut self, + caller_auth: ConnectionAuthCtx, + caller_connection_id: ConnectionId, + ) -> Result<(), ClientConnectedError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = call_identity_connected(caller_auth, caller_connection_id, module, call_reducer, &mut trapped); + self.trapped = trapped; + res + } + + pub fn call_identity_disconnected( + &mut self, + caller_identity: Identity, + caller_connection_id: ConnectionId, + ) -> Result<(), ReducerCallError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = ModuleHost::call_identity_disconnected_inner( + caller_identity, + caller_connection_id, + module, + call_reducer, + &mut trapped, + ); + self.trapped = trapped; + res + } + + pub fn disconnect_client(&mut self, client_id: ClientActorId) -> Result<(), ReducerCallError> { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let mut trapped = false; + let res = ModuleHost::disconnect_client_inner(client_id, module, call_reducer, &mut trapped); + self.trapped = trapped; + res + } + + pub(crate) fn call_scheduled_reducer(&mut self, item: QueueItem) -> Result { + let module = &self.common.info.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let (res, trapped) = call_scheduled_reducer(module, item, call_reducer); + self.trapped = trapped; + res + } + + pub fn init_database(&mut self, program: Program) -> anyhow::Result> { + let module_def = &self.common.info.clone().module_def; + let replica_ctx = &self.instance.instance_env().replica_ctx.clone(); + let call_reducer = |tx, params| self.call_reducer_inner(tx, params); + let (res, trapped) = init_database(replica_ctx, module_def, program, call_reducer); + self.trapped = trapped; + res } pub async fn call_procedure( @@ -294,9 +360,13 @@ impl WasmModuleInstance { } impl WasmModuleInstance { + fn call_reducer_inner(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + crate::callgrind_flag::invoke_allowing_callgrind(|| self.call_reducer_with_tx(tx, params)) + } + #[tracing::instrument(level = "trace", skip_all)] - fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let (res, trapped) = self.common.call_reducer_with_tx( + fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> (ReducerCallResult, bool) { + self.common.call_reducer_with_tx( &self.instance.instance_env().replica_ctx.clone(), tx, params, @@ -308,9 +378,7 @@ impl WasmModuleInstance { .clone() .set(tx, || self.instance.call_reducer(op, budget)) }, - ); - self.trapped = trapped; - res + ) } } @@ -668,6 +736,11 @@ impl InstanceCommon { (res, outer_error) } + + /// Empty the system tables tracking clients without running any lifecycle reducers. 
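Each wrapper above repeats the `let (res, trapped) = ...; self.trapped = trapped; res` dance. A hypothetical follow-up refactor (not part of this diff) could fold that into one helper:

```rust
struct Inst {
    trapped: bool,
}

impl Inst {
    /// Record the trap flag from a `(result, trapped)` pair and pass
    /// the result through.
    fn record_trap<T>(&mut self, (res, trapped): (T, bool)) -> T {
        self.trapped = trapped;
        res
    }
}

fn main() {
    let mut inst = Inst { trapped: false };
    let out = inst.record_trap(("ok", true));
    assert_eq!(out, "ok");
    assert!(inst.trapped);
}
```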
+    pub(crate) fn clear_all_clients(&self) -> anyhow::Result<()> {
+        self.info.relational_db().clear_all_clients().map_err(Into::into)
+    }
 }
 
 /// VM-related metrics for reducer execution.
diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs
index 1c1f492b285..f89a9f2620a 100644
--- a/crates/core/src/subscription/module_subscription_actor.rs
+++ b/crates/core/src/subscription/module_subscription_actor.rs
@@ -213,6 +213,13 @@ impl ModuleSubscriptions {
         )
     }
 
+    /// Returns the [`RelationalDB`] of this [`ModuleSubscriptions`].
+    ///
+    /// This is used by [`ModuleInfo`] and in turn by `InstanceCommon`.
+    pub fn relational_db(&self) -> &Arc<RelationalDB> {
+        &self.relational_db
+    }
+
     // Recompute gauges to update metrics.
     pub fn update_gauges(&self) {
         let num_queries = self.subscriptions.read().calculate_gauge_stats();
diff --git a/crates/datastore/src/execution_context.rs b/crates/datastore/src/execution_context.rs
index ef72cbbdf25..22b843a99a3 100644
--- a/crates/datastore/src/execution_context.rs
+++ b/crates/datastore/src/execution_context.rs
@@ -105,6 +105,19 @@ pub enum Workload {
 }
 
 impl Workload {
+    /// Returns a reducer workload with no arguments to the reducer
+    /// and the current timestamp.
+    pub fn reducer_no_args(name: &str, id: Identity, conn_id: ConnectionId) -> Self {
+        Self::Reducer(ReducerContext {
+            name: name.into(),
+            caller_identity: id,
+            caller_connection_id: conn_id,
+            timestamp: Timestamp::now(),
+            arg_bsatn: Bytes::new(),
+        })
+    }
+
+    /// Returns the workload's type/kind, without any of the data.
     pub fn workload_type(&self) -> WorkloadType {
         match self {
             #[cfg(any(test, feature = "test"))]