Skip to content

Commit 30b8eac

Browse files
Decrement view subscriber count on disconnect (#3547)
# Description of Changes Refactored `st_view_client` and renamed it `st_view_sub` which tracks the number of clients subscribed to a view. On disconnect, we decrement the `num_subscribers` column in the appropriate rows. An async task will be in charge of cleaning up views (and their read sets) whose subscriber count has gone to zero (not in this patch). On module init, we clear the entirety of each view table. # API and ABI breaking changes None. Technically this updates the schema of a system table, but the system table was added and modified between releases. # Expected complexity level and risk ~2 Need to make sure we cover all cases so that we don't leave dangling data. Making these tables ephemeral in the future should simplify this. # Testing Will add tests once we can subscribe to views
1 parent 7c4c3dd commit 30b8eac

File tree

15 files changed

+261
-67
lines changed

15 files changed

+261
-67
lines changed

crates/client-api/src/routes/database.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ pub async fn call<S: ControlStateDelegate + NodeDelegate>(
101101
};
102102

103103
module
104-
.call_identity_disconnected(caller_identity, connection_id)
104+
// We don't clear views after reducer calls
105+
.call_identity_disconnected(caller_identity, connection_id, false)
105106
.await
106107
.map_err(client_disconnected_error_to_response)?;
107108

@@ -274,7 +275,8 @@ async fn procedure<S: ControlStateDelegate + NodeDelegate>(
274275
};
275276

276277
module
277-
.call_identity_disconnected(caller_identity, connection_id)
278+
// We don't clear views after procedure calls
279+
.call_identity_disconnected(caller_identity, connection_id, false)
278280
.await
279281
.map_err(client_disconnected_error_to_response)?;
280282

crates/core/src/db/relational_db.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,6 +1403,11 @@ impl RelationalDB {
14031403
Ok(rows_deleted)
14041404
}
14051405

1406+
/// Clear all rows from all view tables without dropping them.
1407+
pub fn clear_all_views(&self, tx: &mut MutTx) -> Result<(), DBError> {
1408+
Ok(tx.clear_all_views()?)
1409+
}
1410+
14061411
pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
14071412
Ok(self.inner.create_sequence_mut_tx(tx, sequence_schema)?)
14081413
}

crates/core/src/host/host_controller.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -847,9 +847,10 @@ impl Host {
847847
} = launched;
848848

