@@ -3,6 +3,7 @@ use loro_common::{LoroValue, PeerID};
3
3
use serde:: { Deserialize , Serialize } ;
4
4
5
5
use crate :: change:: { get_sys_timestamp, Timestamp } ;
6
+ use crate :: { SubscriberSetWithQueue , Subscription } ;
6
7
7
8
/// `Awareness` is a structure that tracks the ephemeral state of peers.
8
9
///
@@ -159,190 +160,190 @@ impl Awareness {
159
160
}
160
161
}
161
162
162
- pub mod v2 {
163
- use fxhash:: FxHashMap ;
164
- use loro_common:: LoroValue ;
165
- use serde:: { Deserialize , Serialize } ;
163
+ pub type LocalAwarenessCallback = Box < dyn Fn ( & Vec < u8 > ) -> bool + Send + Sync + ' static > ;
166
164
167
- use crate :: {
168
- change:: { get_sys_timestamp, Timestamp } ,
169
- SubscriberSetWithQueue , Subscription ,
170
- } ;
171
-
172
- pub type LocalAwarenessCallback = Box < dyn Fn ( & Vec < u8 > ) -> bool + Send + Sync + ' static > ;
165
+ /// `EphemeralStore` is a structure that tracks the ephemeral state of peers.
166
+ ///
167
+ /// It can be used to synchronize cursor positions, selections, and the names of the peers.
168
+ /// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution.
169
+ pub struct EphemeralStore {
170
+ states : FxHashMap < String , State > ,
171
+ subs : SubscriberSetWithQueue < ( ) , LocalAwarenessCallback , Vec < u8 > > ,
172
+ timeout : i64 ,
173
+ }
173
174
174
- pub struct AwarenessV2 {
175
- states : FxHashMap < String , PeerState > ,
176
- subs : SubscriberSetWithQueue < ( ) , LocalAwarenessCallback , Vec < u8 > > ,
177
- timeout : i64 ,
175
+ impl std:: fmt:: Debug for EphemeralStore {
176
+ fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
177
+ write ! (
178
+ f,
179
+ "AwarenessV2 {{ states: {:?}, timeout: {:?} }}" ,
180
+ self . states, self . timeout
181
+ )
178
182
}
183
+ }
179
184
180
- impl std:: fmt:: Debug for AwarenessV2 {
181
- fn fmt ( & self , f : & mut std:: fmt:: Formatter < ' _ > ) -> std:: fmt:: Result {
182
- write ! (
183
- f,
184
- "AwarenessV2 {{ states: {:?}, timeout: {:?} }}" ,
185
- self . states, self . timeout
186
- )
187
- }
188
- }
185
+ #[ derive( Serialize , Deserialize ) ]
186
+ struct EncodedState < ' a > {
187
+ #[ serde( borrow) ]
188
+ key : & ' a str ,
189
+ value : Option < LoroValue > ,
190
+ timestamp : i64 ,
191
+ }
189
192
190
- #[ derive( Serialize , Deserialize ) ]
191
- struct EncodedPeerInfo < ' a > {
192
- #[ serde( borrow) ]
193
- key : & ' a str ,
194
- record : Option < LoroValue > ,
195
- timestamp : i64 ,
196
- }
193
+ #[ derive( Debug , Clone ) ]
194
+ struct State {
195
+ state : Option < LoroValue > ,
196
+ timestamp : i64 ,
197
+ }
197
198
198
- #[ derive( Debug , Clone ) ]
199
- pub struct PeerState {
200
- pub state : Option < LoroValue > ,
201
- pub timestamp : i64 ,
202
- }
199
+ #[ derive( Debug , Clone ) ]
200
+ pub struct AwarenessUpdates {
201
+ pub added : Vec < String > ,
202
+ pub changed : Vec < String > ,
203
+ pub removed : Vec < String > ,
204
+ }
203
205
204
- impl AwarenessV2 {
205
- pub fn new ( timeout : i64 ) -> AwarenessV2 {
206
- AwarenessV2 {
207
- timeout,
208
- states : FxHashMap :: default ( ) ,
209
- subs : SubscriberSetWithQueue :: new ( ) ,
210
- }
206
+ impl EphemeralStore {
207
+ pub fn new ( timeout : i64 ) -> EphemeralStore {
208
+ EphemeralStore {
209
+ timeout,
210
+ states : FxHashMap :: default ( ) ,
211
+ subs : SubscriberSetWithQueue :: new ( ) ,
211
212
}
213
+ }
212
214
213
- pub fn encode ( & self , key : & str ) -> Vec < u8 > {
214
- let mut peers_info = Vec :: new ( ) ;
215
- let now = get_sys_timestamp ( ) as Timestamp ;
216
- if let Some ( peer_state) = self . states . get ( key) {
217
- if now - peer_state. timestamp > self . timeout {
218
- return vec ! [ ] ;
219
- }
220
- let encoded_peer_info = EncodedPeerInfo {
221
- key,
222
- record : peer_state. state . clone ( ) ,
223
- timestamp : peer_state. timestamp ,
224
- } ;
225
- peers_info. push ( encoded_peer_info) ;
215
+ pub fn encode ( & self , key : & str ) -> Vec < u8 > {
216
+ let mut peers_info = Vec :: new ( ) ;
217
+ let now = get_sys_timestamp ( ) as Timestamp ;
218
+ if let Some ( peer_state) = self . states . get ( key) {
219
+ if now - peer_state. timestamp > self . timeout {
220
+ return vec ! [ ] ;
226
221
}
227
-
228
- postcard:: to_allocvec ( & peers_info) . unwrap ( )
222
+ let encoded_peer_info = EncodedState {
223
+ key,
224
+ value : peer_state. state . clone ( ) ,
225
+ timestamp : peer_state. timestamp ,
226
+ } ;
227
+ peers_info. push ( encoded_peer_info) ;
229
228
}
230
229
231
- pub fn encode_all ( & self ) -> Vec < u8 > {
232
- let mut peers_info = Vec :: new ( ) ;
233
- let now = get_sys_timestamp ( ) as Timestamp ;
234
- for ( key, peer_state) in self . states . iter ( ) {
235
- if now - peer_state. timestamp > self . timeout {
236
- continue ;
237
- }
238
- let encoded_peer_info = EncodedPeerInfo {
239
- key,
240
- record : peer_state. state . clone ( ) ,
241
- timestamp : peer_state. timestamp ,
242
- } ;
243
- peers_info. push ( encoded_peer_info) ;
230
+ postcard:: to_allocvec ( & peers_info) . unwrap ( )
231
+ }
232
+
233
+ pub fn encode_all ( & self ) -> Vec < u8 > {
234
+ let mut peers_info = Vec :: new ( ) ;
235
+ let now = get_sys_timestamp ( ) as Timestamp ;
236
+ for ( key, peer_state) in self . states . iter ( ) {
237
+ if now - peer_state. timestamp > self . timeout {
238
+ continue ;
244
239
}
245
- postcard:: to_allocvec ( & peers_info) . unwrap ( )
240
+ let encoded_peer_info = EncodedState {
241
+ key,
242
+ value : peer_state. state . clone ( ) ,
243
+ timestamp : peer_state. timestamp ,
244
+ } ;
245
+ peers_info. push ( encoded_peer_info) ;
246
246
}
247
+ postcard:: to_allocvec ( & peers_info) . unwrap ( )
248
+ }
247
249
248
- /// Returns (updated, added, removed)
249
- pub fn apply (
250
- & mut self ,
251
- encoded_peers_info : & [ u8 ] ,
252
- ) -> ( Vec < String > , Vec < String > , Vec < String > ) {
253
- let peers_info: Vec < EncodedPeerInfo > =
254
- postcard:: from_bytes ( encoded_peers_info) . unwrap ( ) ;
255
- let mut changed_keys = Vec :: new ( ) ;
256
- let mut added_keys = Vec :: new ( ) ;
257
- let mut removed_keys = Vec :: new ( ) ;
258
- let now = get_sys_timestamp ( ) as Timestamp ;
259
- for EncodedPeerInfo {
260
- key,
261
- record,
262
- timestamp,
263
- } in peers_info
264
- {
265
- match self . states . get_mut ( key) {
266
- Some ( peer_info) if peer_info. timestamp >= timestamp => {
267
- // do nothing
268
- }
269
- _ => {
270
- let old = self . states . insert (
271
- key. to_string ( ) ,
272
- PeerState {
273
- state : record. clone ( ) ,
274
- timestamp : now,
275
- } ,
276
- ) ;
277
- match ( old, record) {
278
- ( Some ( _) , Some ( _) ) => changed_keys. push ( key. to_string ( ) ) ,
279
- ( None , Some ( _) ) => added_keys. push ( key. to_string ( ) ) ,
280
- ( Some ( _) , None ) => removed_keys. push ( key. to_string ( ) ) ,
281
- ( None , None ) => { }
282
- }
250
+ /// Returns (updated, added, removed)
251
+ pub fn apply ( & mut self , encoded_peers_info : & [ u8 ] ) -> AwarenessUpdates {
252
+ let peers_info: Vec < EncodedState > = postcard:: from_bytes ( encoded_peers_info) . unwrap ( ) ;
253
+ let mut changed_keys = Vec :: new ( ) ;
254
+ let mut added_keys = Vec :: new ( ) ;
255
+ let mut removed_keys = Vec :: new ( ) ;
256
+ let now = get_sys_timestamp ( ) as Timestamp ;
257
+ for EncodedState {
258
+ key,
259
+ value : record,
260
+ timestamp,
261
+ } in peers_info
262
+ {
263
+ match self . states . get_mut ( key) {
264
+ Some ( peer_info) if peer_info. timestamp >= timestamp => {
265
+ // do nothing
266
+ }
267
+ _ => {
268
+ let old = self . states . insert (
269
+ key. to_string ( ) ,
270
+ State {
271
+ state : record. clone ( ) ,
272
+ timestamp : now,
273
+ } ,
274
+ ) ;
275
+ match ( old, record) {
276
+ ( Some ( _) , Some ( _) ) => changed_keys. push ( key. to_string ( ) ) ,
277
+ ( None , Some ( _) ) => added_keys. push ( key. to_string ( ) ) ,
278
+ ( Some ( _) , None ) => removed_keys. push ( key. to_string ( ) ) ,
279
+ ( None , None ) => { }
283
280
}
284
281
}
285
282
}
286
-
287
- ( changed_keys, added_keys, removed_keys)
288
283
}
289
284
290
- pub fn set ( & mut self , key : & str , value : impl Into < LoroValue > ) {
291
- self . _set_local_state ( key, Some ( value. into ( ) ) ) ;
285
+ AwarenessUpdates {
286
+ added : added_keys,
287
+ changed : changed_keys,
288
+ removed : removed_keys,
292
289
}
290
+ }
293
291
294
- pub fn delete ( & mut self , key : & str ) {
295
- self . _set_local_state ( key, None ) ;
296
- }
292
+ pub fn set ( & mut self , key : & str , value : impl Into < LoroValue > ) {
293
+ self . _set_local_state ( key, Some ( value . into ( ) ) ) ;
294
+ }
297
295
298
- fn _set_local_state ( & mut self , key : & str , value : Option < LoroValue > ) {
299
- self . states . insert (
300
- key. to_string ( ) ,
301
- PeerState {
302
- state : value,
303
- timestamp : get_sys_timestamp ( ) as Timestamp ,
304
- } ,
305
- ) ;
306
- if self . subs . inner ( ) . is_empty ( ) {
307
- return ;
308
- }
309
- self . subs . emit ( & ( ) , self . encode ( key) ) ;
310
- }
296
+ pub fn delete ( & mut self , key : & str ) {
297
+ self . _set_local_state ( key, None ) ;
298
+ }
311
299
312
- pub fn get ( & self , key : & str ) -> Option < LoroValue > {
313
- self . states . get ( key) . and_then ( |x| x. state . clone ( ) )
314
- }
300
+ pub fn get ( & self , key : & str ) -> Option < LoroValue > {
301
+ self . states . get ( key) . and_then ( |x| x. state . clone ( ) )
302
+ }
315
303
316
- pub fn remove_outdated ( & mut self ) -> Vec < String > {
317
- let now = get_sys_timestamp ( ) as Timestamp ;
318
- let mut removed = Vec :: new ( ) ;
304
+ pub fn remove_outdated ( & mut self ) -> Vec < String > {
305
+ let now = get_sys_timestamp ( ) as Timestamp ;
306
+ let mut removed = Vec :: new ( ) ;
319
307
320
- self . states . retain ( |key, state| {
321
- if now - state. timestamp > self . timeout {
322
- if state. state . is_some ( ) {
323
- removed. push ( key. clone ( ) ) ;
324
- }
325
- false
326
- } else {
327
- true
308
+ self . states . retain ( |key, state| {
309
+ if now - state. timestamp > self . timeout {
310
+ if state. state . is_some ( ) {
311
+ removed. push ( key. clone ( ) ) ;
328
312
}
329
- } ) ;
313
+ false
314
+ } else {
315
+ true
316
+ }
317
+ } ) ;
330
318
331
- removed
332
- }
319
+ removed
320
+ }
333
321
334
- pub fn get_all_states ( & self ) -> FxHashMap < String , LoroValue > {
335
- self . states
336
- . iter ( )
337
- . filter ( |( _, v) | v. state . is_some ( ) )
338
- . map ( |( k, v) | ( k. clone ( ) , v. state . clone ( ) . unwrap ( ) ) )
339
- . collect ( )
340
- }
322
+ pub fn get_all_states ( & self ) -> FxHashMap < String , LoroValue > {
323
+ self . states
324
+ . iter ( )
325
+ . filter ( |( _, v) | v. state . is_some ( ) )
326
+ . map ( |( k, v) | ( k. clone ( ) , v. state . clone ( ) . unwrap ( ) ) )
327
+ . collect ( )
328
+ }
329
+
330
+ pub fn subscribe_local_update ( & self , callback : LocalAwarenessCallback ) -> Subscription {
331
+ let ( sub, activate) = self . subs . inner ( ) . insert ( ( ) , callback) ;
332
+ activate ( ) ;
333
+ sub
334
+ }
341
335
342
- pub fn subscribe_local_update ( & self , callback : LocalAwarenessCallback ) -> Subscription {
343
- let ( sub, activate) = self . subs . inner ( ) . insert ( ( ) , callback) ;
344
- activate ( ) ;
345
- sub
336
+ fn _set_local_state ( & mut self , key : & str , value : Option < LoroValue > ) {
337
+ self . states . insert (
338
+ key. to_string ( ) ,
339
+ State {
340
+ state : value,
341
+ timestamp : get_sys_timestamp ( ) as Timestamp ,
342
+ } ,
343
+ ) ;
344
+ if self . subs . inner ( ) . is_empty ( ) {
345
+ return ;
346
346
}
347
+ self . subs . emit ( & ( ) , self . encode ( key) ) ;
347
348
}
348
349
}
0 commit comments