1
1
//! GroveDB debugging support module.
2
2
3
- use std:: { collections:: BTreeMap , fs, sync:: Weak } ;
3
+ use std:: {
4
+ collections:: { BTreeMap , HashMap } ,
5
+ fs,
6
+ sync:: { Arc , Weak } ,
7
+ time:: { Duration , Instant , SystemTime } ,
8
+ } ;
4
9
5
10
use axum:: { extract:: State , http:: StatusCode , response:: IntoResponse , routing:: post, Json , Router } ;
6
11
use grovedb_merk:: {
@@ -11,14 +16,19 @@ use grovedb_merk::{
11
16
use grovedb_path:: SubtreePath ;
12
17
use grovedb_version:: version:: GroveVersion ;
13
18
use grovedbg_types:: {
14
- MerkProofNode , MerkProofOp , NodeFetchRequest , NodeUpdate , Path , PathQuery , Query , QueryItem ,
15
- SizedQuery , SubqueryBranch ,
19
+ DropSessionRequest , MerkProofNode , MerkProofOp , NewSessionResponse , NodeFetchRequest ,
20
+ NodeUpdate , Path , PathQuery , Query , QueryItem , SessionId , SizedQuery , SubqueryBranch ,
21
+ WithSession ,
16
22
} ;
17
23
use indexmap:: IndexMap ;
24
+ use tempfile:: tempdir;
18
25
use tokio:: {
19
26
net:: ToSocketAddrs ,
20
- sync:: mpsc:: { self , Sender } ,
27
+ select,
28
+ sync:: { RwLock , RwLockReadGuard } ,
29
+ time:: sleep,
21
30
} ;
31
+ use tokio_util:: sync:: CancellationToken ;
22
32
use tower_http:: services:: ServeDir ;
23
33
24
34
use crate :: {
@@ -30,6 +40,8 @@ use crate::{
30
40
31
41
const GROVEDBG_ZIP : & [ u8 ] = include_bytes ! ( concat!( env!( "OUT_DIR" ) , "/grovedbg.zip" ) ) ;
32
42
43
+ const SESSION_TIMEOUT : Duration = Duration :: from_secs ( 60 * 10 ) ;
44
+
33
45
pub ( super ) fn start_visualizer < A > ( grovedb : Weak < GroveDb > , addr : A )
34
46
where
35
47
A : ToSocketAddrs + Send + ' static ,
@@ -44,33 +56,132 @@ where
44
56
zip_extensions:: read:: zip_extract ( & grovedbg_zip, & grovedbg_www)
45
57
. expect ( "cannot extract grovedbg contents" ) ;
46
58
47
- let ( shutdown_send, mut shutdown_receive) = mpsc:: channel :: < ( ) > ( 1 ) ;
59
+ let cancellation_token = CancellationToken :: new ( ) ;
60
+
61
+ let state: AppState = AppState {
62
+ cancellation_token : cancellation_token. clone ( ) ,
63
+ grovedb,
64
+ sessions : Default :: default ( ) ,
65
+ } ;
66
+
48
67
let app = Router :: new ( )
68
+ . route ( "/new_session" , post ( new_session) )
69
+ . route ( "/drop_session" , post ( drop_session) )
49
70
. route ( "/fetch_node" , post ( fetch_node) )
50
71
. route ( "/fetch_root_node" , post ( fetch_root_node) )
51
72
. route ( "/prove_path_query" , post ( prove_path_query) )
52
73
. route ( "/fetch_with_path_query" , post ( fetch_with_path_query) )
53
74
. fallback_service ( ServeDir :: new ( grovedbg_www) )
54
- . with_state ( ( shutdown_send, grovedb) ) ;
55
-
56
- tokio:: runtime:: Runtime :: new ( )
57
- . unwrap ( )
58
- . block_on ( async move {
59
- let listener = tokio:: net:: TcpListener :: bind ( addr)
60
- . await
61
- . expect ( "can't bind visualizer port" ) ;
62
- axum:: serve ( listener, app)
63
- . with_graceful_shutdown ( async move {
64
- shutdown_receive. recv ( ) . await ;
65
- } )
66
- . await
67
- . unwrap ( )
68
- } ) ;
75
+ . with_state ( state. clone ( ) ) ;
76
+
77
+ let rt = tokio:: runtime:: Runtime :: new ( ) . unwrap ( ) ;
78
+
79
+ let cloned_cancellation_token = cancellation_token. clone ( ) ;
80
+ rt. spawn ( async move {
81
+ loop {
82
+ select ! {
83
+ _ = cloned_cancellation_token. cancelled( ) => break ,
84
+ _ = sleep( Duration :: from_secs( 10 ) ) => {
85
+ let now = Instant :: now( ) ;
86
+ let mut lock = state. sessions. write( ) . await ;
87
+ let to_delete: Vec <SessionId > = lock. iter( ) . filter_map(
88
+ |( id, session) | ( session. last_access < now - SESSION_TIMEOUT ) . then_some( * id)
89
+ ) . collect( ) ;
90
+
91
+ to_delete. into_iter( ) . for_each( |id| { lock. remove( & id) ; } ) ;
92
+ }
93
+ }
94
+ }
95
+ } ) ;
96
+
97
+ rt. block_on ( async move {
98
+ let listener = tokio:: net:: TcpListener :: bind ( addr)
99
+ . await
100
+ . expect ( "can't bind visualizer port" ) ;
101
+ axum:: serve ( listener, app)
102
+ . with_graceful_shutdown ( async move {
103
+ cancellation_token. cancelled ( ) . await ;
104
+ } )
105
+ . await
106
+ . unwrap ( )
107
+ } ) ;
69
108
} ) ;
70
109
}
71
110
111
+ #[ derive( Clone ) ]
112
+ struct AppState {
113
+ cancellation_token : CancellationToken ,
114
+ grovedb : Weak < GroveDb > ,
115
+ sessions : Arc < RwLock < HashMap < SessionId , Session > > > ,
116
+ }
117
+
118
+ impl AppState {
119
+ fn verify_running ( & self ) -> Result < ( ) , AppError > {
120
+ if self . grovedb . strong_count ( ) == 0 {
121
+ self . cancellation_token . cancel ( ) ;
122
+ Err ( AppError :: Closed )
123
+ } else {
124
+ Ok ( ( ) )
125
+ }
126
+ }
127
+
128
+ async fn new_session ( & self ) -> Result < SessionId , AppError > {
129
+ let grovedb = self . grovedb . upgrade ( ) . ok_or ( AppError :: Closed ) ?;
130
+ let id = SystemTime :: now ( )
131
+ . duration_since ( SystemTime :: UNIX_EPOCH )
132
+ . expect ( "time went backwards" )
133
+ . as_secs ( ) ;
134
+ self . sessions
135
+ . write ( )
136
+ . await
137
+ . insert ( id, Session :: new ( & grovedb) ?) ;
138
+
139
+ Ok ( id)
140
+ }
141
+
142
+ async fn drop_session ( & self , id : SessionId ) {
143
+ self . sessions . write ( ) . await . remove ( & id) ;
144
+ }
145
+
146
+ async fn get_snapshot ( & self , id : SessionId ) -> Result < RwLockReadGuard < GroveDb > , AppError > {
147
+ self . verify_running ( ) ?;
148
+ let mut lock = self . sessions . write ( ) . await ;
149
+ if let Some ( session) = lock. get_mut ( & id) {
150
+ session. last_access = Instant :: now ( ) ;
151
+ Ok ( RwLockReadGuard :: map ( lock. downgrade ( ) , |l| {
152
+ & l. get ( & id) . as_ref ( ) . expect ( "checked above" ) . snapshot
153
+ } ) )
154
+ } else {
155
+ Err ( AppError :: NoSession )
156
+ }
157
+ }
158
+ }
159
+
160
+ struct Session {
161
+ last_access : Instant ,
162
+ _tempdir : tempfile:: TempDir ,
163
+ snapshot : GroveDb ,
164
+ }
165
+
166
+ impl Session {
167
+ fn new ( grovedb : & GroveDb ) -> Result < Self , AppError > {
168
+ let tempdir = tempdir ( ) . map_err ( |e| AppError :: Any ( e. to_string ( ) ) ) ?;
169
+ let path = tempdir. path ( ) . join ( "grovedbg_session" ) ;
170
+ grovedb
171
+ . create_checkpoint ( & path)
172
+ . map_err ( |e| AppError :: Any ( e. to_string ( ) ) ) ?;
173
+ let snapshot = GroveDb :: open ( path) . map_err ( |e| AppError :: Any ( e. to_string ( ) ) ) ?;
174
+ Ok ( Session {
175
+ last_access : Instant :: now ( ) ,
176
+ _tempdir : tempdir,
177
+ snapshot,
178
+ } )
179
+ }
180
+ }
181
+
72
182
enum AppError {
73
183
Closed ,
184
+ NoSession ,
74
185
Any ( String ) ,
75
186
}
76
187
@@ -80,6 +191,9 @@ impl IntoResponse for AppError {
80
191
AppError :: Closed => {
81
192
( StatusCode :: SERVICE_UNAVAILABLE , "GroveDB is closed" ) . into_response ( )
82
193
}
194
+ AppError :: NoSession => {
195
+ ( StatusCode :: UNAUTHORIZED , "No session with this id" ) . into_response ( )
196
+ }
83
197
AppError :: Any ( e) => ( StatusCode :: INTERNAL_SERVER_ERROR , e) . into_response ( ) ,
84
198
}
85
199
}
@@ -91,16 +205,28 @@ impl<E: std::error::Error> From<E> for AppError {
91
205
}
92
206
}
93
207
208
+ async fn new_session ( State ( state) : State < AppState > ) -> Result < Json < NewSessionResponse > , AppError > {
209
+ Ok ( Json ( NewSessionResponse {
210
+ session_id : state. new_session ( ) . await ?,
211
+ } ) )
212
+ }
213
+
214
+ async fn drop_session (
215
+ State ( state) : State < AppState > ,
216
+ Json ( DropSessionRequest { session_id } ) : Json < DropSessionRequest > ,
217
+ ) {
218
+ state. drop_session ( session_id) . await ;
219
+ }
220
+
94
221
async fn fetch_node (
95
- State ( ( shutdown, grovedb) ) : State < ( Sender < ( ) > , Weak < GroveDb > ) > ,
96
- Json ( NodeFetchRequest { path, key } ) : Json < NodeFetchRequest > ,
222
+ State ( state) : State < AppState > ,
223
+ Json ( WithSession {
224
+ session_id,
225
+ request : NodeFetchRequest { path, key } ,
226
+ } ) : Json < WithSession < NodeFetchRequest > > ,
97
227
) -> Result < Json < Option < NodeUpdate > > , AppError > {
98
- let Some ( db) = grovedb. upgrade ( ) else {
99
- shutdown. send ( ( ) ) . await . ok ( ) ;
100
- return Err ( AppError :: Closed ) ;
101
- } ;
228
+ let db = state. get_snapshot ( session_id) . await ?;
102
229
103
- // todo: GroveVersion::latest() to actual version
104
230
let merk = db
105
231
. open_non_transactional_merk_at_path ( path. as_slice ( ) . into ( ) , None , GroveVersion :: latest ( ) )
106
232
. unwrap ( ) ?;
@@ -115,14 +241,14 @@ async fn fetch_node(
115
241
}
116
242
117
243
async fn fetch_root_node (
118
- State ( ( shutdown, grovedb) ) : State < ( Sender < ( ) > , Weak < GroveDb > ) > ,
244
+ State ( state) : State < AppState > ,
245
+ Json ( WithSession {
246
+ session_id,
247
+ request : ( ) ,
248
+ } ) : Json < WithSession < ( ) > > ,
119
249
) -> Result < Json < Option < NodeUpdate > > , AppError > {
120
- let Some ( db) = grovedb. upgrade ( ) else {
121
- shutdown. send ( ( ) ) . await . ok ( ) ;
122
- return Err ( AppError :: Closed ) ;
123
- } ;
250
+ let db = state. get_snapshot ( session_id) . await ?;
124
251
125
- // todo: GroveVersion::latest() to actual version
126
252
let merk = db
127
253
. open_non_transactional_merk_at_path ( SubtreePath :: empty ( ) , None , GroveVersion :: latest ( ) )
128
254
. unwrap ( ) ?;
@@ -138,13 +264,13 @@ async fn fetch_root_node(
138
264
}
139
265
140
266
async fn prove_path_query (
141
- State ( ( shutdown, grovedb) ) : State < ( Sender < ( ) > , Weak < GroveDb > ) > ,
142
- Json ( json_path_query) : Json < PathQuery > ,
267
+ State ( state) : State < AppState > ,
268
+ Json ( WithSession {
269
+ session_id,
270
+ request : json_path_query,
271
+ } ) : Json < WithSession < PathQuery > > ,
143
272
) -> Result < Json < grovedbg_types:: Proof > , AppError > {
144
- let Some ( db) = grovedb. upgrade ( ) else {
145
- shutdown. send ( ( ) ) . await . ok ( ) ;
146
- return Err ( AppError :: Closed ) ;
147
- } ;
273
+ let db = state. get_snapshot ( session_id) . await ?;
148
274
149
275
let path_query = path_query_to_grovedb ( json_path_query) ;
150
276
@@ -155,13 +281,13 @@ async fn prove_path_query(
155
281
}
156
282
157
283
async fn fetch_with_path_query (
158
- State ( ( shutdown, grovedb) ) : State < ( Sender < ( ) > , Weak < GroveDb > ) > ,
159
- Json ( json_path_query) : Json < PathQuery > ,
284
+ State ( state) : State < AppState > ,
285
+ Json ( WithSession {
286
+ session_id,
287
+ request : json_path_query,
288
+ } ) : Json < WithSession < PathQuery > > ,
160
289
) -> Result < Json < Vec < grovedbg_types:: NodeUpdate > > , AppError > {
161
- let Some ( db) = grovedb. upgrade ( ) else {
162
- shutdown. send ( ( ) ) . await . ok ( ) ;
163
- return Err ( AppError :: Closed ) ;
164
- } ;
290
+ let db = state. get_snapshot ( session_id) . await ?;
165
291
166
292
let path_query = path_query_to_grovedb ( json_path_query) ;
167
293
@@ -487,7 +613,6 @@ fn node_to_update(
487
613
feature_type,
488
614
} : NodeDbg ,
489
615
) -> Result < NodeUpdate , crate :: Error > {
490
- // todo: GroveVersion::latest() to actual version
491
616
let grovedb_element = crate :: Element :: deserialize ( & value, GroveVersion :: latest ( ) ) ?;
492
617
493
618
let element = element_to_grovedbg ( grovedb_element) ;
0 commit comments