diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index 32ec5668434..a55e13c5f2e 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -91,7 +91,6 @@ impl Host { auth, Some(&module_host.info.subscriptions), Some(&module_host), - auth.caller, &mut header, ) .await diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 84e321a261b..a78a36fc015 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -15,6 +15,7 @@ use spacetimedb_table::table::ReadViaBsatnError; use thiserror::Error; use crate::client::ClientActorId; +use crate::host::module_host::ViewCallError; use crate::host::scheduler::ScheduleError; use spacetimedb_lib::buffer::DecodeError; use spacetimedb_primitives::*; @@ -147,6 +148,8 @@ pub enum DBError { RestoreSnapshot(#[from] RestoreSnapshotError), #[error(transparent)] DurabilityGone(#[from] DurabilityExited), + #[error(transparent)] + View(#[from] ViewCallError), } impl DBError { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fc090ddcfec..513b710fba8 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -39,17 +39,19 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; -use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; +use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::ConnectionId; use spacetimedb_lib::Timestamp; -use spacetimedb_primitives::{ProcedureId, TableId, ViewDatabaseId, ViewId}; +use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewDatabaseId, ViewId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::{AlgebraicTypeRef, ProductValue}; use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; @@ -57,7 +59,7 @@ use spacetimedb_schema::def::deserialize::ArgsSeed; use spacetimedb_schema::def::{ModuleDef, ProcedureDef, ReducerDef, TableDef, ViewDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; use spacetimedb_vm::relation::RelValue; -use std::collections::VecDeque; +use std::collections::{HashSet, VecDeque}; use std::fmt; use std::future::Future; use std::sync::atomic::AtomicBool; @@ -1487,6 +1489,29 @@ impl ModuleHost { .await? } + /// Downgrade this mutable `tx` after: + /// 1. Collecting view ids from `view_collector` and + /// 2. Materializing them if necessary + pub async fn materialize_views_and_downgrade_tx( + &self, + mut tx: MutTxId, + view_collector: &impl CollectViews, + sender: Identity, + workload: Workload, + ) -> Result<(TxData, TxMetrics, TxId), ViewCallError> { + use FunctionArgs::*; + let mut view_ids = HashSet::new(); + view_collector.collect_views(&mut view_ids); + for view_id in view_ids { + let name = tx.lookup_st_view(view_id)?.view_name; + if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? { + tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx; + } + tx.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?; + } + Ok(tx.commit_downgrade(workload)) + } + pub async fn call_view( &self, tx: MutTxId, diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 314e1f8ede3..b05cdad487c 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -20,8 +20,8 @@ use spacetimedb_datastore::traits::IsolationLevel; use spacetimedb_expr::statement::Statement; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; +use spacetimedb_lib::Timestamp; use spacetimedb_lib::{AlgebraicType, ProductType, ProductValue}; -use spacetimedb_lib::{Identity, Timestamp}; use spacetimedb_query::{compile_sql_stmt, execute_dml_stmt, execute_select_stmt}; use spacetimedb_schema::relation::FieldName; use spacetimedb_vm::eval::run_ast; @@ -192,50 +192,26 @@ pub async fn run( auth: AuthCtx, subs: Option<&ModuleSubscriptions>, module: Option<&ModuleHost>, - caller_identity: Identity, head: &mut Vec<(Box, AlgebraicType)>, ) -> Result { + let module = module + .as_ref() + .ok_or_else(|| anyhow!("Cannot execute views without module context"))?; + + let mut metrics = ExecutionMetrics::default(); + // We parse the sql statement in a mutable transaction. // If it turns out to be a query, we downgrade the tx. - let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { + let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| { compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth) })?; - let mut metrics = ExecutionMetrics::default(); - - for (view_name, args) in stmt.views() { - let (is_materialized, args) = tx - .is_materialized(view_name, args, caller_identity) - .map_err(|e| DBError::Other(anyhow!("Failed to check memoized view: {e}")))?; - - // Skip if already memoized - if is_materialized { - continue; - } - - let module = module - .as_ref() - .ok_or_else(|| anyhow!("Cannot execute view `{view_name}` without module context"))?; - - let res = module - .call_view( - tx, - view_name, - crate::host::FunctionArgs::Bsatn(args), - caller_identity, - None, - ) - .await - .map_err(|e| DBError::Other(anyhow!("Failed to execute view `{view_name}`: {e}")))?; - - tx = res.tx; - } - match stmt { Statement::Select(stmt) => { - // Up to this point, the tx has been read-only, - // and hence there are no deltas to process. - let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql); + // Materialize views and downgrade to a read-only transaction + let (tx_data, tx_metrics_mut, tx) = module + .materialize_views_and_downgrade_tx(tx, &stmt, auth.caller, Workload::Sql) + .await?; let (tx_offset_send, tx_offset) = oneshot::channel(); // Release the tx on drop, so that we record metrics @@ -397,7 +373,6 @@ pub(crate) mod tests { AuthCtx::for_testing(), Some(&subs), None, - Identity::ZERO, &mut vec![], )) .map(|x| x.rows) @@ -550,7 +525,7 @@ pub(crate) mod tests { expected: impl IntoIterator, ) { assert_eq!( - run(db, sql, *auth, None, None, Identity::ZERO, &mut vec![]) + run(db, sql, *auth, None, None, &mut vec![]) .await .unwrap() .rows @@ -1505,9 +1480,7 @@ pub(crate) mod tests { let rt = db.runtime().expect("runtime should be there"); - let run = |db, sql, auth, subs, mut tmp_vec| { - rt.block_on(run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec)) - }; + let run = |db, sql, auth, subs, mut tmp_vec| rt.block_on(run(db, sql, auth, subs, None, &mut tmp_vec)); // No row limit, both queries pass. assert!(run(&db, "SELECT * FROM T", internal_auth, None, tmp_vec.clone()).is_ok()); assert!(run(&db, "SELECT * FROM T", external_auth, None, tmp_vec.clone()).is_ok()); @@ -1557,9 +1530,7 @@ pub(crate) mod tests { let internal_auth = AuthCtx::new(server, server); let tmp_vec = Vec::new(); - let run = |db, sql, auth, subs, mut tmp_vec| async move { - run(db, sql, auth, subs, None, Identity::ZERO, &mut tmp_vec).await - }; + let run = |db, sql, auth, subs, mut tmp_vec| async move { run(db, sql, auth, subs, None, &mut tmp_vec).await }; let check = |db, sql, auth, metrics: ExecutionMetrics| { let result = rt.block_on(run(db, sql, auth, None, tmp_vec.clone()))?; diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 88abc8ca6c2..5ce72d6eca7 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -2163,7 +2163,6 @@ mod tests { auth, Some(&subs), None, - Identity::ZERO, &mut vec![], ) .await?; @@ -2171,30 +2170,12 @@ mod tests { // Client should receive insert assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await; - run( - &db, - "UPDATE t SET y=2 WHERE x=0", - auth, - Some(&subs), - None, - Identity::ZERO, - &mut vec![], - ) - .await?; + run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?; // Client should receive update assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await; - run( - &db, - "DELETE FROM t WHERE x=0", - auth, - Some(&subs), - None, - Identity::ZERO, - &mut vec![], - ) - .await?; + run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), None, &mut vec![]).await?; // Client should receive delete assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await; @@ -3046,7 +3027,6 @@ mod tests { auth, Some(&subs), None, - Identity::ZERO, &mut vec![], ) .await?; diff --git a/crates/expr/src/expr.rs b/crates/expr/src/expr.rs index 92883df0cc9..a4debeab56d 100644 --- a/crates/expr/src/expr.rs +++ b/crates/expr/src/expr.rs @@ -1,10 +1,28 @@ -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue}; -use spacetimedb_primitives::TableId; +use spacetimedb_primitives::{TableId, ViewDatabaseId}; use spacetimedb_schema::schema::TableOrViewSchema; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; +pub trait CollectViews { + fn collect_views(&self, views: &mut HashSet); +} + +impl CollectViews for Arc { + fn collect_views(&self, views: &mut HashSet) { + self.as_ref().collect_views(views); + } +} + +impl CollectViews for Vec { + fn collect_views(&self, views: &mut HashSet) { + for item in self { + item.collect_views(views); + } + } +} + /// A projection is the root of any relational expression. /// This type represents a projection that returns relvars. /// @@ -25,6 +43,14 @@ pub enum ProjectName { Some(RelExpr, Box), } +impl CollectViews for ProjectName { + fn collect_views(&self, views: &mut HashSet) { + match self { + Self::None(expr) | Self::Some(expr, _) => expr.collect_views(views), + } + } +} + impl ProjectName { /// Unwrap the outer projection, returning the inner expression pub fn unwrap(self) -> RelExpr { @@ -146,6 +172,26 @@ pub enum AggType { Count, } +impl CollectViews for ProjectList { + fn collect_views(&self, views: &mut HashSet) { + match self { + Self::Limit(proj, _) => { + proj.collect_views(views); + } + Self::Name(exprs) => { + for expr in exprs { + expr.collect_views(views); + } + } + Self::List(exprs, _) | Self::Agg(exprs, ..) => { + for expr in exprs { + expr.collect_views(views); + } + } + } + } +} + impl ProjectList { /// Does this expression project a single relvar? /// If so, we return it's [`TableOrViewSchema`]. @@ -212,6 +258,18 @@ pub struct Relvar { pub delta: Option, } +impl CollectViews for RelExpr { + fn collect_views(&self, views: &mut HashSet) { + self.visit(&mut |expr| { + if let Self::RelVar(Relvar { schema, .. }) = expr { + if let Some(info) = &schema.view_info { + views.insert(info.view_id); + } + } + }); + } +} + impl RelExpr { /// Walk the expression tree and call `f` on each node pub fn visit(&self, f: &mut impl FnMut(&Self)) { diff --git a/crates/expr/src/statement.rs b/crates/expr/src/statement.rs index 291282c48a2..43f782ca698 100644 --- a/crates/expr/src/statement.rs +++ b/crates/expr/src/statement.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use bytes::Bytes; use spacetimedb_lib::{identity::AuthCtx, st_var::StVarValue, AlgebraicType, AlgebraicValue, ProductValue}; use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_schema::schema::{ColumnSchema, TableOrViewSchema}; @@ -32,13 +31,6 @@ pub enum Statement { DML(DML), } -impl Statement { - pub fn views(&self) -> Vec<(&str, Bytes)> { - //TODO: implement view name extraction - vec![] - } -} - pub enum DML { Insert(TableInsert), Update(TableUpdate), diff --git a/crates/physical-plan/src/plan.rs b/crates/physical-plan/src/plan.rs index 76851923982..946a1b9c130 100644 --- a/crates/physical-plan/src/plan.rs +++ b/crates/physical-plan/src/plan.rs @@ -1,5 +1,6 @@ use std::{ borrow::Cow, + collections::HashSet, ops::{Bound, Deref, DerefMut}, sync::Arc, }; @@ -7,9 +8,12 @@ use std::{ use anyhow::{bail, Result}; use derive_more::From; use either::Either; -use spacetimedb_expr::{expr::AggType, StatementSource}; +use spacetimedb_expr::{ + expr::{AggType, CollectViews}, + StatementSource, +}; use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue}; -use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId}; +use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId, ViewDatabaseId}; use spacetimedb_schema::schema::{IndexSchema, TableSchema}; use spacetimedb_sql_parser::ast::{BinOp, LogOp}; use spacetimedb_table::table::RowRef; @@ -68,6 +72,14 @@ impl DerefMut for ProjectPlan { } } +impl CollectViews for ProjectPlan { + fn collect_views(&self, views: &mut HashSet) { + match self { + Self::None(plan) | Self::Name(plan, ..) => plan.collect_views(views), + } + } +} + impl ProjectPlan { pub fn optimize(self, auth: &AuthCtx) -> Result { match self { @@ -240,6 +252,29 @@ pub enum PhysicalPlan { Filter(Box, PhysicalExpr), } +impl CollectViews for PhysicalPlan { + fn collect_views(&self, views: &mut HashSet) { + self.visit(&mut |plan| match plan { + Self::TableScan(scan, _) => { + if let Some(info) = &scan.schema.view_info { + views.insert(info.view_id); + } + } + Self::IxScan(scan, _) => { + if let Some(info) = &scan.schema.view_info { + views.insert(info.view_id); + } + } + Self::IxJoin(join, _) => { + if let Some(info) = &join.rhs.view_info { + views.insert(info.view_id); + } + } + _ => {} + }); + } +} + impl PhysicalPlan { /// Walks the plan tree and calls `f` on every op pub fn visit(&self, f: &mut impl FnMut(&Self)) { diff --git a/crates/schema/src/schema.rs b/crates/schema/src/schema.rs index 141c99cfa66..d958476ab64 100644 --- a/crates/schema/src/schema.rs +++ b/crates/schema/src/schema.rs @@ -273,6 +273,7 @@ impl TableSchema { /// Will only be non-zero in the case of views. pub fn num_private_cols(&self) -> usize { self.view_info + .as_ref() .map(|view_info| view_info.num_private_cols()) .unwrap_or_default() } diff --git a/crates/subscription/src/lib.rs b/crates/subscription/src/lib.rs index f6fc92edce2..c7787b2ad80 100644 --- a/crates/subscription/src/lib.rs +++ b/crates/subscription/src/lib.rs @@ -6,10 +6,10 @@ use spacetimedb_execution::{ }, Datastore, DeltaStore, Row, }; -use spacetimedb_expr::check::SchemaView; +use spacetimedb_expr::{check::SchemaView, expr::CollectViews}; use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue}; use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField}; -use spacetimedb_primitives::{ColId, ColList, IndexId, TableId}; +use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewDatabaseId}; use spacetimedb_query::compile_subscription; use std::sync::Arc; use std::{collections::HashSet, ops::RangeBounds}; @@ -363,6 +363,12 @@ pub struct SubscriptionPlan { plan_opt: ProjectPlan, } +impl CollectViews for SubscriptionPlan { + fn collect_views(&self, views: &mut HashSet) { + self.plan_opt.collect_views(views); + } +} + impl SubscriptionPlan { /// Is this a plan for a join? pub fn is_join(&self) -> bool {