diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index a872a4b6824..e1e56e9de00 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -12,12 +12,14 @@ use self::syscall::{ use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon}; use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime}; use super::UpdateDatabaseResult; -use crate::host::instance_env::{ChunkPool, InstanceEnv}; +use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot}; use crate::host::module_host::{CallViewParams, Instance, ViewCallResult}; use crate::host::v8::error::{ErrorOrException, ExceptionThrown}; use crate::host::wasm_common::instrumentation::CallTimes; use crate::host::wasm_common::module_host_actor::{ - DescribeError, ExecutionStats, ExecutionTimings, InstanceCommon, ReducerExecuteResult, ViewExecuteResult, + AnonymousViewOp, DescribeError, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, InstanceCommon, + InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp, + WasmInstance, }; use crate::host::wasm_common::{RowIters, TimingSpanSet}; use crate::host::{ReducerCallResult, Scheduler}; @@ -28,7 +30,7 @@ use anyhow::Context as _; use core::str; use itertools::Either; use spacetimedb_client_api_messages::energy::FunctionBudget; -use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCall}; +use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId}; use spacetimedb_datastore::traits::Program; use spacetimedb_lib::{RawModuleDef, Timestamp}; use spacetimedb_schema::auto_migrate::MigrationPolicy; @@ -439,6 +441,12 @@ fn spawn_instance_worker( let instance_env = InstanceEnv::new(replica_ctx.clone(), scheduler); scope.set_slot(JsInstanceEnv::new(instance_env)); + let mut inst = V8Instance { + scope, + replica_ctx, + hooks: &hooks, + }; + // Process requests to the worker. // // The loop is terminated when a `JsInstance` is dropped. @@ -466,7 +474,7 @@ fn spawn_instance_worker( // but rather let this happen by `return_instance` using `JsInstance::trapped` // which will cause `JsInstance` to be dropped, // which in turn results in the loop being terminated. - let res = call_reducer(&mut instance_common, replica_ctx, scope, &hooks, tx, params); + let res = instance_common.call_reducer_with_tx(tx, params, &mut inst); // Reply to `JsInstance::call_reducer`. if let Err(e) = call_reducer_response_tx.send(res) { @@ -476,7 +484,7 @@ fn spawn_instance_worker( } } JsWorkerRequest::CallView { tx, params } => { - let res = call_view(&mut instance_common, replica_ctx, scope, &hooks, tx, params); + let res = instance_common.call_view_with_tx(tx, params, &mut inst); if let Err(e) = call_view_response_tx.send(res) { unreachable!("should have receiver for `call_view` response, {e}"); @@ -603,43 +611,83 @@ fn call_free_fun<'scope>( fun.call(scope, receiver, args).ok_or_else(exception_already_thrown) } -#[allow(clippy::too_many_arguments)] -fn common_call<'scope, R>( +struct V8Instance<'a, 'scope, 'isolate> { + scope: &'a mut PinScope<'scope, 'isolate>, + replica_ctx: &'a Arc, + hooks: &'a HookFunctions<'a>, +} + +impl WasmInstance for V8Instance<'_, '_, '_> { + fn extract_descriptions(&mut self) -> Result { + extract_description(self.scope, self.hooks, self.replica_ctx) + } + + fn replica_ctx(&self) -> &Arc { + self.replica_ctx + } + + fn tx_slot(&self) -> TxSlot { + self.scope.get_slot::().unwrap().instance_env.tx.clone() + } + + fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult { + let ExecutionResult { stats, call_result } = common_call(self.scope, budget, op, |scope, op| { + Ok(call_call_reducer(scope, self.hooks, op)?) + }); + let call_result = call_result.and_then(|res| res.map_err(ExecutionError::User)); + ExecutionResult { stats, call_result } + } + + fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult { + common_call(self.scope, budget, op, |scope, op| { + call_call_view(scope, self.hooks, op) + }) + } + + fn call_view_anon(&mut self, op: AnonymousViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult { + common_call(self.scope, budget, op, |scope, op| { + call_call_view_anon(scope, self.hooks, op) + }) + } + + fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error) { + log_traceback(self.replica_ctx, func_type, func, trap) + } + + async fn call_procedure(&mut self, _op: ProcedureOp, _budget: FunctionBudget) -> ProcedureExecuteResult { + todo!("JS/TS module procedure support") + } +} + +fn common_call<'scope, R, O, F>( scope: &mut PinScope<'scope, '_>, - tx: MutTxId, - name: &str, - timestamp: Timestamp, budget: FunctionBudget, - func_type: FuncCallType, - trapped: &mut bool, - call: impl FnOnce(&mut PinScope<'scope, '_>) -> Result>, -) -> (MutTxId, ExecutionStats, anyhow::Result) { + op: O, + call: F, +) -> ExecutionResult> +where + O: InstanceOp, + F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result>, +{ // TODO(v8): Start the budget timeout and long-running logger. let env = env_on_isolate_unwrap(scope); - let mut tx_slot = env.instance_env.tx.clone(); // Start the timer. // We'd like this tightly around `call`. - env.start_funcall(name, timestamp, func_type); - - // Call the function with `tx` provided. - // It should not be available before. - let (tx, call_result) = tx_slot.set(tx, || { - catch_exception(scope, call).map_err(|(e, can_continue)| { - // Convert `can_continue` to whether the isolate has "trapped". - // Also cancel execution termination if needed, - // that can occur due to terminating long running reducers. - *trapped = match can_continue { - CanContinue::No => false, - CanContinue::Yes => true, - CanContinue::YesCancelTermination => { - scope.cancel_terminate_execution(); - true - } - }; - - anyhow::Error::from(e) - }) + env.start_funcall(op.name(), op.timestamp(), op.call_type()); + + let call_result = catch_exception(scope, |scope| call(scope, op)).map_err(|(e, can_continue)| { + // Convert `can_continue` to whether the isolate has "trapped". + // Also cancel execution termination if needed, + // that can occur due to terminating long running reducers. + match can_continue { + CanContinue::No => ExecutionError::Trap(e.into()), + CanContinue::Yes => ExecutionError::Recoverable(e.into()), + CanContinue::YesCancelTermination => { + scope.cancel_terminate_execution(); + ExecutionError::Trap(e.into()) + } + } }); // Finish timings. @@ -657,73 +705,7 @@ fn common_call<'scope, R>( timings, memory_allocation, }; - (tx, stats, call_result) -} - -fn call_reducer<'scope>( - instance_common: &mut InstanceCommon, - replica_ctx: &ReplicaContext, - scope: &mut PinScope<'scope, '_>, - hooks: &HookFunctions<'_>, - tx: Option, - params: CallReducerParams, -) -> (super::ReducerCallResult, bool) { - let mut trapped = false; - - let (res, _) = instance_common.call_reducer_with_tx( - replica_ctx, - tx, - params, - move |a, b, c| log_traceback(replica_ctx, a, b, c), - |tx, op, budget| { - let func = FuncCallType::Reducer; - let (tx, stats, call_result) = - common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| { - let res = call_call_reducer(scope, hooks, op)?; - Ok(res) - }); - (tx, ReducerExecuteResult { stats, call_result }) - }, - ); - - (res, trapped) -} - -fn call_view<'scope>( - instance_common: &mut InstanceCommon, - replica_ctx: &ReplicaContext, - scope: &mut PinScope<'scope, '_>, - hooks: &HookFunctions<'_>, - tx: MutTxId, - params: CallViewParams, -) -> (ViewCallResult, bool) { - let mut trapped = false; - - let is_anonymous = params.is_anonymous; - let (res, _) = instance_common.call_view_with_tx( - replica_ctx, - tx, - params, - move |a, b, c| log_traceback(replica_ctx, a, b, c), - |tx, op, budget| { - let func = FuncCallType::View(if is_anonymous { - ViewCall::anonymous(op.db_id, op.args.get_bsatn().clone()) - } else { - ViewCall::with_identity(*op.caller_identity, op.db_id, op.args.get_bsatn().clone()) - }); - let (tx, stats, call_result) = - common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| { - Ok(if is_anonymous { - call_call_view_anon(scope, hooks, op.into())? - } else { - call_call_view(scope, hooks, op)? - }) - }); - (tx, ViewExecuteResult { stats, call_result }) - }, - ); - - (res, trapped) + ExecutionResult { stats, call_result } } /// Extracts the raw module def by running the registered `__describe_module__` hook. diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 01c20cc59fe..c69cfcbcc13 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,5 +1,7 @@ use bytes::Bytes; use prometheus::{Histogram, IntCounter, IntGauge}; +use spacetimedb_datastore::locking_tx_datastore::FuncCallType; +use spacetimedb_datastore::locking_tx_datastore::ViewCall; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::de::DeserializeSeed as _; use spacetimedb_primitives::ProcedureId; @@ -17,6 +19,7 @@ use crate::database_logger; use crate::energy::{EnergyMonitor, FunctionBudget, FunctionFingerprint}; use crate::host::host_controller::ViewOutcome; use crate::host::instance_env::InstanceEnv; +use crate::host::instance_env::TxSlot; use crate::host::module_common::{build_common_module_from_raw, ModuleCommon}; use crate::host::module_host::ViewCallResult; use crate::host::module_host::{ @@ -60,11 +63,14 @@ pub trait WasmInstancePre: Send + Sync + 'static { fn instantiate(&self, env: InstanceEnv, func_names: &FuncNames) -> Result; } -#[async_trait::async_trait] -pub trait WasmInstance: Send + Sync + 'static { - fn extract_descriptions(&mut self) -> Result, DescribeError>; +// TODO: Technically this trait is also used for V8. +// We should rename and move to some place more appropriate. +pub trait WasmInstance { + fn extract_descriptions(&mut self) -> Result; - fn instance_env(&self) -> &InstanceEnv; + fn replica_ctx(&self) -> &Arc; + + fn tx_slot(&self) -> TxSlot; fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult; @@ -72,9 +78,13 @@ pub trait WasmInstance: Send + Sync + 'static { fn call_view_anon(&mut self, op: AnonymousViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult; - fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error); + fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error); - async fn call_procedure(&mut self, op: ProcedureOp, budget: FunctionBudget) -> ProcedureExecuteResult; + fn call_procedure( + &mut self, + op: ProcedureOp, + budget: FunctionBudget, + ) -> impl Future; } pub struct EnergyStats { @@ -119,26 +129,25 @@ pub struct ExecutionStats { pub memory_allocation: usize, } -#[derive(derive_more::AsRef)] -pub struct ReducerExecuteResult { - #[as_ref] - pub stats: ExecutionStats, - pub call_result: anyhow::Result, +pub enum ExecutionError { + User(Box), + Recoverable(anyhow::Error), + Trap(anyhow::Error), } #[derive(derive_more::AsRef)] -pub struct ViewExecuteResult { - #[as_ref] - pub stats: ExecutionStats, - pub call_result: anyhow::Result, -} -#[derive(derive_more::AsRef)] -pub struct ProcedureExecuteResult { +pub struct ExecutionResult { #[as_ref] pub stats: ExecutionStats, - pub call_result: anyhow::Result, + pub call_result: T, } +pub type ReducerExecuteResult = ExecutionResult>; + +pub type ViewExecuteResult = ExecutionResult>; + +pub type ProcedureExecuteResult = ExecutionResult>; + pub struct WasmModuleHostActor { module: T::InstancePre, common: ModuleCommon, @@ -206,7 +215,6 @@ impl WasmModuleHostActor { let mut instance = uninit_instance.instantiate(instance_env, &func_names)?; let desc = instance.extract_descriptions()?; - let desc: RawModuleDef = bsatn::from_slice(&desc).map_err(DescribeError::Decode)?; // Validate and create a common module rom the raw definition. let common = build_common_module_from_raw(mcc, desc)?; @@ -284,7 +292,7 @@ impl WasmModuleInstance { old_module_info: Arc, policy: MigrationPolicy, ) -> anyhow::Result { - let replica_ctx = &self.instance.instance_env().replica_ctx; + let replica_ctx = self.instance.replica_ctx(); self.common .update_database(replica_ctx, program, old_module_info, policy) } @@ -297,14 +305,7 @@ impl WasmModuleInstance { &mut self, params: CallProcedureParams, ) -> Result { - let res = self - .common - .call_procedure( - params, - |ty, fun, err| T::log_traceback(ty, fun, err), - |op, budget| self.instance.call_procedure(op, budget), - ) - .await; + let res = self.common.call_procedure(params, &mut self.instance).await; if res.is_err() { self.trapped = true; } @@ -315,40 +316,13 @@ impl WasmModuleInstance { impl WasmModuleInstance { #[tracing::instrument(level = "trace", skip_all)] fn call_reducer_with_tx(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult { - let (res, trapped) = self.common.call_reducer_with_tx( - &self.instance.instance_env().replica_ctx.clone(), - tx, - params, - |ty, fun, err| T::log_traceback(ty, fun, err), - |tx, op, budget| { - self.instance - .instance_env() - .tx - .clone() - .set(tx, || self.instance.call_reducer(op, budget)) - }, - ); + let (res, trapped) = self.common.call_reducer_with_tx(tx, params, &mut self.instance); self.trapped = trapped; res } pub fn call_view(&mut self, tx: MutTxId, params: CallViewParams) -> ViewCallResult { - let is_anonymous = params.is_anonymous; - let (res, trapped) = self.common.call_view_with_tx( - &self.instance.instance_env().replica_ctx.clone(), - tx, - params, - |ty, fun, err| T::log_traceback(ty, fun, err), - |tx, op: ViewOp<'_>, budget| { - self.instance.instance_env().tx.clone().set(tx, || { - if is_anonymous { - self.instance.call_view_anon(op.into(), budget) - } else { - self.instance.call_view(op, budget) - } - }) - }, - ); + let (res, trapped) = self.common.call_view_with_tx(tx, params, &mut self.instance); self.trapped = trapped; res @@ -434,11 +408,10 @@ impl InstanceCommon { } } - async fn call_procedure>( + async fn call_procedure( &mut self, params: CallProcedureParams, - log_traceback: impl FnOnce(&str, &str, &anyhow::Error), - vm_call_procedure: impl FnOnce(ProcedureOp, FunctionBudget) -> F, + inst: &mut I, ) -> Result { let CallProcedureParams { timestamp, @@ -476,7 +449,7 @@ impl InstanceCommon { // TODO(procedure-energy): replace with call to separate function `procedure_budget`. let budget = self.energy_monitor.reducer_budget(&energy_fingerprint); - let result = vm_call_procedure(op, budget).await; + let result = inst.call_procedure(op, budget).await; let ProcedureExecuteResult { stats: @@ -496,7 +469,7 @@ impl InstanceCommon { match call_result { Err(err) => { - log_traceback("procedure", &procedure_def.name, &err); + inst.log_traceback("procedure", &procedure_def.name, &err); WORKER_METRICS .wasm_instance_errors @@ -550,13 +523,11 @@ impl InstanceCommon { /// /// The `bool` in the return type signifies whether there was an "outer error". /// For WASM, this should be interpreted as a trap occurring. - pub(crate) fn call_reducer_with_tx( + pub(crate) fn call_reducer_with_tx( &mut self, - replica_ctx: &ReplicaContext, tx: Option, params: CallReducerParams, - log_traceback: impl FnOnce(&str, &str, &anyhow::Error), - vm_call_reducer: impl FnOnce(MutTxId, ReducerOp<'_>, FunctionBudget) -> (MutTxId, ReducerExecuteResult), + inst: &mut I, ) -> (ReducerCallResult, bool) { let CallReducerParams { timestamp, @@ -570,6 +541,7 @@ impl InstanceCommon { } = params; let caller_connection_id_opt = (caller_connection_id != ConnectionId::ZERO).then_some(caller_connection_id); + let replica_ctx = inst.replica_ctx(); let stdb = &*replica_ctx.relational_db.clone(); let database_identity = replica_ctx.database_identity; let info = self.info.clone(); @@ -592,14 +564,13 @@ impl InstanceCommon { let workload = Workload::Reducer(ReducerContext::from(op.clone())); let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload)); + let mut tx_slot = inst.tx_slot(); let _guard = vm_metrics.timer_guard_for_reducer_plus_query(tx.timer); let vm_metrics = VmMetrics::new(&database_identity, reducer_name); - let (tx, result) = self.call_function(caller_identity, reducer_name, |budget| { - let (tx, res) = vm_call_reducer(tx, op, budget); - (Some(tx), res) + let (mut tx, result) = tx_slot.set(tx, || { + self.call_function(caller_identity, reducer_name, |budget| inst.call_reducer(op, budget)) }); - let mut tx = tx.expect("transaction should be present here"); let energy_used = result.stats.energy.used(); let energy_quanta_used = energy_used.into(); @@ -610,17 +581,17 @@ impl InstanceCommon { &result.stats.timings.wasm_instance_env_call_times, ); - let mut trapped = false; + // An outer error occurred. + // This signifies a logic error in the module rather than a properly + // handled bad argument from the caller of a reducer. + // For WASM, this will be interpreted as a trap + // and that the instance must be discarded. + // However, that does not necessarily apply to e.g., V8. + let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_))); + let status = match result.call_result { - Err(err) => { - log_traceback("reducer", reducer_name, &err); - // An outer error occurred. - // This signifies a logic error in the module rather than a properly - // handled bad argument from the caller of a reducer. - // For WASM, this will be interpreted as a trap - // and that the instance must be discarded. - // However, that does not necessarily apply to e.g., V8. - trapped = true; + Err(ExecutionError::Recoverable(err) | ExecutionError::Trap(err)) => { + inst.log_traceback("reducer", reducer_name, &err); self.handle_outer_error( &result.stats.energy, @@ -629,25 +600,30 @@ impl InstanceCommon { reducer_name, ) } + Err(ExecutionError::User(err)) => { + log_reducer_error(inst.replica_ctx(), timestamp, reducer_name, &err); + EventStatus::Failed(err.into()) + } // We haven't actually committed yet - `commit_and_broadcast_event` will commit // for us and replace this with the actual database update. - Ok(res) => match res.and_then(|()| { + Ok(()) => { // If this is an OnDisconnect lifecycle event, remove the client from st_clients. // We handle OnConnect events before running the reducer. - match reducer_def.lifecycle { - Some(Lifecycle::OnDisconnect) => tx - .delete_st_client(caller_identity, caller_connection_id, database_identity) - .map_err(|e| e.to_string().into()), + let res = match reducer_def.lifecycle { + Some(Lifecycle::OnDisconnect) => { + tx.delete_st_client(caller_identity, caller_connection_id, database_identity) + } _ => Ok(()), + }; + match res { + Ok(()) => EventStatus::Committed(DatabaseUpdate::default()), + Err(err) => { + let err = err.to_string(); + log_reducer_error(inst.replica_ctx(), timestamp, reducer_name, &err); + EventStatus::Failed(err) + } } - }) { - Ok(()) => EventStatus::Committed(DatabaseUpdate::default()), - Err(err) => { - log::info!("reducer returned error: {err}"); - log_reducer_error(replica_ctx, timestamp, reducer_name, &err); - EventStatus::Failed(err.into()) - } - }, + } }; let event = ModuleEvent { @@ -706,9 +682,9 @@ impl InstanceCommon { caller_identity: Identity, function_name: &str, vm_call_function: F, - ) -> (Option, R) + ) -> R where - F: FnOnce(FunctionBudget) -> (Option, R), + F: FnOnce(FunctionBudget) -> R, { let energy_fingerprint = FunctionFingerprint { module_hash: self.info.module_hash, @@ -720,7 +696,7 @@ impl InstanceCommon { let function_span = start_run_function_span(budget); - let (tx, result) = vm_call_function(budget); + let result = vm_call_function(budget); let stats: &ExecutionStats = result.as_ref(); let energy_used = stats.energy.used(); @@ -741,7 +717,7 @@ impl InstanceCommon { .record("timings.total_duration", tracing::field::debug(timings.total_duration)) .record("energy.used", tracing::field::debug(energy_used)); - (tx, result) + result } /// Execute a view. @@ -749,13 +725,11 @@ impl InstanceCommon { /// Similar to `call_reducer_with_tx`, but for views. /// unlike to `call_reducer_with_tx`, It does not handle `tx`creation or commit, /// It returns the updated `tx` instead. - pub(crate) fn call_view_with_tx( + pub(crate) fn call_view_with_tx( &mut self, - replica_ctx: &ReplicaContext, tx: MutTxId, params: CallViewParams, - log_traceback: impl FnOnce(&str, &str, &anyhow::Error), - vm_call_view: impl FnOnce(MutTxId, ViewOp<'_>, FunctionBudget) -> (MutTxId, ViewExecuteResult), + inst: &mut I, ) -> (ViewCallResult, bool) { let CallViewParams { caller_identity, @@ -772,6 +746,8 @@ impl InstanceCommon { let view_def = info.module_def.view_by_id(view_id, is_anonymous); let view_name = &*view_def.name; + let mut tx_slot = inst.tx_slot(); + let _outer_span = start_call_function_span(view_name, &caller_identity, caller_connection_id); let op = ViewOp { @@ -783,23 +759,34 @@ impl InstanceCommon { timestamp, }; - let (tx, result) = self.call_function(caller_identity, view_name, |budget| { - let (tx, res) = vm_call_view(tx, op, budget); - (Some(tx), res) + let (mut tx, result) = tx_slot.set(tx, || { + self.call_function(caller_identity, view_name, |budget| { + if is_anonymous { + inst.call_view_anon(op.into(), budget) + } else { + inst.call_view(op, budget) + } + }) }); - let mut tx = tx.expect("transaction should be present here"); - let mut trapped = false; + let trapped = matches!(result.call_result, Err(ExecutionError::Trap(_))); + let outcome = match result.call_result { - Err(err) => { - log_traceback("view", view_name, &err); - trapped = true; + Err(ExecutionError::Recoverable(err) | ExecutionError::Trap(err)) => { + inst.log_traceback("view", view_name, &err); + + self.handle_outer_error(&result.stats.energy, &caller_identity, &caller_connection_id, view_name) + .into() + } + // TODO: maybe do something else with user errors? + Err(ExecutionError::User(err)) => { + inst.log_traceback("view", view_name, &anyhow::anyhow!(err)); self.handle_outer_error(&result.stats.energy, &caller_identity, &caller_connection_id, view_name) .into() } Ok(res) => { - let db = &replica_ctx.relational_db.clone(); + let db = &inst.replica_ctx().relational_db; db.materialize_view( &mut tx, view_name, @@ -918,6 +905,8 @@ fn maybe_log_long_running_function(reducer_name: &str, total_duration: Duration) fn log_reducer_error(replica_ctx: &ReplicaContext, timestamp: Timestamp, reducer: &str, message: &str) { use database_logger::Record; + log::info!("reducer returned error: {message}"); + let record = Record { ts: chrono::DateTime::from_timestamp_micros(timestamp.to_micros_since_unix_epoch()).unwrap(), function: Some(reducer), @@ -963,6 +952,12 @@ fn commit_and_broadcast_event( } } +pub trait InstanceOp { + fn name(&self) -> &str; + fn timestamp(&self) -> Timestamp; + fn call_type(&self) -> FuncCallType; +} + /// Describes a view call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct ViewOp<'a> { @@ -974,6 +969,22 @@ pub struct ViewOp<'a> { pub timestamp: Timestamp, } +impl InstanceOp for ViewOp<'_> { + fn name(&self) -> &str { + self.name + } + fn timestamp(&self) -> Timestamp { + self.timestamp + } + fn call_type(&self) -> FuncCallType { + FuncCallType::View(ViewCall::with_identity( + *self.caller_identity, + self.db_id, + self.args.get_bsatn().clone(), + )) + } +} + /// Describes an anonymous view call in a cheaply shareable way. #[derive(Clone, Debug)] pub struct AnonymousViewOp<'a> { @@ -984,6 +995,18 @@ pub struct AnonymousViewOp<'a> { pub timestamp: Timestamp, } +impl InstanceOp for AnonymousViewOp<'_> { + fn name(&self) -> &str { + self.name + } + fn timestamp(&self) -> Timestamp { + self.timestamp + } + fn call_type(&self) -> FuncCallType { + FuncCallType::View(ViewCall::anonymous(self.db_id, self.args.get_bsatn().clone())) + } +} + impl<'a> From> for AnonymousViewOp<'a> { fn from( ViewOp { @@ -1017,6 +1040,18 @@ pub struct ReducerOp<'a> { pub args: &'a ArgsTuple, } +impl InstanceOp for ReducerOp<'_> { + fn name(&self) -> &str { + self.name + } + fn timestamp(&self) -> Timestamp { + self.timestamp + } + fn call_type(&self) -> FuncCallType { + FuncCallType::Reducer + } +} + impl From> for execution_context::ReducerContext { fn from( ReducerOp { @@ -1048,3 +1083,15 @@ pub struct ProcedureOp { pub timestamp: Timestamp, pub arg_bytes: Bytes, } + +impl InstanceOp for ProcedureOp { + fn name(&self) -> &str { + &self.name + } + fn timestamp(&self) -> Timestamp { + self.timestamp + } + fn call_type(&self) -> FuncCallType { + FuncCallType::Procedure + } +} diff --git a/crates/core/src/host/wasmtime/wasmtime_module.rs b/crates/core/src/host/wasmtime/wasmtime_module.rs index a4e631040ac..a7947956f62 100644 --- a/crates/core/src/host/wasmtime/wasmtime_module.rs +++ b/crates/core/src/host/wasmtime/wasmtime_module.rs @@ -1,18 +1,21 @@ +use std::sync::Arc; + use self::module_host_actor::ReducerOp; use super::wasm_instance_env::WasmInstanceEnv; use super::{Mem, WasmtimeFuel, EPOCH_TICKS_PER_SECOND}; use crate::energy::FunctionBudget; -use crate::host::instance_env::InstanceEnv; +use crate::host::instance_env::{InstanceEnv, TxSlot}; use crate::host::module_common::run_describer; use crate::host::wasm_common::module_host_actor::{ - AnonymousViewOp, DescribeError, ExecutionStats, InitializationError, ViewOp, + AnonymousViewOp, DescribeError, ExecutionError, ExecutionStats, InitializationError, InstanceOp, ViewOp, }; use crate::host::wasm_common::*; +use crate::replica_context::ReplicaContext; use crate::util::string_from_utf8_lossy_owned; use futures_util::FutureExt; -use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, ViewCall}; -use spacetimedb_lib::{ConnectionId, Identity}; +use spacetimedb_datastore::locking_tx_datastore::FuncCallType; +use spacetimedb_lib::{bsatn, ConnectionId, Identity, RawModuleDef}; use spacetimedb_primitives::errno::HOST_CALL_FAILURE; use wasmtime::{ AsContext, AsContextMut, ExternType, Instance, InstancePre, Linker, Store, TypedFunc, WasmBacktrace, WasmParams, @@ -90,23 +93,19 @@ impl module_host_actor::WasmModule for WasmtimeModule { } } -fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), Box> { - match code { - 0 => Ok(()), - CALL_FAILURE => Err(string_from_utf8_lossy_owned(error).into()), - _ => Err("unknown return code".into()), - } +fn handle_error_sink_code(code: i32, error: Vec) -> Result<(), ExecutionError> { + handle_result_sink_code(code, error).map(drop) } /// Handle the return code from a function using a result sink. /// /// On success, returns the result bytes. /// On failure, returns the error message. -fn handle_result_sink_code(code: i32, result: Vec) -> Result, Box> { +fn handle_result_sink_code(code: i32, result: Vec) -> Result, ExecutionError> { match code { 0 => Ok(result), - CALL_FAILURE => Err(string_from_utf8_lossy_owned(result).into()), - _ => Err("unknown return code".into()), + CALL_FAILURE => Err(ExecutionError::User(string_from_utf8_lossy_owned(result).into())), + _ => Err(ExecutionError::Recoverable(anyhow::anyhow!("unknown return code"))), } } @@ -172,14 +171,18 @@ impl module_host_actor::WasmInstancePre for WasmtimeModule { let setup_error = store.data_mut().setup_standard_bytes_sink(); let res = call_sync_typed_func(&init, &mut store, setup_error); let error = store.data_mut().take_standard_bytes_sink(); - match res { - // TODO: catch this and return the error message to the http client - Ok(code) => handle_error_sink_code(code, error).map_err(InitializationError::Setup)?, - Err(err) => { + + let res = res + .map_err(ExecutionError::Trap) + .and_then(|code| handle_error_sink_code(code, error)); + + res.map_err(|e| match e { + ExecutionError::User(err) => InitializationError::Setup(err), + ExecutionError::Recoverable(err) | ExecutionError::Trap(err) => { let func = SETUP_DUNDER.to_owned(); - return Err(InitializationError::RuntimeError { err, func }); + InitializationError::RuntimeError { err, func } } - } + })? } let call_reducer = instance @@ -330,9 +333,8 @@ pub struct WasmtimeInstance { call_view_anon: Option, } -#[async_trait::async_trait] impl module_host_actor::WasmInstance for WasmtimeInstance { - fn extract_descriptions(&mut self) -> Result, DescribeError> { + fn extract_descriptions(&mut self) -> Result { let describer_func_name = DESCRIBE_MODULE_DUNDER; let describer = self @@ -349,11 +351,17 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { // Fetch the bsatn returned by the describer call. let bytes = self.store.data_mut().take_standard_bytes_sink(); - Ok(bytes) + let desc: RawModuleDef = bsatn::from_slice(&bytes).map_err(DescribeError::Decode)?; + + Ok(desc) + } + + fn replica_ctx(&self) -> &Arc { + &self.store.data().instance_env().replica_ctx } - fn instance_env(&self) -> &InstanceEnv { - self.store.data().instance_env() + fn tx_slot(&self) -> TxSlot { + self.store.data().instance_env().tx.clone() } #[tracing::instrument(level = "trace", skip_all)] @@ -372,7 +380,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let (args_source, errors_sink) = store .data_mut() - .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::Reducer); + .start_funcall(op.name, args_bytes, op.timestamp, op.call_type()); let call_result = call_sync_typed_func( &self.call_reducer, @@ -391,25 +399,19 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { ), ); - // Signal that this reducer call is finished. This gets us the timings - // associated to our reducer call, and clears all of the instance state - // associated to the call. - let (timings, error) = store.data_mut().finish_funcall(); + let (stats, error) = finish_opcall(store, budget); - let call_result = call_result.map(|code| handle_error_sink_code(code, error)); + let call_result = call_result + .map_err(ExecutionError::Trap) + .and_then(|code| handle_error_sink_code(code, error)); - module_host_actor::ReducerExecuteResult { - stats: get_execution_stats(store, budget, timings), - call_result, - } + module_host_actor::ReducerExecuteResult { stats, call_result } } fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> module_host_actor::ViewExecuteResult { let store = &mut self.store; prepare_store_for_call(store, budget); - let view = ViewCall::with_identity(*op.caller_identity, op.db_id, op.args.get_bsatn().clone()); - // Prepare sender identity and connection ID, as LITTLE-ENDIAN byte arrays. let [sender_0, sender_1, sender_2, sender_3] = prepare_identity_for_call(*op.caller_identity); // Prepare arguments to the reducer + the error sink & start timings. @@ -418,16 +420,16 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let (args_source, errors_sink) = store .data_mut() - .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::View(view)); + .start_funcall(op.name, args_bytes, op.timestamp, op.call_type()); let Some(call_view) = self.call_view.as_ref() else { return module_host_actor::ViewExecuteResult { stats: zero_execution_stats(store), - call_result: Err(anyhow::anyhow!( + call_result: Err(ExecutionError::Recoverable(anyhow::anyhow!( "Module defines view {} but does not export `{}`", op.name, CALL_VIEW_DUNDER, - )), + ))), }; }; @@ -445,19 +447,14 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { ), ); - // Signal that this view call is finished. This gets us the timings - // associated to our view call, and clears all of the instance state - // associated to the call. - let (timings, result_bytes) = store.data_mut().finish_funcall(); + let (stats, result_bytes) = finish_opcall(store, budget); let call_result = call_result - .and_then(|code| handle_result_sink_code(code, result_bytes).map_err(|e| anyhow::anyhow!(e))) + .map_err(ExecutionError::Trap) + .and_then(|code| handle_result_sink_code(code, result_bytes)) .map(|r| r.into()); - module_host_actor::ViewExecuteResult { - stats: get_execution_stats(store, budget, timings), - call_result, - } + module_host_actor::ViewExecuteResult { stats, call_result } } fn call_view_anon( @@ -468,44 +465,38 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { let store = &mut self.store; prepare_store_for_call(store, budget); - let view = ViewCall::anonymous(op.db_id, op.args.get_bsatn().clone()); // Prepare arguments to the reducer + the error sink & start timings. let args_bytes = op.args.get_bsatn().clone(); let (args_source, errors_sink) = store .data_mut() - .start_funcall(op.name, args_bytes, op.timestamp, FuncCallType::View(view)); + .start_funcall(op.name, args_bytes, op.timestamp, op.call_type()); let Some(call_view_anon) = self.call_view_anon.as_ref() else { return module_host_actor::ViewExecuteResult { stats: zero_execution_stats(store), - call_result: Err(anyhow::anyhow!( + call_result: Err(ExecutionError::Recoverable(anyhow::anyhow!( "Module defines anonymous view {} but does not export `{}`", op.name, CALL_VIEW_ANON_DUNDER, - )), + ))), }; }; let call_result = call_sync_typed_func(call_view_anon, &mut *store, (op.id.0, args_source.0, errors_sink)); - // Signal that this view call is finished. This gets us the timings - // associated to our view call, and clears all of the instance state - // associated to the call. - let (timings, result_bytes) = store.data_mut().finish_funcall(); + let (stats, result_bytes) = finish_opcall(store, budget); let call_result = call_result - .and_then(|code| handle_result_sink_code(code, result_bytes).map_err(|e| anyhow::anyhow!(e))) + .map_err(ExecutionError::Trap) + .and_then(|code| handle_result_sink_code(code, result_bytes)) .map(|r| r.into()); - module_host_actor::ViewExecuteResult { - stats: get_execution_stats(store, budget, timings), - call_result, - } + module_host_actor::ViewExecuteResult { stats, call_result } } - fn log_traceback(func_type: &str, func: &str, trap: &anyhow::Error) { + fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error) { log_traceback(func_type, func, trap) } @@ -557,7 +548,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { .await; // Close the timing span for this procedure and get the BSATN bytes of its result. - let (timings, result_bytes) = store.data_mut().finish_funcall(); + let (stats, result_bytes) = finish_opcall(store, budget); let call_result = call_result.and_then(|code| { (code == 0).then_some(result_bytes.into()).ok_or_else(|| { @@ -567,10 +558,7 @@ impl module_host_actor::WasmInstance for WasmtimeInstance { }) }); - module_host_actor::ProcedureExecuteResult { - stats: get_execution_stats(store, budget, timings), - call_result, - } + module_host_actor::ProcedureExecuteResult { stats, call_result } } } @@ -620,23 +608,26 @@ fn prepare_connection_id_for_call(caller_connection_id: ConnectionId) -> [u64; 2 bytemuck::must_cast(caller_connection_id.as_le_byte_array()) } -/// Compute fuel and heap usage for a call and construct the `ExecutionStats`. -fn get_execution_stats( - store: &Store, - initial_budget: FunctionBudget, - timings: module_host_actor::ExecutionTimings, -) -> ExecutionStats { +/// Finish the op call and calculate its [`ExecutionStats`]. +fn finish_opcall(store: &mut Store, initial_budget: FunctionBudget) -> (ExecutionStats, Vec) { + // Signal that this call is finished. This gets us the timings + // associated with it, and clears all of the instance state + // related to it. + let (timings, ret_bytes) = store.data_mut().finish_funcall(); + let remaining_fuel = get_store_fuel(store); let remaining: FunctionBudget = remaining_fuel.into(); let energy = module_host_actor::EnergyStats { budget: initial_budget, remaining, }; - ExecutionStats { + + let stats = ExecutionStats { energy, timings, memory_allocation: get_memory_size(store), - } + }; + (stats, ret_bytes) } fn zero_execution_stats(store: &Store) -> ExecutionStats {