Skip to content

Commit 0e43a96

Browse files
Update last_called timestamp in st_view_sub for sql queries
1 parent 30b8eac commit 0e43a96

File tree

7 files changed

+126
-10
lines changed

7 files changed

+126
-10
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/src/sql/execute.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -195,16 +195,17 @@ pub fn run(
195195
) -> Result<SqlResult, DBError> {
196196
// We parse the sql statement in a mutable transaction.
197197
// If it turns out to be a query, we downgrade the tx.
198-
let (tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
198+
let (mut tx, stmt) = db.with_auto_rollback(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Sql), |tx| {
199199
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, &auth), &auth)
200200
})?;
201201

202202
let mut metrics = ExecutionMetrics::default();
203203

204204
match stmt {
205205
Statement::Select(stmt) => {
206-
// Up to this point, the tx has been read-only,
207-
// and hence there are no deltas to process.
206+
// Materialize views before we downgrade to a read-only transaction
207+
tx.materialize_views(&stmt, auth.caller)?;
208+
208209
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
209210

210211
let (tx_offset_send, tx_offset) = oneshot::channel();

crates/datastore/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ spacetimedb-schema.workspace = true
1919
spacetimedb-table.workspace = true
2020
spacetimedb-snapshot.workspace = true
2121
spacetimedb-execution.workspace = true
22+
spacetimedb-expr.workspace = true
2223

2324
anyhow = { workspace = true, features = ["backtrace"] }
2425
bytes.workspace = true

crates/datastore/src/locking_tx_datastore/mut_tx.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use smallvec::SmallVec;
3333
use spacetimedb_data_structures::map::{IntMap, IntSet};
3434
use spacetimedb_durability::TxOffset;
3535
use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row};
36+
use spacetimedb_expr::expr::CollectViews;
3637
use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics, Timestamp};
3738
use spacetimedb_lib::{
3839
db::{auth::StAccess, raw_def::SEQUENCE_ALLOCATION_STEP},
@@ -62,7 +63,7 @@ use spacetimedb_table::{
6263
table_index::TableIndex,
6364
};
6465
use std::{
65-
collections::HashMap,
66+
collections::{HashMap, HashSet},
6667
sync::Arc,
6768
time::{Duration, Instant},
6869
};
@@ -1767,6 +1768,19 @@ impl<'a, I: Iterator<Item = RowRef<'a>>> Iterator for FilterDeleted<'a, I> {
17671768
}
17681769

17691770
impl MutTxId {
1771+
/// Materialize views for `sender`, collected from `view_collector`.
1772+
pub fn materialize_views(&mut self, view_collector: &impl CollectViews, sender: Identity) -> Result<()> {
1773+
let mut view_ids = HashSet::new();
1774+
view_collector.collect_views(&mut view_ids);
1775+
for view_id in view_ids {
1776+
if !self.is_view_materialized(view_id, ArgId::SENTINEL, sender)? {
1777+
// TODO: __call_view__
1778+
}
1779+
self.st_view_sub_update_or_insert_last_called(view_id, ArgId::SENTINEL, sender)?;
1780+
}
1781+
Ok(())
1782+
}
1783+
17701784
/// Does this caller have an entry for `view_id` in `st_view_sub`?
17711785
pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<bool> {
17721786
use StViewSubFields::*;

crates/expr/src/expr.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,28 @@
1-
use std::sync::Arc;
1+
use std::{collections::HashSet, sync::Arc};
22

33
use spacetimedb_lib::{query::Delta, AlgebraicType, AlgebraicValue};
4-
use spacetimedb_primitives::TableId;
4+
use spacetimedb_primitives::{TableId, ViewId};
55
use spacetimedb_schema::schema::TableOrViewSchema;
66
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
77

8+
pub trait CollectViews {
9+
fn collect_views(&self, views: &mut HashSet<ViewId>);
10+
}
11+
12+
impl<T: CollectViews> CollectViews for Arc<T> {
13+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
14+
self.as_ref().collect_views(views);
15+
}
16+
}
17+
18+
impl<T: CollectViews> CollectViews for Vec<T> {
19+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
20+
for item in self {
21+
item.collect_views(views);
22+
}
23+
}
24+
}
25+
826
/// A projection is the root of any relational expression.
927
/// This type represents a projection that returns relvars.
1028
///
@@ -25,6 +43,14 @@ pub enum ProjectName {
2543
Some(RelExpr, Box<str>),
2644
}
2745

