Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 84 additions & 102 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use self::syscall::{
use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon};
use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime};
use super::UpdateDatabaseResult;
use crate::host::instance_env::{ChunkPool, InstanceEnv};
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
use crate::host::module_host::{CallViewParams, Instance, ViewCallResult};
use crate::host::v8::error::{ErrorOrException, ExceptionThrown};
use crate::host::wasm_common::instrumentation::CallTimes;
use crate::host::wasm_common::module_host_actor::{
DescribeError, ExecutionStats, ExecutionTimings, InstanceCommon, ReducerExecuteResult, ViewExecuteResult,
AnonymousViewOp, DescribeError, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, InstanceCommon,
InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp,
WasmInstance,
};
use crate::host::wasm_common::{RowIters, TimingSpanSet};
use crate::host::{ReducerCallResult, Scheduler};
Expand All @@ -28,7 +30,7 @@ use anyhow::Context as _;
use core::str;
use itertools::Either;
use spacetimedb_client_api_messages::energy::FunctionBudget;
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCall};
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
use spacetimedb_datastore::traits::Program;
use spacetimedb_lib::{RawModuleDef, Timestamp};
use spacetimedb_schema::auto_migrate::MigrationPolicy;
Expand Down Expand Up @@ -439,6 +441,12 @@ fn spawn_instance_worker(
let instance_env = InstanceEnv::new(replica_ctx.clone(), scheduler);
scope.set_slot(JsInstanceEnv::new(instance_env));

let mut inst = V8Instance {
scope,
replica_ctx,
hooks: &hooks,
};

// Process requests to the worker.
//
// The loop is terminated when a `JsInstance` is dropped.
Expand Down Expand Up @@ -466,7 +474,7 @@ fn spawn_instance_worker(
// but rather let this happen by `return_instance` using `JsInstance::trapped`
// which will cause `JsInstance` to be dropped,
// which in turn results in the loop being terminated.
let res = call_reducer(&mut instance_common, replica_ctx, scope, &hooks, tx, params);
let res = instance_common.call_reducer_with_tx(tx, params, &mut inst);

// Reply to `JsInstance::call_reducer`.
if let Err(e) = call_reducer_response_tx.send(res) {
Expand All @@ -476,7 +484,7 @@ fn spawn_instance_worker(
}
}
JsWorkerRequest::CallView { tx, params } => {
let res = call_view(&mut instance_common, replica_ctx, scope, &hooks, tx, params);
let res = instance_common.call_view_with_tx(tx, params, &mut inst);

if let Err(e) = call_view_response_tx.send(res) {
unreachable!("should have receiver for `call_view` response, {e}");
Expand Down Expand Up @@ -603,43 +611,83 @@ fn call_free_fun<'scope>(
fun.call(scope, receiver, args).ok_or_else(exception_already_thrown)
}

#[allow(clippy::too_many_arguments)]
fn common_call<'scope, R>(
struct V8Instance<'a, 'scope, 'isolate> {
scope: &'a mut PinScope<'scope, 'isolate>,
replica_ctx: &'a Arc<ReplicaContext>,
hooks: &'a HookFunctions<'a>,
}

impl WasmInstance for V8Instance<'_, '_, '_> {
fn extract_descriptions(&mut self) -> Result<RawModuleDef, DescribeError> {
extract_description(&mut self.scope, self.hooks, self.replica_ctx)
}

fn replica_ctx(&self) -> &Arc<ReplicaContext> {
self.replica_ctx
}

fn tx_slot(&self) -> TxSlot {
self.scope.get_slot::<JsInstanceEnv>().unwrap().instance_env.tx.clone()
}

fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult {
let ExecutionResult { stats, call_result } = common_call(self.scope, budget, op, |scope, op| {
Ok(call_call_reducer(scope, self.hooks, op)?)
});
let call_result = call_result.and_then(|res| res.map_err(ExecutionError::User));
ExecutionResult { stats, call_result }
}

fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult {
common_call(self.scope, budget, op, |scope, op| {
call_call_view(scope, self.hooks, op)
})
}

fn call_view_anon(&mut self, op: AnonymousViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult {
common_call(self.scope, budget, op, |scope, op| {
call_call_view_anon(scope, self.hooks, op)
})
}

fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error) {
log_traceback(self.replica_ctx, func_type, func, trap)
}

async fn call_procedure(&mut self, _op: ProcedureOp, _budget: FunctionBudget) -> ProcedureExecuteResult {
todo!("JS/TS module procedure support")
}
}

fn common_call<'scope, R, O, F>(
scope: &mut PinScope<'scope, '_>,
tx: MutTxId,
name: &str,
timestamp: Timestamp,
budget: FunctionBudget,
func_type: FuncCallType,
trapped: &mut bool,
call: impl FnOnce(&mut PinScope<'scope, '_>) -> Result<R, ErrorOrException<ExceptionThrown>>,
) -> (MutTxId, ExecutionStats, anyhow::Result<R>) {
op: O,
call: F,
) -> ExecutionResult<Result<R, ExecutionError>>
where
O: InstanceOp,
F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result<R, ErrorOrException<ExceptionThrown>>,
{
// TODO(v8): Start the budget timeout and long-running logger.
let env = env_on_isolate_unwrap(scope);
let mut tx_slot = env.instance_env.tx.clone();

// Start the timer.
// We'd like this tightly around `call`.
env.start_funcall(name, timestamp, func_type);

// Call the function with `tx` provided.
// It should not be available before.
let (tx, call_result) = tx_slot.set(tx, || {
catch_exception(scope, call).map_err(|(e, can_continue)| {
// Convert `can_continue` to whether the isolate has "trapped".
// Also cancel execution termination if needed,
// that can occur due to terminating long running reducers.
*trapped = match can_continue {
CanContinue::No => false,
CanContinue::Yes => true,
CanContinue::YesCancelTermination => {
scope.cancel_terminate_execution();
true
}
};

anyhow::Error::from(e)
})
env.start_funcall(op.name(), op.timestamp(), op.call_type());

let call_result = catch_exception(scope, |scope| call(scope, op)).map_err(|(e, can_continue)| {
// Convert `can_continue` to whether the isolate has "trapped".
// Also cancel execution termination if needed,
// that can occur due to terminating long running reducers.
match can_continue {
CanContinue::No => ExecutionError::Trap(e.into()),
CanContinue::Yes => ExecutionError::Recoverable(e.into()),
CanContinue::YesCancelTermination => {
scope.cancel_terminate_execution();
ExecutionError::Trap(e.into())
}
}
});

// Finish timings.
Expand All @@ -657,73 +705,7 @@ fn common_call<'scope, R>(
timings,
memory_allocation,
};
(tx, stats, call_result)
}

fn call_reducer<'scope>(
instance_common: &mut InstanceCommon,
replica_ctx: &ReplicaContext,
scope: &mut PinScope<'scope, '_>,
hooks: &HookFunctions<'_>,
tx: Option<MutTxId>,
params: CallReducerParams,
) -> (super::ReducerCallResult, bool) {
let mut trapped = false;

let (res, _) = instance_common.call_reducer_with_tx(
replica_ctx,
tx,
params,
move |a, b, c| log_traceback(replica_ctx, a, b, c),
|tx, op, budget| {
let func = FuncCallType::Reducer;
let (tx, stats, call_result) =
common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| {
let res = call_call_reducer(scope, hooks, op)?;
Ok(res)
});
(tx, ReducerExecuteResult { stats, call_result })
},
);

(res, trapped)
}

fn call_view<'scope>(
instance_common: &mut InstanceCommon,
replica_ctx: &ReplicaContext,
scope: &mut PinScope<'scope, '_>,
hooks: &HookFunctions<'_>,
tx: MutTxId,
params: CallViewParams,
) -> (ViewCallResult, bool) {
let mut trapped = false;

let is_anonymous = params.is_anonymous;
let (res, _) = instance_common.call_view_with_tx(
replica_ctx,
tx,
params,
move |a, b, c| log_traceback(replica_ctx, a, b, c),
|tx, op, budget| {
let func = FuncCallType::View(if is_anonymous {
ViewCall::anonymous(op.db_id, op.args.get_bsatn().clone())
} else {
ViewCall::with_identity(*op.caller_identity, op.db_id, op.args.get_bsatn().clone())
});
let (tx, stats, call_result) =
common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| {
Ok(if is_anonymous {
call_call_view_anon(scope, hooks, op.into())?
} else {
call_call_view(scope, hooks, op)?
})
});
(tx, ViewExecuteResult { stats, call_result })
},
);

(res, trapped)
ExecutionResult { stats, call_result }
}

/// Extracts the raw module def by running the registered `__describe_module__` hook.
Expand Down
Loading
Loading