Skip to content

Commit 5d9e78c

Browse files
committed
Pool ActiveQuerys in the query stack
1 parent 7ce1f6f commit 5d9e78c

File tree

4 files changed

+111
-30
lines changed

4 files changed

+111
-30
lines changed

src/active_query.rs

+95-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::ops::Not;
2+
use std::{mem, ops};
23

34
use super::zalsa_local::{QueryEdges, QueryOrigin, QueryRevisions};
45
use crate::accumulator::accumulated_map::AtomicInputAccumulatedValues;
@@ -63,7 +64,7 @@ pub(crate) struct ActiveQuery {
6364
}
6465

6566
impl ActiveQuery {
66-
pub(super) fn new(database_key_index: DatabaseKeyIndex) -> Self {
67+
pub(crate) fn new(database_key_index: DatabaseKeyIndex) -> Self {
6768
ActiveQuery {
6869
database_key_index,
6970
durability: Durability::MAX,
@@ -78,6 +79,32 @@ impl ActiveQuery {
7879
}
7980
}
8081

82+
fn reset(&mut self, new_database_key_index: DatabaseKeyIndex) {
83+
let Self {
84+
database_key_index,
85+
durability,
86+
changed_at,
87+
input_outputs,
88+
untracked_read,
89+
cycle,
90+
disambiguator_map,
91+
tracked_struct_ids,
92+
accumulated,
93+
accumulated_inputs,
94+
} = self;
95+
*database_key_index = new_database_key_index;
96+
*durability = Durability::MAX;
97+
*changed_at = Revision::start();
98+
input_outputs.clear();
99+
*untracked_read = false;
100+
*cycle = None;
101+
*accumulated_inputs = Default::default();
102+
disambiguator_map.clear();
103+
// These two are cleared via `mem::take`` when popped off as revisions.
104+
debug_assert!(tracked_struct_ids.is_empty());
105+
_ = accumulated;
106+
}
107+
81108
pub(super) fn add_read(
82109
&mut self,
83110
input: InputDependencyIndex,
@@ -113,24 +140,38 @@ impl ActiveQuery {
113140
self.input_outputs.contains(&QueryEdge::Output(key))
114141
}
115142

116-
pub(crate) fn into_revisions(self) -> QueryRevisions {
117-
let edges = QueryEdges::new(self.input_outputs);
118-
let origin = if self.untracked_read {
143+
fn take_revisions(&mut self) -> QueryRevisions {
144+
let &mut Self {
145+
database_key_index: _,
146+
cycle: _,
147+
disambiguator_map: _,
148+
durability,
149+
changed_at,
150+
untracked_read,
151+
accumulated_inputs,
152+
ref mut input_outputs,
153+
ref mut tracked_struct_ids,
154+
ref mut accumulated,
155+
} = self;
156+
157+
let edges = QueryEdges::new(input_outputs.drain(..));
158+
let origin = if untracked_read {
119159
QueryOrigin::DerivedUntracked(edges)
120160
} else {
121161
QueryOrigin::Derived(edges)
122162
};
123-
let accumulated = self
124-
.accumulated
163+
let accumulated = accumulated
125164
.is_empty()
126165
.not()
127-
.then(|| Box::new(self.accumulated));
166+
.then(|| Box::new(mem::take(accumulated)));
167+
let tracked_struct_ids = mem::take(tracked_struct_ids);
168+
let accumulated_inputs = AtomicInputAccumulatedValues::new(accumulated_inputs);
128169
QueryRevisions {
129-
changed_at: self.changed_at,
170+
changed_at,
171+
durability,
130172
origin,
131-
durability: self.durability,
132-
tracked_struct_ids: self.tracked_struct_ids,
133-
accumulated_inputs: AtomicInputAccumulatedValues::new(self.accumulated_inputs),
173+
tracked_struct_ids,
174+
accumulated_inputs,
134175
accumulated,
135176
}
136177
}
@@ -166,3 +207,46 @@ impl ActiveQuery {
166207
self.disambiguator_map.disambiguate(key)
167208
}
168209
}
210+
211+
#[derive(Debug, Default)]
212+
pub(crate) struct QueryStack {
213+
stack: Vec<ActiveQuery>,
214+
len: usize,
215+
}
216+
217+
impl ops::Deref for QueryStack {
218+
type Target = [ActiveQuery];
219+
220+
fn deref(&self) -> &Self::Target {
221+
&self.stack[..self.len]
222+
}
223+
}
224+
225+
impl ops::DerefMut for QueryStack {
226+
fn deref_mut(&mut self) -> &mut Self::Target {
227+
&mut self.stack[..self.len]
228+
}
229+
}
230+
231+
impl QueryStack {
232+
pub(crate) fn push_new_query(&mut self, database_key_index: DatabaseKeyIndex) {
233+
if self.len < self.stack.len() {
234+
self.stack[self.len].reset(database_key_index);
235+
} else {
236+
self.stack.push(ActiveQuery::new(database_key_index));
237+
}
238+
self.len += 1;
239+
}
240+
241+
pub(crate) fn len(&self) -> usize {
242+
self.len
243+
}
244+
245+
pub(crate) fn pop_into_revisions(&mut self) -> Option<QueryRevisions> {
246+
if self.len == 0 {
247+
return None;
248+
}
249+
self.len -= 1;
250+
Some(self.stack[self.len].take_revisions())
251+
}
252+
}

src/function/execute.rs

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ where
4545

4646
// Query was not previously executed, or value is potentially
4747
// stale, or value is absent. Let's execute!
48-
let database_key_index = active_query.database_key_index;
4948
let id = database_key_index.key_index;
5049
let value = match Cycle::catch(|| C::execute(db, C::id_to_input(db, id))) {
5150
Ok(v) => v,

src/tracked_struct.rs

+4
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,10 @@ impl DisambiguatorMap {
326326
disambiguator.0 += 1;
327327
result
328328
}
329+
330+
pub fn clear(&mut self) {
331+
self.map.clear()
332+
}
329333
}
330334

331335
impl<C> IngredientImpl<C>

src/zalsa_local.rs

+12-18
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use tracing::debug;
44
use crate::accumulator::accumulated_map::{
55
AccumulatedMap, AtomicInputAccumulatedValues, InputAccumulatedValues,
66
};
7-
use crate::active_query::ActiveQuery;
7+
use crate::active_query::QueryStack;
88
use crate::durability::Durability;
99
use crate::key::{DatabaseKeyIndex, InputDependencyIndex, OutputDependencyIndex};
1010
use crate::runtime::StampedValue;
@@ -37,7 +37,7 @@ pub struct ZalsaLocal {
3737
///
3838
/// Unwinding note: pushes onto this vector must be popped -- even
3939
/// during unwinding.
40-
query_stack: RefCell<Vec<ActiveQuery>>,
40+
query_stack: RefCell<QueryStack>,
4141

4242
/// Stores the most recent page for a given ingredient.
4343
/// This is thread-local to avoid contention.
@@ -47,7 +47,7 @@ pub struct ZalsaLocal {
4747
impl ZalsaLocal {
4848
pub(crate) fn new() -> Self {
4949
ZalsaLocal {
50-
query_stack: RefCell::new(vec![]),
50+
query_stack: RefCell::new(QueryStack::default()),
5151
most_recent_pages: RefCell::new(FxHashMap::default()),
5252
}
5353
}
@@ -88,7 +88,7 @@ impl ZalsaLocal {
8888
#[inline]
8989
pub(crate) fn push_query(&self, database_key_index: DatabaseKeyIndex) -> ActiveQueryGuard<'_> {
9090
let mut query_stack = self.query_stack.borrow_mut();
91-
query_stack.push(ActiveQuery::new(database_key_index));
91+
query_stack.push_new_query(database_key_index);
9292
ActiveQueryGuard {
9393
local_state: self,
9494
database_key_index,
@@ -97,8 +97,8 @@ impl ZalsaLocal {
9797
}
9898

9999
/// Executes a closure within the context of the current active query stacks.
100-
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut Vec<ActiveQuery>) -> R) -> R {
101-
c(self.query_stack.borrow_mut().as_mut())
100+
pub(crate) fn with_query_stack<R>(&self, c: impl FnOnce(&mut QueryStack) -> R) -> R {
101+
c(&mut self.query_stack.borrow_mut())
102102
}
103103

104104
/// Returns the index of the active query along with its *current* durability/changed-at
@@ -485,15 +485,15 @@ pub(crate) struct ActiveQueryGuard<'me> {
485485
}
486486

487487
impl ActiveQueryGuard<'_> {
488-
fn pop_helper(&self) -> ActiveQuery {
488+
fn pop_impl(&self) -> QueryRevisions {
489489
self.local_state.with_query_stack(|stack| {
490490
// Sanity check: pushes and pops should be balanced.
491491
assert_eq!(stack.len(), self.push_len);
492492
debug_assert_eq!(
493493
stack.last().unwrap().database_key_index,
494494
self.database_key_index
495495
);
496-
stack.pop().unwrap()
496+
stack.pop_into_revisions().unwrap()
497497
})
498498
}
499499

@@ -508,8 +508,8 @@ impl ActiveQueryGuard<'_> {
508508
}
509509

510510
/// Invoked when the query has successfully completed execution.
511-
pub(crate) fn complete(self) -> ActiveQuery {
512-
let query = self.pop_helper();
511+
fn complete(self) -> QueryRevisions {
512+
let query = self.pop_impl();
513513
std::mem::forget(self);
514514
query
515515
}
@@ -519,13 +519,7 @@ impl ActiveQueryGuard<'_> {
519519
/// query's execution.
520520
#[inline]
521521
pub(crate) fn pop(self) -> QueryRevisions {
522-
// Extract accumulated inputs.
523-
let popped_query = self.complete();
524-
525-
// If this frame were a cycle participant, it would have unwound.
526-
assert!(popped_query.cycle.is_none());
527-
528-
popped_query.into_revisions()
522+
self.complete()
529523
}
530524

531525
/// If the active query is registered as a cycle participant, remove and
@@ -538,6 +532,6 @@ impl ActiveQueryGuard<'_> {
538532

539533
impl Drop for ActiveQueryGuard<'_> {
540534
fn drop(&mut self) {
541-
self.pop_helper();
535+
self.pop_impl();
542536
}
543537
}

0 commit comments

Comments
 (0)