1
- use std:: {
2
- sync:: atomic:: { AtomicBool , Ordering } ,
3
- thread:: ThreadId ,
4
- } ;
1
+ use std:: thread:: ThreadId ;
5
2
6
- use parking_lot:: RwLock ;
3
+ use parking_lot:: Mutex ;
7
4
8
5
use crate :: {
9
6
key:: DatabaseKeyIndex ,
10
7
runtime:: WaitResult ,
11
8
zalsa:: { MemoIngredientIndex , Zalsa } ,
12
- zalsa_local:: ZalsaLocal ,
13
9
Database ,
14
10
} ;
15
11
@@ -19,36 +15,35 @@ use super::util;
19
15
/// worker threads.
20
16
#[ derive( Default ) ]
21
17
pub ( crate ) struct SyncTable {
22
- syncs : RwLock < Vec < Option < SyncState > > > ,
18
+ syncs : Mutex < Vec < Option < SyncState > > > ,
23
19
}
24
20
25
21
struct SyncState {
26
22
id : ThreadId ,
27
23
28
24
/// Set to true if any other queries are blocked,
29
25
/// waiting for this query to complete.
30
- anyone_waiting : AtomicBool ,
26
+ anyone_waiting : bool ,
31
27
}
32
28
33
29
impl SyncTable {
34
30
pub ( crate ) fn claim < ' me > (
35
31
& ' me self ,
36
- db : & ' me dyn Database ,
32
+ db : & ' me ( impl ? Sized + Database ) ,
37
33
zalsa : & ' me Zalsa ,
38
- zalsa_local : & ZalsaLocal ,
39
34
database_key_index : DatabaseKeyIndex ,
40
35
memo_ingredient_index : MemoIngredientIndex ,
41
36
) -> Option < ClaimGuard < ' me > > {
42
- let mut syncs = self . syncs . write ( ) ;
37
+ let mut syncs = self . syncs . lock ( ) ;
43
38
let thread_id = std:: thread:: current ( ) . id ( ) ;
44
39
45
40
util:: ensure_vec_len ( & mut syncs, memo_ingredient_index. as_usize ( ) + 1 ) ;
46
41
47
- match & syncs[ memo_ingredient_index. as_usize ( ) ] {
42
+ match & mut syncs[ memo_ingredient_index. as_usize ( ) ] {
48
43
None => {
49
44
syncs[ memo_ingredient_index. as_usize ( ) ] = Some ( SyncState {
50
45
id : thread_id,
51
- anyone_waiting : AtomicBool :: new ( false ) ,
46
+ anyone_waiting : false ,
52
47
} ) ;
53
48
Some ( ClaimGuard {
54
49
database_key_index,
@@ -61,16 +56,10 @@ impl SyncTable {
61
56
id : other_id,
62
57
anyone_waiting,
63
58
} ) => {
64
- // NB: `Ordering::Relaxed` is sufficient here,
65
- // as there are no loads that are "gated" on this
66
- // value. Everything that is written is also protected
67
- // by a lock that must be acquired. The role of this
68
- // boolean is to decide *whether* to acquire the lock,
69
- // not to gate future atomic reads.
70
- anyone_waiting. store ( true , Ordering :: Relaxed ) ;
59
+ * anyone_waiting = true ;
71
60
zalsa. runtime ( ) . block_on_or_unwind (
72
- db,
73
- zalsa_local,
61
+ db. as_dyn_database ( ) ,
62
+ db . zalsa_local ( ) ,
74
63
database_key_index,
75
64
* other_id,
76
65
syncs,
@@ -92,30 +81,28 @@ pub(crate) struct ClaimGuard<'me> {
92
81
}
93
82
94
83
impl ClaimGuard < ' _ > {
95
- fn remove_from_map_and_unblock_queries ( & self , wait_result : WaitResult ) {
96
- let mut syncs = self . sync_table . syncs . write ( ) ;
84
+ fn remove_from_map_and_unblock_queries ( & self ) {
85
+ let mut syncs = self . sync_table . syncs . lock ( ) ;
97
86
98
87
let SyncState { anyone_waiting, .. } =
99
88
syncs[ self . memo_ingredient_index . as_usize ( ) ] . take ( ) . unwrap ( ) ;
100
89
101
- // NB: `Ordering::Relaxed` is sufficient here,
102
- // see `store` above for explanation.
103
- if anyone_waiting. load ( Ordering :: Relaxed ) {
104
- self . zalsa
105
- . runtime ( )
106
- . unblock_queries_blocked_on ( self . database_key_index , wait_result)
90
+ if anyone_waiting {
91
+ self . zalsa . runtime ( ) . unblock_queries_blocked_on (
92
+ self . database_key_index ,
93
+ if std:: thread:: panicking ( ) {
94
+ WaitResult :: Panicked
95
+ } else {
96
+ WaitResult :: Completed
97
+ } ,
98
+ )
107
99
}
108
100
}
109
101
}
110
102
111
103
impl Drop for ClaimGuard < ' _ > {
112
104
fn drop ( & mut self ) {
113
- let wait_result = if std:: thread:: panicking ( ) {
114
- WaitResult :: Panicked
115
- } else {
116
- WaitResult :: Completed
117
- } ;
118
- self . remove_from_map_and_unblock_queries ( wait_result)
105
+ self . remove_from_map_and_unblock_queries ( )
119
106
}
120
107
}
121
108
0 commit comments