Skip to content

Commit 97436d2

Browse files
Materialize views for subscriptions
1 parent 0e43a96 commit 97436d2

File tree

4 files changed

+121
-36
lines changed

4 files changed

+121
-36
lines changed

crates/core/src/subscription/module_subscription_actor.rs

Lines changed: 92 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ use spacetimedb_client_api_messages::websocket::{
3131
use spacetimedb_datastore::db_metrics::DB_METRICS;
3232
use spacetimedb_datastore::execution_context::{Workload, WorkloadType};
3333
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
34-
use spacetimedb_datastore::locking_tx_datastore::TxId;
35-
use spacetimedb_datastore::traits::TxData;
34+
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
35+
use spacetimedb_datastore::traits::{IsolationLevel, TxData};
3636
use spacetimedb_durability::TxOffset;
3737
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
38+
use spacetimedb_expr::expr::CollectViews;
3839
use spacetimedb_lib::identity::AuthCtx;
3940
use spacetimedb_lib::metrics::ExecutionMetrics;
4041
use spacetimedb_lib::Identity;
@@ -368,7 +369,7 @@ impl ModuleSubscriptions {
368369
let hash = QueryHash::from_string(&sql, auth.caller, false);
369370
let hash_with_param = QueryHash::from_string(&sql, auth.caller, true);
370371

371-
let (tx, tx_offset) = self.begin_tx(Workload::Subscribe);
372+
let (mut_tx, _) = self.begin_mut_tx(Workload::Subscribe);
372373

373374
let existing_query = {
374375
let guard = self.subscriptions.read();
@@ -378,7 +379,7 @@ impl ModuleSubscriptions {
378379
let query = return_on_err_with_sql!(
379380
existing_query.map(Ok).unwrap_or_else(|| compile_query_with_hashes(
380381
&auth,
381-
&tx,
382+
&*mut_tx,
382383
&sql,
383384
hash,
384385
hash_with_param
@@ -388,6 +389,9 @@ impl ModuleSubscriptions {
388389
send_err_msg
389390
);
390391

392+
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
393+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &query, auth.caller)?;
394+
391395
let (table_rows, metrics) = return_on_err_with_sql!(
392396
self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Subscribe),
393397
query.sql(),
@@ -605,7 +609,7 @@ impl ModuleSubscriptions {
605609
queries: &[Box<str>],
606610
num_queries: usize,
607611
metrics: &SubscriptionMetrics,
608-
) -> Result<(Vec<Arc<Plan>>, AuthCtx, TxId, HistogramTimer), DBError> {
612+
) -> Result<(Vec<Arc<Plan>>, AuthCtx, MutTxId, HistogramTimer), DBError> {
609613
let mut subscribe_to_all_tables = false;
610614
let mut plans = Vec::with_capacity(num_queries);
611615
let mut query_hashes = Vec::with_capacity(num_queries);
@@ -624,7 +628,7 @@ impl ModuleSubscriptions {
624628
let auth = AuthCtx::new(self.owner_identity, sender);
625629

626630
// We always get the db lock before the subscription lock to avoid deadlocks.
627-
let (tx, _tx_offset) = self.begin_tx(Workload::Subscribe);
631+
let (mut_tx, _tx_offset) = self.begin_mut_tx(Workload::Subscribe);
628632

629633
let compile_timer = metrics.compilation_time.start_timer();
630634

@@ -637,9 +641,14 @@ impl ModuleSubscriptions {
637641

638642
if subscribe_to_all_tables {
639643
plans.extend(
640-
super::subscription::get_all(&self.relational_db, &tx, &auth)?
641-
.into_iter()
642-
.map(Arc::new),
644+
super::subscription::get_all(
645+
|relational_db, tx| relational_db.get_all_tables_mut(tx).map(|schemas| schemas.into_iter()),
646+
&self.relational_db,
647+
&*mut_tx,
648+
&auth,
649+
)?
650+
.into_iter()
651+
.map(Arc::new),
643652
);
644653
}
645654

@@ -652,7 +661,7 @@ impl ModuleSubscriptions {
652661
plans.push(unit);
653662
} else {
654663
plans.push(Arc::new(
655-
compile_query_with_hashes(&auth, &tx, sql, hash, hash_with_param).map_err(|err| {
664+
compile_query_with_hashes(&auth, &*mut_tx, sql, hash, hash_with_param).map_err(|err| {
656665
DBError::WithSql {
657666
error: Box::new(DBError::Other(err.into())),
658667
sql: sql.into(),
@@ -666,7 +675,7 @@ impl ModuleSubscriptions {
666675
// How many queries in this subscription are not cached?
667676
metrics.num_new_queries_subscribed.inc_by(new_queries);
668677

669-
Ok((plans, auth, scopeguard::ScopeGuard::into_inner(tx), compile_timer))
678+
Ok((plans, auth, ScopeGuard::<MutTxId, _>::into_inner(mut_tx), compile_timer))
670679
}
671680

672681
/// Send a message to a client connection.
@@ -731,7 +740,7 @@ impl ModuleSubscriptions {
731740
// How many queries make up this subscription?
732741
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
733742

734-
let (queries, auth, tx, compile_timer) = return_on_err!(
743+
let (queries, auth, mut_tx, compile_timer) = return_on_err!(
735744
self.compile_queries(
736745
sender.id.identity,
737746
&request.query_strings,
@@ -741,7 +750,7 @@ impl ModuleSubscriptions {
741750
send_err_msg,
742751
None
743752
);
744-
let (tx, tx_offset) = self.guard_tx(tx, <_>::default());
753+
let (mut_tx, _) = self.guard_mut_tx(mut_tx, <_>::default());
745754

746755
// We minimize locking so that other clients can add subscriptions concurrently.
747756
// We are protected from race conditions with broadcasts, because we have the db lock,
@@ -761,6 +770,9 @@ impl ModuleSubscriptions {
761770
// Record how long it took to compile the subscription
762771
drop(compile_timer);
763772

773+
let mut_tx = ScopeGuard::<MutTxId, _>::into_inner(mut_tx);
774+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &queries, auth.caller)?;
775+
764776
let Ok((update, metrics)) =
765777
self.evaluate_queries(sender.clone(), &queries, &tx, &auth, TableUpdateType::Subscribe)
766778
else {
@@ -827,13 +839,14 @@ impl ModuleSubscriptions {
827839
// How many queries make up this subscription?
828840
subscription_metrics.num_queries_subscribed.inc_by(num_queries as _);
829841

830-
let (queries, auth, tx, compile_timer) = self.compile_queries(
842+
let (queries, auth, mut_tx, compile_timer) = self.compile_queries(
831843
sender.id.identity,
832844
&subscription.query_strings,
833845
num_queries,
834846
&subscription_metrics,
835847
)?;
836-
let (tx, tx_offset) = self.guard_tx(tx, <_>::default());
848+
849+
let (tx, tx_offset) = self.materialize_views_and_downgrade_tx(mut_tx, &queries, auth.caller)?;
837850

838851
check_row_limit(
839852
&queries,
@@ -983,13 +996,35 @@ impl ModuleSubscriptions {
983996
}))
984997
}
985998

999+
/// Materialize views for `sender`, collected from `view_collector`.
1000+
fn materialize_views_and_downgrade_tx(
1001+
&self,
1002+
mut tx: MutTxId,
1003+
view_collector: &impl CollectViews,
1004+
sender: Identity,
1005+
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
1006+
tx.materialize_views(view_collector, sender)?;
1007+
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
1008+
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
1009+
Ok(self.guard_tx(tx, opts))
1010+
}
1011+
9861012
/// Helper that starts a new read transaction, and guards it using
9871013
/// [`Self::guard_tx`] with the default configuration.
988-
fn begin_tx(&self, workload: Workload) -> (ScopeGuard<TxId, impl FnOnce(TxId) + '_>, TransactionOffset) {
1014+
fn begin_tx(&self, workload: Workload) -> (TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset) {
9891015
self.guard_tx(self.relational_db.begin_tx(workload), <_>::default())
9901016
}
9911017

992-
/// Helper wrapping `tx` in a scopegard, with a configurable drop fn.
1018+
/// Helper that starts a new mutable transaction, and guards it using
1019+
/// [`Self::guard_mut_tx`] with the default configuration.
1020+
fn begin_mut_tx(&self, workload: Workload) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
1021+
self.guard_mut_tx(
1022+
self.relational_db.begin_mut_tx(IsolationLevel::Serializable, workload),
1023+
<_>::default(),
1024+
)
1025+
}
1026+
1027+
/// Helper wrapping a [`TxId`] in a scopegard, with a configurable drop fn.
9931028
///
9941029
/// By default, `tx` is released when the returned [`ScopeGuard`] is dropped,
9951030
/// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
@@ -1004,27 +1039,42 @@ impl ModuleSubscriptions {
10041039
/// If another receiver of the transaction offset is needed, its sending
10051040
/// side can be passed in as `extra_tx_offset_sender`. It will be sent the
10061041
/// offset as well.
1007-
fn guard_tx(
1008-
&self,
1009-
tx: TxId,
1010-
GuardTxOptions {
1011-
extra_tx_offset_sender,
1012-
tx_data,
1013-
tx_metrics_mut,
1014-
}: GuardTxOptions,
1015-
) -> (ScopeGuard<TxId, impl FnOnce(TxId) + '_>, TransactionOffset) {
1042+
fn guard_tx(&self, tx: TxId, opts: GuardTxOptions) -> (TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset) {
10161043
let (offset_tx, offset_rx) = oneshot::channel();
10171044
let guard = scopeguard::guard(tx, |tx| {
10181045
let (tx_offset, tx_metrics, reducer) = self.relational_db.release_tx(tx);
10191046
log::trace!("read tx released with offset {tx_offset}");
10201047
let _ = offset_tx.send(tx_offset);
1021-
if let Some(extra) = extra_tx_offset_sender {
1048+
if let Some(extra) = opts.extra_tx_offset_sender {
10221049
let _ = extra.send(tx_offset);
10231050
}
10241051
self.relational_db
1025-
.report_tx_metrics(reducer, tx_data, tx_metrics_mut, Some(tx_metrics));
1052+
.report_tx_metrics(reducer, opts.tx_data, opts.tx_metrics_mut, Some(tx_metrics));
10261053
});
1054+
(guard, offset_rx)
1055+
}
10271056

1057+
/// The same as [`Self::guard_tx`] but for mutable transactions.
1058+
///
1059+
/// By default, `tx` is committed when the returned [`ScopeGuard`] is dropped,
1060+
/// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`].
1061+
fn guard_mut_tx(
1062+
&self,
1063+
tx: MutTxId,
1064+
opts: GuardTxOptions,
1065+
) -> (MutTxGuard<impl FnOnce(MutTxId) + '_>, TransactionOffset) {
1066+
let (offset_tx, offset_rx) = oneshot::channel();
1067+
let guard = scopeguard::guard(tx, |tx| {
1068+
if let Ok(Some((tx_offset, tx_data, tx_metrics_mut, reducer))) = self.relational_db.commit_tx(tx) {
1069+
log::trace!("mutable tx committed with offset {tx_offset}");
1070+
let _ = offset_tx.send(tx_offset);
1071+
if let Some(extra) = opts.extra_tx_offset_sender {
1072+
let _ = extra.send(tx_offset);
1073+
}
1074+
self.relational_db
1075+
.report_tx_metrics(reducer, Some(Arc::new(tx_data)), Some(tx_metrics_mut), None);
1076+
}
1077+
});
10281078
(guard, offset_rx)
10291079
}
10301080
}
@@ -1052,10 +1102,24 @@ impl GuardTxOptions {
10521102
tx_metrics_mut: tx_metrics_mut.into(),
10531103
}
10541104
}
1105+
1106+
fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self {
1107+
Self {
1108+
extra_tx_offset_sender: None,
1109+
tx_data: Some(Arc::new(tx_data)),
1110+
tx_metrics_mut: tx_metrics_mut.into(),
1111+
}
1112+
}
10551113
}
10561114

10571115
pub struct WriteConflict;
10581116

1117+
/// A [`ScopeGuard`] for [`TxId`]
1118+
type TxGuard<F> = ScopeGuard<TxId, F>;
1119+
1120+
/// A [`ScopeGuard`] for [`MutTxId`]
1121+
type MutTxGuard<F> = ScopeGuard<MutTxId, F>;
1122+
10591123
#[cfg(test)]
10601124
mod tests {
10611125
use super::{AssertTxFn, ModuleSubscriptions};

crates/core/src/subscription/module_subscription_manager.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ use spacetimedb_client_api_messages::websocket::{
2222
use spacetimedb_data_structures::map::{Entry, IntMap};
2323
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
2424
use spacetimedb_durability::TxOffset;
25+
use spacetimedb_expr::expr::CollectViews;
2526
use spacetimedb_lib::metrics::ExecutionMetrics;
2627
use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue};
27-
use spacetimedb_primitives::{ColId, IndexId, TableId};
28+
use spacetimedb_primitives::{ColId, IndexId, TableId, ViewId};
2829
use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName};
2930
use std::collections::BTreeMap;
3031
use std::fmt::Debug;
@@ -53,6 +54,14 @@ pub struct Plan {
5354
plans: Vec<SubscriptionPlan>,
5455
}
5556

57+
impl CollectViews for Plan {
58+
fn collect_views(&self, views: &mut std::collections::HashSet<ViewId>) {
59+
for plan in &self.plans {
60+
plan.collect_views(views);
61+
}
62+
}
63+
}
64+
5665
impl Plan {
5766
/// Create a new subscription plan to be cached
5867
pub fn new(plans: Vec<SubscriptionPlan>, hash: QueryHash, text: String) -> Self {

crates/core/src/subscription/query.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use crate::sql::compiler::compile_sql;
55
use crate::subscription::subscription::SupportedQuery;
66
use once_cell::sync::Lazy;
77
use regex::Regex;
8+
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
9+
use spacetimedb_execution::Datastore;
810
use spacetimedb_lib::identity::AuthCtx;
911
use spacetimedb_subscription::SubscriptionPlan;
1012
use spacetimedb_vm::expr::{self, Crud, CrudExpr, QueryExpr};
@@ -93,7 +95,7 @@ pub fn compile_read_only_query(auth: &AuthCtx, tx: &Tx, input: &str) -> Result<P
9395

9496
/// Compile a string into a single read-only query.
9597
/// This returns an error if the string has multiple queries or mutations.
96-
pub fn compile_query_with_hashes(
98+
pub fn compile_query_with_hashes<Tx: Datastore + StateView>(
9799
auth: &AuthCtx,
98100
tx: &Tx,
99101
input: &str,

crates/core/src/subscription/subscription.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use anyhow::Context;
3434
use itertools::Either;
3535
use spacetimedb_client_api_messages::websocket::Compression;
3636
use spacetimedb_data_structures::map::HashSet;
37+
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
3738
use spacetimedb_datastore::locking_tx_datastore::TxId;
3839
use spacetimedb_lib::db::auth::{StAccess, StTableType};
3940
use spacetimedb_lib::identity::AuthCtx;
@@ -42,13 +43,13 @@ use spacetimedb_primitives::TableId;
4243
use spacetimedb_sats::ProductValue;
4344
use spacetimedb_schema::def::error::AuthError;
4445
use spacetimedb_schema::relation::DbTable;
46+
use spacetimedb_schema::schema::TableSchema;
4547
use spacetimedb_subscription::SubscriptionPlan;
4648
use spacetimedb_vm::expr::{self, AuthAccess, IndexJoin, Query, QueryExpr, SourceExpr, SourceProvider, SourceSet};
4749
use spacetimedb_vm::rel_ops::RelOps;
4850
use spacetimedb_vm::relation::{MemTable, RelValue};
4951
use std::hash::Hash;
5052
use std::iter;
51-
use std::ops::Deref;
5253
use std::sync::Arc;
5354
use std::time::Duration;
5455

@@ -611,11 +612,18 @@ impl AuthAccess for ExecutionSet {
611612
/// Queries all the [`StTableType::User`] tables *right now*
612613
/// and turns them into [`QueryExpr`],
613614
/// the moral equivalent of `SELECT * FROM table`.
614-
pub(crate) fn get_all(relational_db: &RelationalDB, tx: &Tx, auth: &AuthCtx) -> Result<Vec<Plan>, DBError> {
615-
Ok(relational_db
616-
.get_all_tables(tx)?
617-
.iter()
618-
.map(Deref::deref)
615+
pub(crate) fn get_all<T, F, I>(
616+
get_all_tables: F,
617+
relational_db: &RelationalDB,
618+
tx: &T,
619+
auth: &AuthCtx,
620+
) -> Result<Vec<Plan>, DBError>
621+
where
622+
T: StateView,
623+
F: Fn(&RelationalDB, &T) -> Result<I, DBError>,
624+
I: Iterator<Item = Arc<TableSchema>>,
625+
{
626+
Ok(get_all_tables(relational_db, tx)?
619627
.filter(|t| t.table_type == StTableType::User && (auth.is_owner() || t.table_access == StAccess::Public))
620628
.map(|schema| {
621629
let sql = format!("SELECT * FROM {}", schema.table_name);
@@ -648,6 +656,8 @@ pub(crate) fn legacy_get_all(
648656
tx: &Tx,
649657
auth: &AuthCtx,
650658
) -> Result<Vec<SupportedQuery>, DBError> {
659+
use std::ops::Deref;
660+
651661
Ok(relational_db
652662
.get_all_tables(tx)?
653663
.iter()

0 commit comments

Comments
 (0)