46+
impl CollectViews for ProjectName {
47+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
48+
match self {
49+
Self::None(expr) | Self::Some(expr, _) => expr.collect_views(views),
50+
}
51+
}
52+
}
53+
2854
impl ProjectName {
2955
/// Unwrap the outer projection, returning the inner expression
3056
pub fn unwrap(self) -> RelExpr {
@@ -146,6 +172,26 @@ pub enum AggType {
146172
Count,
147173
}
148174

175+
impl CollectViews for ProjectList {
176+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
177+
match self {
178+
Self::Limit(proj, _) => {
179+
proj.collect_views(views);
180+
}
181+
Self::Name(exprs) => {
182+
for expr in exprs {
183+
expr.collect_views(views);
184+
}
185+
}
186+
Self::List(exprs, _) | Self::Agg(exprs, ..) => {
187+
for expr in exprs {
188+
expr.collect_views(views);
189+
}
190+
}
191+
}
192+
}
193+
}
194+
149195
impl ProjectList {
150196
/// Does this expression project a single relvar?
151197
/// If so, we return it's [`TableOrViewSchema`].
@@ -212,6 +258,18 @@ pub struct Relvar {
212258
pub delta: Option<Delta>,
213259
}
214260

261+
impl CollectViews for RelExpr {
262+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
263+
self.visit(&mut |expr| {
264+
if let Self::RelVar(Relvar { schema, .. }) = expr {
265+
if let Some(info) = schema.view_info {
266+
views.insert(info.view_id);
267+
}
268+
}
269+
});
270+
}
271+
}
272+
215273
impl RelExpr {
216274
/// Walk the expression tree and call `f` on each node
217275
pub fn visit(&self, f: &mut impl FnMut(&Self)) {

crates/physical-plan/src/plan.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
use std::{
22
borrow::Cow,
3+
collections::HashSet,
34
ops::{Bound, Deref, DerefMut},
45
sync::Arc,
56
};
67

78
use anyhow::{bail, Result};
89
use derive_more::From;
910
use either::Either;
10-
use spacetimedb_expr::{expr::AggType, StatementSource};
11+
use spacetimedb_expr::{
12+
expr::{AggType, CollectViews},
13+
StatementSource,
14+
};
1115
use spacetimedb_lib::{identity::AuthCtx, query::Delta, sats::size_of::SizeOf, AlgebraicValue, ProductValue};
12-
use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId};
16+
use spacetimedb_primitives::{ColId, ColSet, IndexId, TableId, ViewId};
1317
use spacetimedb_schema::schema::{IndexSchema, TableSchema};
1418
use spacetimedb_sql_parser::ast::{BinOp, LogOp};
1519
use spacetimedb_table::table::RowRef;
@@ -68,6 +72,14 @@ impl DerefMut for ProjectPlan {
6872
}
6973
}
7074

75+
impl CollectViews for ProjectPlan {
76+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
77+
match self {
78+
Self::None(plan) | Self::Name(plan, ..) => plan.collect_views(views),
79+
}
80+
}
81+
}
82+
7183
impl ProjectPlan {
7284
pub fn optimize(self, auth: &AuthCtx) -> Result<Self> {
7385
match self {
@@ -240,6 +252,29 @@ pub enum PhysicalPlan {
240252
Filter(Box<PhysicalPlan>, PhysicalExpr),
241253
}
242254

255+
impl CollectViews for PhysicalPlan {
256+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
257+
self.visit(&mut |plan| match plan {
258+
Self::TableScan(scan, _) => {
259+
if let Some(info) = scan.schema.view_info {
260+
views.insert(info.view_id);
261+
}
262+
}
263+
Self::IxScan(scan, _) => {
264+
if let Some(info) = scan.schema.view_info {
265+
views.insert(info.view_id);
266+
}
267+
}
268+
Self::IxJoin(join, _) => {
269+
if let Some(info) = join.rhs.view_info {
270+
views.insert(info.view_id);
271+
}
272+
}
273+
_ => {}
274+
});
275+
}
276+
}
277+
243278
impl PhysicalPlan {
244279
/// Walks the plan tree and calls `f` on every op
245280
pub fn visit(&self, f: &mut impl FnMut(&Self)) {

crates/subscription/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ use spacetimedb_execution::{
66
},
77
Datastore, DeltaStore, Row,
88
};
9-
use spacetimedb_expr::check::SchemaView;
9+
use spacetimedb_expr::{check::SchemaView, expr::CollectViews};
1010
use spacetimedb_lib::{identity::AuthCtx, metrics::ExecutionMetrics, query::Delta, AlgebraicValue};
1111
use spacetimedb_physical_plan::plan::{IxJoin, IxScan, Label, PhysicalPlan, ProjectPlan, Sarg, TableScan, TupleField};
12-
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId};
12+
use spacetimedb_primitives::{ColId, ColList, IndexId, TableId, ViewId};
1313
use spacetimedb_query::compile_subscription;
1414
use std::sync::Arc;
1515
use std::{collections::HashSet, ops::RangeBounds};
@@ -363,6 +363,12 @@ pub struct SubscriptionPlan {
363363
plan_opt: ProjectPlan,
364364
}
365365

366+
impl CollectViews for SubscriptionPlan {
367+
fn collect_views(&self, views: &mut HashSet<ViewId>) {
368+
self.plan_opt.collect_views(views);
369+
}
370+
}
371+
366372
impl SubscriptionPlan {
367373
/// Is this a plan for a join?
368374
pub fn is_join(&self) -> bool {

0 commit comments

Comments
 (0)