Commit e93de33

Centril and Shubham8287 authored
datastore: add clear_table and fix drop_table (#3214)
# Description of Changes

Alternative to, and closes, #3210. This version relies on `pending_schema_changes`.
The first commit adds an efficient `clear_table` to the datastore; it can be exposed to the module ABI in a follow-up. The second commit fixes `drop_table`.

# API and ABI breaking changes

None

# Expected complexity level and risk

3?

# Testing

`test_drop_table_is_transactional` is amended to check `TxData`.

---------

Signed-off-by: Mazdak Farrokhzad <twingoow@gmail.com>
Co-authored-by: Shubham Mishra <shubham@clockworklabs.io>
Co-authored-by: Shubham Mishra <shivam828787@gmail.com>
1 parent 3255904 commit e93de33

File tree

7 files changed (+282, -107 lines)


crates/core/src/db/relational_db.rs

Lines changed: 3 additions & 7 deletions

@@ -1427,13 +1427,9 @@ impl RelationalDB {
     }

     /// Clear all rows from a table without dropping it.
-    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<(), DBError> {
-        let relation = self
-            .iter_mut(tx, table_id)?
-            .map(|row_ref| row_ref.pointer())
-            .collect::<Vec<_>>();
-        self.delete(tx, table_id, relation);
-        Ok(())
+    pub fn clear_table(&self, tx: &mut MutTx, table_id: TableId) -> Result<usize, DBError> {
+        let rows_deleted = tx.clear_table(table_id)?;
+        Ok(rows_deleted)
     }

     pub fn create_sequence(&self, tx: &mut MutTx, sequence_schema: SequenceSchema) -> Result<SequenceId, DBError> {
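For orientation, here is a minimal usage sketch that is not part of this commit: `wipe_table` is a hypothetical caller, the imports are assumed from the surrounding crate, and it assumes `with_auto_commit` is generic over the closure's result type, as the `module_host.rs` call site below suggests.

```rust
// Hypothetical helper, shown only to illustrate the new `Result<usize, DBError>` return:
// the datastore-level `MutTx::clear_table` empties the table and reports the row count.
fn wipe_table(stdb: &RelationalDB, table_id: TableId) -> Result<usize, DBError> {
    stdb.with_auto_commit(Workload::Internal, |mut_tx| {
        // Same call shape as the `module_host.rs` call site, but the count is kept.
        stdb.clear_table(mut_tx, table_id)
    })
}
```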

crates/core/src/host/module_host.rs

Lines changed: 2 additions & 1 deletion

@@ -904,7 +904,8 @@ impl ModuleHost {
             let workload = Workload::Internal;
             stdb.with_auto_commit(workload, |mut_tx| {
                 stdb.clear_table(mut_tx, ST_CONNECTION_CREDENTIALS_ID)?;
-                stdb.clear_table(mut_tx, ST_CLIENT_ID)
+                stdb.clear_table(mut_tx, ST_CLIENT_ID)?;
+                Ok::<(), DBError>(())
             })
         })
         .await?
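The added `Ok::<(), DBError>(())` is needed because `clear_table` now yields a `Result<usize, _>`, so its value can no longer double as the closure's `Result<(), DBError>` return value. A self-contained toy illustration of the pattern (stand-in types, not SpacetimeDB's):

```rust
#[derive(Debug)]
struct DBError;

// Stand-in for the new datastore call: it now reports how many rows were removed.
fn clear_table(_table: &str) -> Result<usize, DBError> {
    Ok(42)
}

// Stand-in for `with_auto_commit`: runs the closure and passes its result through.
fn with_auto_commit<T>(f: impl FnOnce() -> Result<T, DBError>) -> Result<T, DBError> {
    f()
}

fn main() -> Result<(), DBError> {
    with_auto_commit(|| {
        clear_table("st_connection_credentials")?;
        clear_table("st_client")?;
        // The turbofish pins the closure's success type to `()`, discarding the row counts.
        Ok::<(), DBError>(())
    })
}
```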

crates/datastore/src/locking_tx_datastore/committed_state.rs

Lines changed: 102 additions & 32 deletions

@@ -24,7 +24,7 @@ use crate::{
 };
 use anyhow::anyhow;
 use core::{convert::Infallible, ops::RangeBounds};
-use spacetimedb_data_structures::map::{HashSet, IntMap};
+use spacetimedb_data_structures::map::{HashSet, IntMap, IntSet};
 use spacetimedb_durability::TxOffset;
 use spacetimedb_lib::{db::auth::StTableType, Identity};
 use spacetimedb_primitives::{ColId, ColList, ColSet, IndexId, TableId};
@@ -42,6 +42,7 @@ use spacetimedb_table::{
 };
 use std::collections::BTreeMap;
 use std::sync::Arc;
+use thin_vec::ThinVec;

 /// Contains the live, in-memory snapshot of a database. This structure
 /// is exposed in order to support tools wanting to process the commit
@@ -63,6 +64,11 @@ pub struct CommittedState {
     /// Pages are shared between all modules running on a particular host,
     /// not allocated per-module.
     pub(super) page_pool: PagePool,
+    /// Whether the table was dropped during replay.
+    /// TODO(centril): Only used during bootstrap and is otherwise unused.
+    /// We should split `CommittedState` into two types
+    /// where one, e.g., `ReplayCommittedState`, has this field.
+    table_dropped: IntSet<TableId>,
 }

 impl MemoryUsage for CommittedState {
@@ -73,9 +79,14 @@ impl MemoryUsage for CommittedState {
             blob_store,
             index_id_map,
             page_pool: _,
+            table_dropped,
         } = self;
         // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource.
-        next_tx_offset.heap_usage() + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage()
+        next_tx_offset.heap_usage()
+            + tables.heap_usage()
+            + blob_store.heap_usage()
+            + index_id_map.heap_usage()
+            + table_dropped.heap_usage()
     }
 }

@@ -137,6 +148,7 @@ impl CommittedState {
             tables: <_>::default(),
             blob_store: <_>::default(),
             index_id_map: <_>::default(),
+            table_dropped: <_>::default(),
             page_pool,
         }
     }
@@ -303,17 +315,35 @@ impl CommittedState {
         Ok(())
     }

-    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, rel: &ProductValue) -> Result<()> {
-        let table = self
-            .tables
-            .get_mut(&table_id)
-            .ok_or(TableError::IdNotFoundState(table_id))?;
+    pub(super) fn replay_delete_by_rel(&mut self, table_id: TableId, row: &ProductValue) -> Result<()> {
+        // Get the table for mutation.
+        let table = match self.tables.get_mut(&table_id) {
+            Some(t) => t,
+            // (1) If it was dropped, avoid an error and just ignore the row instead.
+            None if self.table_dropped.contains(&table_id) => return Ok(()),
+            None => return Err(TableError::IdNotFoundState(table_id).into()),
+        };
+
+        // Delete the row.
         let blob_store = &mut self.blob_store;
         table
-            .delete_equal_row(&self.page_pool, blob_store, rel)
+            .delete_equal_row(&self.page_pool, blob_store, row)
             .map_err(TableError::Bflatn)?
            .ok_or_else(|| anyhow!("Delete for non-existent row when replaying transaction"))?;

+        if table_id == ST_TABLE_ID {
+            // A row was removed from `st_table`, so a table was dropped.
+            // Remove that table from the in-memory structures.
+            let dropped_table_id = Self::read_table_id(row);
+            self.tables
+                .remove(&dropped_table_id)
+                .expect("table to remove should exist");
+            // Mark the table as dropped so that when
+            // processing row deletions for that table later,
+            // they are simply ignored in (1).
+            self.table_dropped.insert(dropped_table_id);
+        }
+
         Ok(())
     }
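To make the replay rule above concrete, here is a self-contained toy model using plain `std` collections and string rows rather than the real datastore types; `ReplayState` and its `replay_delete` are stand-ins that mirror the added logic: a delete aimed at a table dropped earlier in the replay is ignored instead of being treated as an error.

```rust
use std::collections::{HashMap, HashSet};

/// Table id of the toy `st_table` system table, standing in for `ST_TABLE_ID`.
const ST_TABLE_ID: u32 = 1;

/// Toy replay state: table id -> rows, plus the set of tables dropped during replay.
struct ReplayState {
    tables: HashMap<u32, Vec<String>>,
    table_dropped: HashSet<u32>,
}

impl ReplayState {
    fn replay_delete(&mut self, table_id: u32, row: &str) -> Result<(), String> {
        let table = match self.tables.get_mut(&table_id) {
            Some(t) => t,
            // (1) The table was dropped earlier in the replay: ignore the delete.
            None if self.table_dropped.contains(&table_id) => return Ok(()),
            None => return Err(format!("table {table_id} not found")),
        };

        // Delete the row.
        let pos = table
            .iter()
            .position(|r| r == row)
            .ok_or_else(|| "delete for non-existent row when replaying".to_string())?;
        table.remove(pos);

        if table_id == ST_TABLE_ID {
            // A row left `st_table`, so some table was dropped; in this toy model
            // the row is just the dropped table's id rendered as text.
            let dropped: u32 = row.parse().expect("st_table row should be a table id");
            self.tables.remove(&dropped).expect("table to remove should exist");
            // Remember the drop so later deletes against `dropped` hit case (1).
            self.table_dropped.insert(dropped);
        }
        Ok(())
    }
}
```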

@@ -348,8 +378,7 @@ impl CommittedState {
     ///
     /// The `row_ptr` is a pointer to `row`.
     fn st_column_changed(&mut self, row: &ProductValue, row_ptr: RowPointer) -> Result<()> {
-        let target_table_id = TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
-            .expect("first field in `st_column` should decode to a `TableId`");
+        let target_table_id = Self::read_table_id(row);
         let target_col_id = ColId::deserialize(ValueDeserializer::from_ref(&row.elements[1]))
             .expect("second field in `st_column` should decode to a `ColId`");

@@ -380,6 +409,12 @@ impl CommittedState {
         Ok(())
     }

+    /// Assuming that a `TableId` is stored as the first field in `row`, read it.
+    fn read_table_id(row: &ProductValue) -> TableId {
+        TableId::deserialize(ValueDeserializer::from_ref(&row.elements[0]))
+            .expect("first field in `st_column` should decode to a `TableId`")
+    }
+
     /// Builds the in-memory state of sequences from `st_sequence` system table.
     /// The tables store the lasted allocated value, which tells us where to start generating.
     pub(super) fn build_sequence_state(&mut self) -> Result<SequencesState> {
@@ -562,7 +597,7 @@ impl CommittedState {
         let mut tx_data = TxData::default();

         // First, apply deletes. This will free up space in the committed tables.
-        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables);
+        self.merge_apply_deletes(&mut tx_data, tx_state.delete_tables, tx_state.pending_schema_changes);

         // Then, apply inserts. This will re-fill the holes freed by deletions
         // before allocating new pages.
@@ -578,33 +613,68 @@ impl CommittedState {
         tx_data
     }

-    fn merge_apply_deletes(&mut self, tx_data: &mut TxData, delete_tables: BTreeMap<TableId, DeleteTable>) {
+    fn merge_apply_deletes(
+        &mut self,
+        tx_data: &mut TxData,
+        delete_tables: BTreeMap<TableId, DeleteTable>,
+        pending_schema_changes: ThinVec<PendingSchemaChange>,
+    ) {
+        fn delete_rows(
+            tx_data: &mut TxData,
+            table_id: TableId,
+            table: &mut Table,
+            blob_store: &mut dyn BlobStore,
+            row_ptrs_len: usize,
+            row_ptrs: impl Iterator<Item = RowPointer>,
+        ) {
+            let mut deletes = Vec::with_capacity(row_ptrs_len);
+
+            // Note: we maintain the invariant that the delete_tables
+            // holds only committed rows which should be deleted,
+            // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
+            // so no need to check before applying the deletes.
+            for row_ptr in row_ptrs {
+                debug_assert!(row_ptr.squashed_offset().is_committed_state());
+
+                // TODO: re-write `TxData` to remove `ProductValue`s
+                let pv = table
+                    .delete(blob_store, row_ptr, |row| row.to_product_value())
+                    .expect("Delete for non-existent row!");
+                deletes.push(pv);
+            }
+
+            if !deletes.is_empty() {
+                let table_name = &*table.get_schema().table_name;
+                // TODO(centril): Pass this along to record truncated tables.
+                let _truncated = table.row_count == 0;
+                tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
+            }
+        }
+
         for (table_id, row_ptrs) in delete_tables {
             if let (Some(table), blob_store, _) = self.get_table_and_blob_store_mut(table_id) {
-                let mut deletes = Vec::with_capacity(row_ptrs.len());
-
-                // Note: we maintain the invariant that the delete_tables
-                // holds only committed rows which should be deleted,
-                // i.e. `RowPointer`s with `SquashedOffset::COMMITTED_STATE`,
-                // so no need to check before applying the deletes.
-                for row_ptr in row_ptrs.iter() {
-                    debug_assert!(row_ptr.squashed_offset().is_committed_state());
-
-                    // TODO: re-write `TxData` to remove `ProductValue`s
-                    let pv = table
-                        .delete(blob_store, row_ptr, |row| row.to_product_value())
-                        .expect("Delete for non-existent row!");
-                    deletes.push(pv);
-                }
-
-                if !deletes.is_empty() {
-                    let table_name = &*table.get_schema().table_name;
-                    tx_data.set_deletes_for_table(table_id, table_name, deletes.into());
-                }
+                delete_rows(tx_data, table_id, table, blob_store, row_ptrs.len(), row_ptrs.iter());
             } else if !row_ptrs.is_empty() {
                 panic!("Deletion for non-existent table {table_id:?}... huh?");
             }
         }
+
+        // Delete all tables marked for deletion.
+        // The order here does not matter as once a `table_id` has been dropped
+        // it will never be re-created.
+        for change in pending_schema_changes {
+            if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
+                let row_ptrs = table.scan_all_row_ptrs();
+                delete_rows(
+                    tx_data,
+                    table_id,
+                    &mut table,
+                    &mut self.blob_store,
+                    row_ptrs.len(),
+                    row_ptrs.into_iter(),
+                );
+            }
+        }
     }

     fn merge_apply_inserts(
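Finally, a rough self-contained sketch, with toy stand-ins for `Table`, `TxData`, and `PendingSchemaChange`, of what the new second pass accomplishes: rows still held by a table carried in a `TableRemoved` pending schema change are recorded as deletes, which is what the amended `test_drop_table_is_transactional` can observe in `TxData`.

```rust
use std::collections::HashMap;

// Toy stand-ins for the real types.
struct Table {
    rows: Vec<String>,
}

enum PendingSchemaChange {
    TableRemoved(u32, Table),
    TableAdded(u32),
    // ... other variants elided
}

#[derive(Default)]
struct TxData {
    // table id -> rows recorded as deleted by this transaction
    deletes: HashMap<u32, Vec<String>>,
}

impl TxData {
    fn set_deletes_for_table(&mut self, table_id: u32, rows: Vec<String>) {
        if !rows.is_empty() {
            self.deletes.insert(table_id, rows);
        }
    }
}

/// Second pass of the merge: dropped tables surrender their remaining rows as deletes.
fn apply_pending_drops(tx_data: &mut TxData, pending: Vec<PendingSchemaChange>) {
    for change in pending {
        if let PendingSchemaChange::TableRemoved(table_id, mut table) = change {
            // Drain whatever the dropped table still held and record it as deleted.
            let rows: Vec<String> = table.rows.drain(..).collect();
            tx_data.set_deletes_for_table(table_id, rows);
        }
    }
}
```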
