Skip to content

Commit 2c86ee8

Browse files
Materialize views for subscriptions
1 parent f858d77 commit 2c86ee8

File tree

10 files changed

+410
-158
lines changed

10 files changed

+410
-158
lines changed

crates/core/src/client/client_connection.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -865,10 +865,11 @@ impl ClientConnection {
865865
) -> Result<Option<ExecutionMetrics>, DBError> {
866866
let me = self.clone();
867867
self.module()
868-
.on_module_thread("subscribe_single", move || {
869-
me.module()
870-
.subscriptions()
871-
.add_single_subscription(me.sender, subscription, timer, None)
868+
.on_module_thread_async("subscribe_single", async move || {
869+
let host = me.module();
870+
host.subscriptions()
871+
.add_single_subscription(Some(&host), me.sender, subscription, timer, None)
872+
.await
872873
})
873874
.await?
874875
}
@@ -890,10 +891,11 @@ impl ClientConnection {
890891
) -> Result<Option<ExecutionMetrics>, DBError> {
891892
let me = self.clone();
892893
self.module()
893-
.on_module_thread("subscribe_multi", move || {
894-
me.module()
895-
.subscriptions()
896-
.add_multi_subscription(me.sender, request, timer, None)
894+
.on_module_thread_async("subscribe_multi", async move || {
895+
let host = me.module();
896+
host.subscriptions()
897+
.add_multi_subscription(Some(&host), me.sender, request, timer, None)
898+
.await
897899
})
898900
.await?
899901
}
@@ -915,12 +917,14 @@ impl ClientConnection {
915917

916918
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
917919
let me = self.clone();
918-
asyncify(move || {
919-
me.module()
920-
.subscriptions()
921-
.add_legacy_subscriber(me.sender, subscription, timer, None)
922-
})
923-
.await
920+
self.module()
921+
.on_module_thread_async("subscribe", async move || {
922+
let host = me.module();
923+
host.subscriptions()
924+
.add_legacy_subscriber(Some(&host), me.sender, subscription, timer, None)
925+
.await
926+
})
927+
.await?
924928
}
925929

926930
pub async fn one_off_query_json(

crates/core/src/host/module_host.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
3939
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
4040
use spacetimedb_datastore::error::DatastoreError;
4141
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
42-
use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
43-
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
42+
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
4443
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
4544
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4645
use spacetimedb_durability::DurableOffset;
@@ -1120,7 +1119,7 @@ impl ModuleHost {
11201119
// Decrement the number of subscribers for each view this caller is subscribed to
11211120
let dec_view_subscribers = |tx: &mut MutTxId| {
11221121
if drop_view_subscribers {
1123-
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
1122+
if let Err(err) = tx.unsubscribe_views(caller_identity) {
11241123
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
11251124
}
11261125
}
@@ -1489,16 +1488,19 @@ impl ModuleHost {
14891488
.await?
14901489
}
14911490

1492-
/// Downgrade this mutable `tx` after:
1493-
/// 1. Collecting view ids from `view_collector` and
1494-
/// 2. Materializing them if necessary
1495-
pub async fn materialize_views_and_downgrade_tx(
1491+
/// Materializes the views return by the `view_collector`, if not already materialized,
1492+
/// and updates `st_view_sub` accordingly.
1493+
///
1494+
/// Passing [`Workload::Sql`] will update `st_view_sub.last_called`.
1495+
/// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`,
1496+
/// in addition to updating `st_view_sub.last_called`.
1497+
pub async fn materialize_views(
14961498
&self,
14971499
mut tx: MutTxId,
14981500
view_collector: &impl CollectViews,
14991501
sender: Identity,
15001502
workload: Workload,
1501-
) -> Result<(TxData, TxMetrics, TxId), ViewCallError> {
1503+
) -> Result<MutTxId, ViewCallError> {
15021504
use FunctionArgs::*;
15031505
let mut view_ids = HashSet::new();
15041506
view_collector.collect_views(&mut view_ids);
@@ -1507,9 +1509,16 @@ impl ModuleHost {
15071509
if !tx.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
15081510
tx = self.call_view(tx, &name, Nullary, sender, None).await?.tx;
15091511
}
1510-
tx.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
1512+
// If this is a sql call, we only update this view's "last called" timestamp
1513+
if let Workload::Sql = workload {
1514+
tx.update_view_timestamp(view_id, ArgId::SENTINEL, sender)?;
1515+
}
1516+
// If this is a subscribe call, we also increment this view's subscriber count
1517+
if let Workload::Subscribe = workload {
1518+
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
1519+
}
15111520
}
1512-
Ok(tx.commit_downgrade(workload))
1521+
Ok(tx)
15131522
}
15141523

15151524
pub async fn call_view(

crates/core/src/sql/execute.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -194,24 +194,23 @@ pub async fn run(
194194
module: Option<&ModuleHost>,
195195
head: &mut Vec<(Box<str>, AlgebraicType)>,
196196
) -> Result<SqlResult, DBError> {
197-
let module = module
198-
.as_ref()
199-
.ok_or_else(|| anyhow!("Cannot execute views without module context"))?;
200-
201-
let mut metrics = ExecutionMetrics::default();
202-
203197
// We parse the sql statement in a mutable transaction.
204198
// If it turns out to be a query, we downgrade the tx.
205199
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
206200
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
207201
})?;
208202

203+
let mut metrics = ExecutionMetrics::default();
204+
209205
match stmt {
210206
Statement::Select(stmt) => {
211207
// Materialize views and downgrade to a read-only transaction
212-
let (tx_data, tx_metrics_mut, tx) = module
213-
.materialize_views_and_downgrade_tx(tx, &stmt, auth.caller, Workload::Sql)
214-
.await?;
208+
let tx = match module {
209+
Some(module) => module.materialize_views(tx, &stmt, auth.caller, Workload::Sql).await?,
210+
None => tx,
211+
};
212+
213+
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
215214

216215
let (tx_offset_send, tx_offset) = oneshot::channel();
217216
// Release the tx on drop, so that we record metrics

0 commit comments

Comments
 (0)