1
- use std:: sync:: Arc ;
2
1
use std:: thread:: ThreadId ;
3
2
4
3
use crate :: active_query:: ActiveQuery ;
5
4
use crate :: key:: DatabaseKeyIndex ;
6
5
use crate :: runtime:: WaitResult ;
7
- use parking_lot:: { Condvar , MutexGuard } ;
6
+ use parking_lot:: MutexGuard ;
8
7
use rustc_hash:: FxHashMap ;
9
8
use smallvec:: SmallVec ;
10
9
11
- type QueryStack = Vec < ActiveQuery > ;
12
-
13
10
#[ derive( Debug , Default ) ]
14
11
pub ( super ) struct DependencyGraph {
15
12
/// A `(K -> V)` pair in this map indicates that the the runtime
16
13
/// `K` is blocked on some query executing in the runtime `V`.
17
14
/// This encodes a graph that must be acyclic (or else deadlock
18
15
/// will result).
19
- edges : FxHashMap < ThreadId , Edge > ,
16
+ edges : FxHashMap < ThreadId , edge :: Edge > ,
20
17
21
18
/// Encodes the `ThreadId` that are blocked waiting for the result
22
19
/// of a given query.
@@ -25,18 +22,7 @@ pub(super) struct DependencyGraph {
25
22
/// When a key K completes which had dependent queries Qs blocked on it,
26
23
/// it stores its `WaitResult` here. As they wake up, each query Q in Qs will
27
24
/// come here to fetch their results.
28
- wait_results : FxHashMap < ThreadId , ( QueryStack , WaitResult ) > ,
29
- }
30
-
31
- #[ derive( Debug ) ]
32
- struct Edge {
33
- blocked_on_id : ThreadId ,
34
- blocked_on_key : DatabaseKeyIndex ,
35
- stack : QueryStack ,
36
-
37
- /// Signalled whenever a query with dependents completes.
38
- /// Allows those dependents to check if they are ready to unblock.
39
- condvar : Arc < parking_lot:: Condvar > ,
25
+ wait_results : FxHashMap < ThreadId , WaitResult > ,
40
26
}
41
27
42
28
impl DependencyGraph {
@@ -64,7 +50,7 @@ impl DependencyGraph {
64
50
pub ( super ) fn for_each_cycle_participant (
65
51
& mut self ,
66
52
from_id : ThreadId ,
67
- from_stack : & mut QueryStack ,
53
+ from_stack : & mut [ ActiveQuery ] ,
68
54
database_key : DatabaseKeyIndex ,
69
55
to_id : ThreadId ,
70
56
mut closure : impl FnMut ( & mut [ ActiveQuery ] ) ,
@@ -104,7 +90,7 @@ impl DependencyGraph {
104
90
// load up the next thread (i.e., we start at B/QB2,
105
91
// and then load up the dependency on C/QC2).
106
92
let edge = self . edges . get_mut ( & id) . unwrap ( ) ;
107
- closure ( strip_prefix_query_stack_mut ( & mut edge. stack , key) ) ;
93
+ closure ( strip_prefix_query_stack_mut ( edge. stack_mut ( ) , key) ) ;
108
94
id = edge. blocked_on_id ;
109
95
key = edge. blocked_on_key ;
110
96
}
@@ -123,7 +109,7 @@ impl DependencyGraph {
123
109
pub ( super ) fn maybe_unblock_runtimes_in_cycle (
124
110
& mut self ,
125
111
from_id : ThreadId ,
126
- from_stack : & QueryStack ,
112
+ from_stack : & [ ActiveQuery ] ,
127
113
database_key : DatabaseKeyIndex ,
128
114
to_id : ThreadId ,
129
115
) -> ( bool , bool ) {
@@ -136,7 +122,7 @@ impl DependencyGraph {
136
122
let next_id = edge. blocked_on_id ;
137
123
let next_key = edge. blocked_on_key ;
138
124
139
- if let Some ( cycle) = strip_prefix_query_stack ( & edge. stack , key)
125
+ if let Some ( cycle) = strip_prefix_query_stack ( edge. stack ( ) , key)
140
126
. iter ( )
141
127
. rev ( )
142
128
. find_map ( |aq| aq. cycle . clone ( ) )
@@ -182,19 +168,21 @@ impl DependencyGraph {
182
168
from_id : ThreadId ,
183
169
database_key : DatabaseKeyIndex ,
184
170
to_id : ThreadId ,
185
- from_stack : QueryStack ,
171
+ from_stack : & mut [ ActiveQuery ] ,
186
172
query_mutex_guard : QueryMutexGuard ,
187
- ) -> ( QueryStack , WaitResult ) {
188
- let condvar = me. add_edge ( from_id, database_key, to_id, from_stack) ;
173
+ ) -> WaitResult {
174
+ // SAFETY: We are blocking until the result is removed from `DependencyGraph::wait_results`
175
+ // and as such we are keeping `from_stack` alive.
176
+ let condvar = unsafe { me. add_edge ( from_id, database_key, to_id, from_stack) } ;
189
177
190
178
// Release the mutex that prevents `database_key`
191
179
// from completing, now that the edge has been added.
192
180
drop ( query_mutex_guard) ;
193
181
194
182
loop {
195
- if let Some ( stack_and_result ) = me. wait_results . remove ( & from_id) {
183
+ if let Some ( result ) = me. wait_results . remove ( & from_id) {
196
184
debug_assert ! ( !me. edges. contains_key( & from_id) ) ;
197
- return stack_and_result ;
185
+ return result ;
198
186
}
199
187
condvar. wait ( & mut me) ;
200
188
}
@@ -203,32 +191,28 @@ impl DependencyGraph {
203
191
/// Helper for `block_on`: performs actual graph modification
204
192
/// to add a dependency edge from `from_id` to `to_id`, which is
205
193
/// computing `database_key`.
206
- fn add_edge (
194
+ ///
195
+ /// # Safety
196
+ ///
197
+ /// The caller needs to keep `from_stack`/`'aq`` alive until `from_id` has been removed from the `wait_results`.
198
+ // This safety invariant is consumed by the `Edge` struct
199
+ unsafe fn add_edge < ' aq > (
207
200
& mut self ,
208
201
from_id : ThreadId ,
209
202
database_key : DatabaseKeyIndex ,
210
203
to_id : ThreadId ,
211
- from_stack : QueryStack ,
212
- ) -> Arc < parking_lot :: Condvar > {
204
+ from_stack : & ' aq mut [ ActiveQuery ] ,
205
+ ) -> edge :: EdgeGuard < ' aq > {
213
206
assert_ne ! ( from_id, to_id) ;
214
207
debug_assert ! ( !self . edges. contains_key( & from_id) ) ;
215
208
debug_assert ! ( !self . depends_on( to_id, from_id) ) ;
216
-
217
- let condvar = Arc :: new ( Condvar :: new ( ) ) ;
218
- self . edges . insert (
219
- from_id,
220
- Edge {
221
- blocked_on_id : to_id,
222
- blocked_on_key : database_key,
223
- stack : from_stack,
224
- condvar : condvar. clone ( ) ,
225
- } ,
226
- ) ;
209
+ let ( edge, guard) = edge:: Edge :: new ( to_id, database_key, from_stack) ;
210
+ self . edges . insert ( from_id, edge) ;
227
211
self . query_dependents
228
212
. entry ( database_key)
229
213
. or_default ( )
230
214
. push ( from_id) ;
231
- condvar
215
+ guard
232
216
}
233
217
234
218
/// Invoked when runtime `to_id` completes executing
@@ -253,11 +237,100 @@ impl DependencyGraph {
253
237
/// the lock on this data structure first, to recover the wait result).
254
238
fn unblock_runtime ( & mut self , id : ThreadId , wait_result : WaitResult ) {
255
239
let edge = self . edges . remove ( & id) . expect ( "not blocked" ) ;
256
- self . wait_results . insert ( id, ( edge . stack , wait_result) ) ;
240
+ self . wait_results . insert ( id, wait_result) ;
257
241
258
242
// Now that we have inserted the `wait_results`,
259
243
// notify the thread.
260
- edge. condvar . notify_one ( ) ;
244
+ edge. notify ( ) ;
245
+ }
246
+ }
247
+
248
+ mod edge {
249
+ use std:: { marker:: PhantomData , ptr:: NonNull , sync:: Arc , thread:: ThreadId } ;
250
+
251
+ use parking_lot:: MutexGuard ;
252
+
253
+ use crate :: {
254
+ runtime:: { dependency_graph:: DependencyGraph , ActiveQuery } ,
255
+ DatabaseKeyIndex ,
256
+ } ;
257
+
258
+ #[ derive( Debug ) ]
259
+ pub ( super ) struct Edge {
260
+ pub ( super ) blocked_on_id : ThreadId ,
261
+ pub ( super ) blocked_on_key : DatabaseKeyIndex ,
262
+ stack : SendNonNull < [ ActiveQuery ] > ,
263
+
264
+ /// Signalled whenever a query with dependents completes.
265
+ /// Allows those dependents to check if they are ready to unblock.
266
+ condvar : Arc < parking_lot:: Condvar > ,
267
+ }
268
+
269
+ pub struct EdgeGuard < ' aq > {
270
+ condvar : Arc < parking_lot:: Condvar > ,
271
+ // Inform the borrow checker that the edge stack is borrowed until the guard is released.
272
+ // This is necessary to ensure that the stack is not modified by the caller of
273
+ // `DependencyGraph::add_edge` after the call returns.
274
+ _pd : PhantomData < & ' aq mut ( ) > ,
275
+ }
276
+
277
+ impl EdgeGuard < ' _ > {
278
+ pub fn wait ( & self , mutex_guard : & mut MutexGuard < ' _ , DependencyGraph > ) {
279
+ self . condvar . wait ( mutex_guard)
280
+ }
281
+ }
282
+
283
+ // Wrapper type to allow `Edge` to be `Send` without disregarding its other fields.
284
+ struct SendNonNull < T : ?Sized > ( NonNull < T > ) ;
285
+
286
+ // SAFETY: `Edge` is `Send` as its `stack: NonNull<[ActiveQuery]>,` field is a lifetime erased
287
+ // mutable reference to a `Send` type (`ActiveQuery`) that is subject to the owner of `Edge` and is
288
+ // guaranteed to be live according to the safety invariants of `DependencyGraph::add_edge`.`
289
+ unsafe impl < T : ?Sized > Send for SendNonNull < T > where for < ' a > & ' a mut T : Send { }
290
+ // unsafe impl<T> Sync for SendNonNull<T> where for<'a> &'a mut T: Sync {}
291
+
292
+ impl < T : ?Sized + std:: fmt:: Debug > std:: fmt:: Debug for SendNonNull < T > {
293
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
294
+ self . 0 . fmt ( f)
295
+ }
296
+ }
297
+
298
+ impl Edge {
299
+ pub ( super ) fn new (
300
+ blocked_on_id : ThreadId ,
301
+ blocked_on_key : DatabaseKeyIndex ,
302
+ stack : & mut [ ActiveQuery ] ,
303
+ ) -> ( Self , EdgeGuard < ' _ > ) {
304
+ let condvar = Arc :: new ( parking_lot:: Condvar :: new ( ) ) ;
305
+ let stack = SendNonNull ( NonNull :: from ( stack) ) ;
306
+ let edge = Self {
307
+ blocked_on_id,
308
+ blocked_on_key,
309
+ stack,
310
+ condvar : condvar. clone ( ) ,
311
+ } ;
312
+ let edge_guard = EdgeGuard {
313
+ condvar,
314
+ _pd : PhantomData ,
315
+ } ;
316
+ ( edge, edge_guard)
317
+ }
318
+
319
+ // unerase the lifetime of the stack
320
+ pub ( super ) fn stack_mut ( & mut self ) -> & mut [ ActiveQuery ] {
321
+ // SAFETY: This is safe due to the invariants upheld by DependencyGraph::add_edge.
322
+ unsafe { self . stack . 0 . as_mut ( ) }
323
+ }
324
+
325
+ // unerase the lifetime of the stack
326
+ pub ( super ) fn stack ( & self ) -> & [ ActiveQuery ] {
327
+ // SAFETY: This is safe due to the invariants upheld by DependencyGraph::add_edge.
328
+ unsafe { self . stack . 0 . as_ref ( ) }
329
+ }
330
+
331
+ pub ( super ) fn notify ( self ) {
332
+ self . condvar . notify_one ( ) ;
333
+ }
261
334
}
262
335
}
263
336
0 commit comments