849849
// Disconnect dangling clients.
850+
// No need to clear view tables here since we do it in `clear_all_clients`.
850851
for (identity, connection_id) in connected_clients {
851852
module_host
852-
.call_identity_disconnected(identity, connection_id)
853+
.call_identity_disconnected(identity, connection_id, false)
853854
.await
854855
.with_context(|| {
855856
format!(

crates/core/src/host/module_host.rs

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream;
3838
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
3939
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
4040
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
41-
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID};
41+
use spacetimedb_datastore::system_tables::{ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID};
4242
use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData};
4343
use spacetimedb_durability::DurableOffset;
4444
use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject};
@@ -903,7 +903,7 @@ impl ModuleHost {
903903
// Call the `client_disconnected` reducer, if it exists.
904904
// This is a no-op if the module doesn't define such a reducer.
905905
this.subscriptions().remove_subscriber(client_id);
906-
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst)
906+
this.call_identity_disconnected_inner(client_id.identity, client_id.connection_id, inst, true)
907907
})
908908
.await
909909
{
@@ -1024,6 +1024,7 @@ impl ModuleHost {
10241024
caller_identity: Identity,
10251025
caller_connection_id: ConnectionId,
10261026
inst: &mut Instance,
1027+
drop_view_subscribers: bool,
10271028
) -> Result<(), ReducerCallError> {
10281029
let reducer_lookup = self.info.module_def.lifecycle_reducer(Lifecycle::OnDisconnect);
10291030
let reducer_name = reducer_lookup
@@ -1046,10 +1047,22 @@ impl ModuleHost {
10461047
let me = self.clone();
10471048
let stdb = me.module.replica_ctx().relational_db.clone();
10481049

1050+
// Decrement the number of subscribers for each view this caller is subscribed to
1051+
let dec_view_subscribers = |tx: &mut MutTxId| {
1052+
if drop_view_subscribers {
1053+
if let Err(err) = tx.dec_st_view_subscribers(caller_identity) {
1054+
log::error!("`call_identity_disconnected`: failed to delete client view data: {err}");
1055+
}
1056+
}
1057+
};
1058+
10491059
// A fallback transaction that deletes the client from `st_client`.
10501060
let fallback = || {
10511061
let database_identity = me.info.database_identity;
10521062
stdb.with_auto_commit(workload(), |mut_tx| {
1063+
1064+
dec_view_subscribers(mut_tx);
1065+
10531066
if !is_client_exist(mut_tx) {
10541067
// The client is already gone. Nothing to do.
10551068
log::debug!(
@@ -1076,7 +1089,9 @@ impl ModuleHost {
10761089

10771090
if let Some((reducer_id, reducer_def)) = reducer_lookup {
10781091
let stdb = me.module.replica_ctx().relational_db.clone();
1079-
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
1092+
let mut mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload());
1093+
1094+
dec_view_subscribers(&mut mut_tx);
10801095

10811096
if !is_client_exist(&mut_tx) {
10821097
// The client is already gone. Nothing to do.
@@ -1151,10 +1166,11 @@ impl ModuleHost {
11511166
&self,
11521167
caller_identity: Identity,
11531168
caller_connection_id: ConnectionId,
1169+
drop_view_subscribers: bool,
11541170
) -> Result<(), ReducerCallError> {
11551171
let me = self.clone();
11561172
self.call("call_identity_disconnected", move |inst| {
1157-
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst)
1173+
me.call_identity_disconnected_inner(caller_identity, caller_connection_id, inst, drop_view_subscribers)
11581174
})
11591175
.await?
11601176
}
@@ -1166,8 +1182,10 @@ impl ModuleHost {
11661182
let stdb = &me.module.replica_ctx().relational_db;
11671183
let workload = Workload::Internal;
11681184
stdb.with_auto_commit(workload, |mut_tx| {
1185+
stdb.clear_all_views(mut_tx)?;
11691186
stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
11701187
stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
1188+
stdb.clear_table(mut_tx, ST_VIEW_SUB_ID)?;
11711189
Ok::<(), DBError>(())
11721190
})
11731191
})

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ use crate::{
2424
use crate::{
2525
locking_tx_datastore::mut_tx::ReadSet,
2626
system_tables::{
27-
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_CLIENT_ID, ST_VIEW_CLIENT_IDX,
28-
ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID, ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX,
27+
ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_IDX, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_IDX, ST_VIEW_ID,
28+
ST_VIEW_IDX, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_IDX, ST_VIEW_SUB_ID, ST_VIEW_SUB_IDX,
2929
},
3030
};
3131
use anyhow::anyhow;
@@ -304,7 +304,7 @@ impl CommittedState {
304304
self.create_table(ST_VIEW_ID, schemas[ST_VIEW_IDX].clone());
305305
self.create_table(ST_VIEW_PARAM_ID, schemas[ST_VIEW_PARAM_IDX].clone());
306306
self.create_table(ST_VIEW_COLUMN_ID, schemas[ST_VIEW_COLUMN_IDX].clone());
307-
self.create_table(ST_VIEW_CLIENT_ID, schemas[ST_VIEW_CLIENT_IDX].clone());
307+
self.create_table(ST_VIEW_SUB_ID, schemas[ST_VIEW_SUB_IDX].clone());
308308
self.create_table(ST_VIEW_ARG_ID, schemas[ST_VIEW_ARG_IDX].clone());
309309

310310
// Insert the sequences into `st_sequences`

crates/datastore/src/locking_tx_datastore/datastore.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1252,13 +1252,13 @@ mod tests {
12521252
use crate::system_tables::{
12531253
system_tables, StColumnRow, StConnectionCredentialsFields, StConstraintData, StConstraintFields,
12541254
StConstraintRow, StIndexAlgorithm, StIndexFields, StIndexRow, StRowLevelSecurityFields, StScheduledFields,
1255-
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_NAME,
1256-
ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME, ST_CONSTRAINT_ID,
1257-
ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
1255+
StSequenceFields, StSequenceRow, StTableRow, StVarFields, StViewArgFields, StViewFields, ST_CLIENT_ID,
1256+
ST_CLIENT_NAME, ST_COLUMN_ID, ST_COLUMN_NAME, ST_CONNECTION_CREDENTIALS_ID, ST_CONNECTION_CREDENTIALS_NAME,
1257+
ST_CONSTRAINT_ID, ST_CONSTRAINT_NAME, ST_INDEX_ID, ST_INDEX_NAME, ST_MODULE_NAME, ST_RESERVED_SEQUENCE_RANGE,
12581258
ST_ROW_LEVEL_SECURITY_ID, ST_ROW_LEVEL_SECURITY_NAME, ST_SCHEDULED_ID, ST_SCHEDULED_NAME, ST_SEQUENCE_ID,
1259-
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_CLIENT_ID,
1260-
ST_VIEW_CLIENT_NAME, ST_VIEW_COLUMN_ID, ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID,
1261-
ST_VIEW_PARAM_NAME,
1259+
ST_SEQUENCE_NAME, ST_TABLE_NAME, ST_VAR_ID, ST_VAR_NAME, ST_VIEW_ARG_ID, ST_VIEW_ARG_NAME, ST_VIEW_COLUMN_ID,
1260+
ST_VIEW_COLUMN_NAME, ST_VIEW_ID, ST_VIEW_NAME, ST_VIEW_PARAM_ID, ST_VIEW_PARAM_NAME, ST_VIEW_SUB_ID,
1261+
ST_VIEW_SUB_NAME,
12621262
};
12631263
use crate::traits::{IsolationLevel, MutTx};
12641264
use crate::Result;
@@ -1272,7 +1272,7 @@ mod tests {
12721272
use spacetimedb_lib::error::ResultTest;
12731273
use spacetimedb_lib::st_var::StVarValue;
12741274
use spacetimedb_lib::{resolved_type_via_v9, ScheduleAt, TimeDuration};
1275-
use spacetimedb_primitives::{col_list, ColId, ScheduleId, ViewId};
1275+
use spacetimedb_primitives::{col_list, ArgId, ColId, ScheduleId, ViewId};
12761276
use spacetimedb_sats::algebraic_value::ser::value_serialize;
12771277
use spacetimedb_sats::bsatn::ToBsatn;
12781278
use spacetimedb_sats::layout::RowTypeLayout;
@@ -1715,7 +1715,7 @@ mod tests {
17151715
TableRow { id: ST_VIEW_ID.into(), name: ST_VIEW_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewFields::ViewId.into()) },
17161716
TableRow { id: ST_VIEW_PARAM_ID.into(), name: ST_VIEW_PARAM_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
17171717
TableRow { id: ST_VIEW_COLUMN_ID.into(), name: ST_VIEW_COLUMN_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1718-
TableRow { id: ST_VIEW_CLIENT_ID.into(), name: ST_VIEW_CLIENT_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
1718+
TableRow { id: ST_VIEW_SUB_ID.into(), name: ST_VIEW_SUB_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: None },
17191719
TableRow { id: ST_VIEW_ARG_ID.into(), name: ST_VIEW_ARG_NAME, ty: StTableType::System, access: StAccess::Public, primary_key: Some(StViewArgFields::Id.into()) },
17201720

17211721
]));
@@ -1793,10 +1793,12 @@ mod tests {
17931793
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 2, name: "col_name", ty: AlgebraicType::String },
17941794
ColRow { table: ST_VIEW_COLUMN_ID.into(), pos: 3, name: "col_type", ty: AlgebraicType::bytes() },
17951795

1796-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
1797-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 1, name: "arg_id", ty: AlgebraicType::U64 },
1798-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
1799-
ColRow { table: ST_VIEW_CLIENT_ID.into(), pos: 3, name: "connection_id", ty: AlgebraicType::U128 },
1796+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 0, name: "view_id", ty: ViewId::get_type() },
1797+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 1, name: "arg_id", ty: ArgId::get_type() },
1798+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 2, name: "identity", ty: AlgebraicType::U256 },
1799+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 3, name: "num_subscribers", ty: AlgebraicType::U64 },
1800+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 4, name: "has_subscribers", ty: AlgebraicType::Bool },
1801+
ColRow { table: ST_VIEW_SUB_ID.into(), pos: 5, name: "last_called", ty: AlgebraicType::I64 },
18001802

18011803
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 0, name: "id", ty: AlgebraicType::U64 },
18021804
ColRow { table: ST_VIEW_ARG_ID.into(), pos: 1, name: "bytes", ty: AlgebraicType::bytes() },
@@ -1820,10 +1822,11 @@ mod tests {
18201822
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
18211823
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
18221824
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
1823-
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
1824-
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
1825-
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
1826-
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
1825+
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
1826+
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
1827+
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
1828+
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
1829+
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
18271830
]));
18281831
let start = ST_RESERVED_SEQUENCE_RANGE as i128 + 1;
18291832
#[rustfmt::skip]
@@ -2282,10 +2285,11 @@ mod tests {
22822285
IndexRow { id: 15, table: ST_VIEW_ID.into(), col: col(1), name: "st_view_view_name_idx_btree", },
22832286
IndexRow { id: 16, table: ST_VIEW_PARAM_ID.into(), col: col_list![0, 1], name: "st_view_param_view_id_param_pos_idx_btree", },
22842287
IndexRow { id: 17, table: ST_VIEW_COLUMN_ID.into(), col: col_list![0, 1], name: "st_view_column_view_id_col_pos_idx_btree", },
2285-
IndexRow { id: 18, table: ST_VIEW_CLIENT_ID.into(), col: col_list![0, 1], name: "st_view_client_view_id_arg_id_idx_btree", },
2286-
IndexRow { id: 19, table: ST_VIEW_CLIENT_ID.into(), col: col_list![2, 3], name: "st_view_client_identity_connection_id_idx_btree", },
2287-
IndexRow { id: 20, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
2288-
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
2288+
IndexRow { id: 18, table: ST_VIEW_SUB_ID.into(), col: col(2), name: "st_view_sub_identity_idx_btree", },
2289+
IndexRow { id: 19, table: ST_VIEW_SUB_ID.into(), col: col(4), name: "st_view_sub_has_subscribers_idx_btree", },
2290+
IndexRow { id: 20, table: ST_VIEW_SUB_ID.into(), col: col_list![0, 1, 2], name: "st_view_sub_view_id_arg_id_identity_idx_btree", },
2291+
IndexRow { id: 21, table: ST_VIEW_ARG_ID.into(), col: col(0), name: "st_view_arg_id_idx_btree", },
2292+
IndexRow { id: 22, table: ST_VIEW_ARG_ID.into(), col: col(1), name: "st_view_arg_bytes_idx_btree", },
22892293
IndexRow { id: seq_start, table: FIRST_NON_SYSTEM_ID, col: col(0), name: "Foo_id_idx_btree", },
22902294
IndexRow { id: seq_start + 1, table: FIRST_NON_SYSTEM_ID, col: col(1), name: "Foo_name_idx_btree", },
22912295
IndexRow { id: seq_start + 2, table: FIRST_NON_SYSTEM_ID, col: col(2), name: "Foo_age_idx_btree", },

0 commit comments

Comments
 (0)