1
- use std:: { borrow:: Borrow , collections :: BTreeMap } ;
1
+ use std:: borrow:: Borrow ;
2
2
3
3
use crate :: {
4
4
backends:: TableReader ,
@@ -7,25 +7,30 @@ use crate::{
7
7
traits:: {
8
8
IsCompleted , KeyValueStoreBulksTrait , KeyValueStoreManager , KeyValueStoreRead , NeedNext ,
9
9
} ,
10
- types:: ValueEntry ,
11
10
StorageError ,
12
11
} ;
13
12
14
13
use super :: {
15
14
get_versioned_key,
15
+ pending_part:: { pending_schema:: PendingKeyValueConfig , VersionedMap } ,
16
16
table_schema:: { HistoryChangeTable , HistoryIndicesTable , VersionedKeyValueSchema } ,
17
17
HistoryIndexKey , PendingError , VersionedStore ,
18
18
} ;
19
19
20
- pub struct SnapshotView < ' db , T : VersionedKeyValueSchema > {
21
- pending_updates : Option < BTreeMap < T :: Key , ValueEntry < T :: Value > > > ,
20
+ #[ cfg( test) ]
21
+ use crate :: types:: ValueEntry ;
22
+ #[ cfg( test) ]
23
+ use std:: collections:: BTreeMap ;
24
+
25
+ pub struct SnapshotView < ' cache , ' db , T : VersionedKeyValueSchema > {
26
+ pending_updates : Option < SnapshotPending < ' cache , T > > ,
22
27
history : Option < SnapshotHistorical < ' db , T > > ,
23
28
}
24
29
25
30
#[ cfg( test) ]
26
31
const MIN_HISTORY_NUMBER_MINUS_ONE : u64 = 0 ;
27
32
28
- impl < ' db , T : VersionedKeyValueSchema > SnapshotView < ' db , T > {
33
+ impl < ' cache , ' db , T : VersionedKeyValueSchema > SnapshotView < ' cache , ' db , T > {
29
34
#[ cfg( test) ]
30
35
fn iter_history ( & self ) -> Result < BTreeMap < T :: Key , ValueEntry < T :: Value > > > {
31
36
if let Some ( ref history) = self . history {
@@ -94,7 +99,10 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> {
94
99
pub fn iter ( & self ) -> Result < impl Iterator < Item = ( T :: Key , ValueEntry < T :: Value > ) > > {
95
100
let mut map = self . iter_history ( ) ?;
96
101
97
- if let Some ( ref pending_map) = self . pending_updates {
102
+ if let Some ( ref pending_updates) = self . pending_updates {
103
+ let pending_map = pending_updates
104
+ . inner
105
+ . get_versioned_store ( pending_updates. commit_id ) ?;
98
106
for ( k, v) in pending_map {
99
107
map. insert ( k. clone ( ) , v. clone ( ) ) ;
100
108
}
@@ -104,16 +112,28 @@ impl<'db, T: VersionedKeyValueSchema> SnapshotView<'db, T> {
104
112
}
105
113
}
106
114
115
+ struct SnapshotPending < ' cache , T : VersionedKeyValueSchema > {
116
+ commit_id : CommitID ,
117
+ inner : & ' cache VersionedMap < PendingKeyValueConfig < T , CommitID > > ,
118
+ }
119
+
107
120
pub struct SnapshotHistorical < ' db , T : VersionedKeyValueSchema > {
108
121
history_number : HistoryNumber ,
109
122
history_index_table : TableReader < ' db , HistoryIndicesTable < T > > ,
110
123
change_history_table : KeyValueStoreBulks < ' db , HistoryChangeTable < T > > ,
111
124
}
112
125
113
- impl < ' db , T : VersionedKeyValueSchema > KeyValueStoreRead < T :: Key , T :: Value > for SnapshotView < ' db , T > {
126
+ impl < ' cache , ' db , T : VersionedKeyValueSchema > KeyValueStoreRead < T :: Key , T :: Value >
127
+ for SnapshotView < ' cache , ' db , T >
128
+ {
114
129
fn get ( & self , key : & T :: Key ) -> Result < Option < T :: Value > > {
115
- if let Some ( opt_v) = self . pending_updates . as_ref ( ) . and_then ( |u| u. get ( key) ) {
116
- return Ok ( opt_v. to_option ( ) ) ;
130
+ if let Some ( pending_updates) = & self . pending_updates {
131
+ let pending_optv = pending_updates
132
+ . inner
133
+ . get_versioned_key ( & pending_updates. commit_id , key) ?;
134
+ if let Some ( pending_v) = pending_optv {
135
+ return Ok ( pending_v. into_option ( ) ) ;
136
+ }
117
137
}
118
138
119
139
if let Some ( history) = & self . history {
@@ -129,8 +149,8 @@ impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead<T::Key, T::Value> for Sn
129
149
}
130
150
}
131
151
132
- impl < ' db , T : VersionedKeyValueSchema > KeyValueStoreRead < T :: Key , T :: Value >
133
- for Option < SnapshotView < ' db , T > >
152
+ impl < ' cache , ' db , T : VersionedKeyValueSchema > KeyValueStoreRead < T :: Key , T :: Value >
153
+ for Option < SnapshotView < ' cache , ' db , T > >
134
154
{
135
155
fn get ( & self , key : & T :: Key ) -> Result < Option < T :: Value > > {
136
156
if let Some ( view) = self {
@@ -144,38 +164,35 @@ impl<'db, T: VersionedKeyValueSchema> KeyValueStoreRead<T::Key, T::Value>
144
164
impl < ' cache , ' db , T : VersionedKeyValueSchema > KeyValueStoreManager < T :: Key , T :: Value , CommitID >
145
165
for VersionedStore < ' cache , ' db , T >
146
166
{
147
- type Store = SnapshotView < ' db , T > ;
148
- fn get_versioned_store ( & self , commit : & CommitID ) -> Result < Self :: Store > {
149
- let pending_res = self . pending_part . get_versioned_store ( * commit) ;
150
- match pending_res {
151
- Ok ( pending_map) => {
152
- let history = if let Some ( history_commit) = self . pending_part . get_parent_of_root ( ) {
153
- Some ( SnapshotHistorical {
154
- history_number : self . get_history_number_by_commit_id ( history_commit) ?,
155
- history_index_table : self . history_index_table . clone ( ) ,
156
- change_history_table : self . change_history_table . clone ( ) ,
157
- } )
158
- } else {
159
- None
160
- } ;
161
- Ok ( SnapshotView {
162
- pending_updates : Some ( pending_map) ,
163
- history,
164
- } )
165
- }
166
- Err ( PendingError :: CommitIDNotFound ( target_commit_id) ) => {
167
- assert_eq ! ( target_commit_id, * commit) ;
168
- let history = SnapshotHistorical {
169
- history_number : self . get_history_number_by_commit_id ( * commit) ?,
167
+ type Store < ' a > = SnapshotView < ' a , ' db , T > where Self : ' a ;
168
+ fn get_versioned_store < ' a > ( & ' a self , commit : & CommitID ) -> Result < Self :: Store < ' a > > {
169
+ if self . pending_part . contains_commit_id ( commit) {
170
+ let history = if let Some ( history_commit) = self . pending_part . get_parent_of_root ( ) {
171
+ Some ( SnapshotHistorical {
172
+ history_number : self . get_history_number_by_commit_id ( history_commit) ?,
170
173
history_index_table : self . history_index_table . clone ( ) ,
171
174
change_history_table : self . change_history_table . clone ( ) ,
172
- } ;
173
- Ok ( SnapshotView {
174
- pending_updates : None ,
175
- history : Some ( history) ,
176
175
} )
177
- }
178
- Err ( other_err) => Err ( StorageError :: PendingError ( other_err) ) ,
176
+ } else {
177
+ None
178
+ } ;
179
+ Ok ( SnapshotView {
180
+ pending_updates : Some ( SnapshotPending {
181
+ commit_id : * commit,
182
+ inner : & * self . pending_part ,
183
+ } ) ,
184
+ history,
185
+ } )
186
+ } else {
187
+ let history = SnapshotHistorical {
188
+ history_number : self . get_history_number_by_commit_id ( * commit) ?,
189
+ history_index_table : self . history_index_table . clone ( ) ,
190
+ change_history_table : self . change_history_table . clone ( ) ,
191
+ } ;
192
+ Ok ( SnapshotView {
193
+ pending_updates : None ,
194
+ history : Some ( history) ,
195
+ } )
179
196
}
180
197
}
181
198
0 commit comments