Skip to content

Commit 16f099f

Browse files
committed
fix: key value awareness
1 parent a4efbaa commit 16f099f

File tree

1 file changed

+106
-115
lines changed

1 file changed

+106
-115
lines changed

crates/loro-internal/src/awareness.rs

+106-115
Original file line numberDiff line numberDiff line change
@@ -160,63 +160,69 @@ impl Awareness {
160160
}
161161

162162
pub mod v2 {
163-
use fxhash::{FxHashMap, FxHashSet};
164-
use loro_common::{LoroValue, PeerID};
163+
use fxhash::FxHashMap;
164+
use loro_common::LoroValue;
165165
use serde::{Deserialize, Serialize};
166166

167-
use crate::change::{get_sys_timestamp, Timestamp};
167+
use crate::{
168+
change::{get_sys_timestamp, Timestamp},
169+
SubscriberSetWithQueue, Subscription,
170+
};
171+
172+
pub type LocalAwarenessCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
168173

169-
#[derive(Debug, Clone)]
170174
pub struct AwarenessV2 {
171-
peer: PeerID,
172-
peers: FxHashMap<PeerID, FxHashMap<String, PeerInfo>>,
175+
states: FxHashMap<String, PeerState>,
176+
subs: SubscriberSetWithQueue<(), LocalAwarenessCallback, Vec<u8>>,
173177
timeout: i64,
174178
}
175179

180+
impl std::fmt::Debug for AwarenessV2 {
181+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
182+
write!(
183+
f,
184+
"AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
185+
self.states, self.timeout
186+
)
187+
}
188+
}
189+
176190
#[derive(Serialize, Deserialize)]
177191
struct EncodedPeerInfo<'a> {
178-
peer: PeerID,
179192
#[serde(borrow)]
180-
field: &'a str,
181-
record: LoroValue,
193+
key: &'a str,
194+
record: Option<LoroValue>,
182195
timestamp: i64,
183196
}
184197

185198
#[derive(Debug, Clone)]
186-
pub struct PeerInfo {
187-
pub state: LoroValue,
188-
// This field is generated locally
199+
pub struct PeerState {
200+
pub state: Option<LoroValue>,
189201
pub timestamp: i64,
190202
}
191203

192204
impl AwarenessV2 {
193-
pub fn new(peer: PeerID, timeout: i64) -> AwarenessV2 {
205+
pub fn new(timeout: i64) -> AwarenessV2 {
194206
AwarenessV2 {
195-
peer,
196207
timeout,
197-
peers: FxHashMap::default(),
208+
states: FxHashMap::default(),
209+
subs: SubscriberSetWithQueue::new(),
198210
}
199211
}
200212

201-
pub fn encode(&self, peers: &[PeerID], field: &str) -> Vec<u8> {
213+
pub fn encode(&self, key: &str) -> Vec<u8> {
202214
let mut peers_info = Vec::new();
203215
let now = get_sys_timestamp() as Timestamp;
204-
for peer in peers {
205-
if let Some(peer_state) = self.peers.get(peer) {
206-
let Some(peer_info) = peer_state.get(field) else {
207-
continue;
208-
};
209-
if now - peer_info.timestamp > self.timeout {
210-
continue;
211-
}
212-
let encoded_peer_info = EncodedPeerInfo {
213-
peer: *peer,
214-
field,
215-
record: peer_info.state.clone(),
216-
timestamp: peer_info.timestamp,
217-
};
218-
peers_info.push(encoded_peer_info);
216+
if let Some(peer_state) = self.states.get(key) {
217+
if now - peer_state.timestamp > self.timeout {
218+
return vec![];
219219
}
220+
let encoded_peer_info = EncodedPeerInfo {
221+
key,
222+
record: peer_state.state.clone(),
223+
timestamp: peer_state.timestamp,
224+
};
225+
peers_info.push(encoded_peer_info);
220226
}
221227

222228
postcard::to_allocvec(&peers_info).unwrap()
@@ -225,133 +231,118 @@ pub mod v2 {
225231
pub fn encode_all(&self) -> Vec<u8> {
226232
let mut peers_info = Vec::new();
227233
let now = get_sys_timestamp() as Timestamp;
228-
for peer in self.peers.keys() {
229-
if let Some(peer_state) = self.peers.get(peer) {
230-
for (field, peer_info) in peer_state.iter() {
231-
if now - peer_info.timestamp > self.timeout {
232-
continue;
233-
}
234-
let encoded_peer_info = EncodedPeerInfo {
235-
peer: *peer,
236-
field,
237-
record: peer_info.state.clone(),
238-
timestamp: peer_info.timestamp,
239-
};
240-
peers_info.push(encoded_peer_info);
241-
}
234+
for (key, peer_state) in self.states.iter() {
235+
if now - peer_state.timestamp > self.timeout {
236+
continue;
242237
}
238+
let encoded_peer_info = EncodedPeerInfo {
239+
key,
240+
record: peer_state.state.clone(),
241+
timestamp: peer_state.timestamp,
242+
};
243+
peers_info.push(encoded_peer_info);
243244
}
244245
postcard::to_allocvec(&peers_info).unwrap()
245246
}
246247

247-
pub fn encode_all_peers(&self, field: &str) -> Vec<u8> {
248-
self.encode(&self.peers.keys().copied().collect::<Vec<_>>(), field)
249-
}
250-
251-
/// Returns (updated, added)
248+
/// Returns (updated, added, removed)
252249
pub fn apply(
253250
&mut self,
254251
encoded_peers_info: &[u8],
255-
) -> (FxHashSet<PeerID>, FxHashSet<PeerID>) {
252+
) -> (Vec<String>, Vec<String>, Vec<String>) {
256253
let peers_info: Vec<EncodedPeerInfo> =
257254
postcard::from_bytes(encoded_peers_info).unwrap();
258-
let mut changed_peers = FxHashSet::default();
259-
let mut added_peers = FxHashSet::default();
255+
let mut changed_keys = Vec::new();
256+
let mut added_keys = Vec::new();
257+
let mut removed_keys = Vec::new();
260258
let now = get_sys_timestamp() as Timestamp;
261259
for EncodedPeerInfo {
262-
peer,
263-
field,
260+
key,
264261
record,
265262
timestamp,
266263
} in peers_info
267264
{
268-
let peer_state = self.peers.entry(peer).or_insert_with(|| {
269-
added_peers.insert(peer);
270-
FxHashMap::default()
271-
});
272-
match peer_state.get_mut(field) {
273-
Some(peer_info) if peer_info.timestamp >= timestamp || peer == self.peer => {
265+
match self.states.get_mut(key) {
266+
Some(peer_info) if peer_info.timestamp >= timestamp => {
274267
// do nothing
275268
}
276269
_ => {
277-
if timestamp < 0 {
278-
peer_state.remove(field);
279-
} else {
280-
peer_state.insert(
281-
field.to_string(),
282-
PeerInfo {
283-
state: record,
284-
timestamp: now,
285-
},
286-
);
287-
}
288-
if !added_peers.contains(&peer) {
289-
changed_peers.insert(peer);
270+
let old = self.states.insert(
271+
key.to_string(),
272+
PeerState {
273+
state: record.clone(),
274+
timestamp: now,
275+
},
276+
);
277+
match (old, record) {
278+
(Some(_), Some(_)) => changed_keys.push(key.to_string()),
279+
(None, Some(_)) => added_keys.push(key.to_string()),
280+
(Some(_), None) => removed_keys.push(key.to_string()),
281+
(None, None) => {}
290282
}
291283
}
292284
}
293285
}
294286

295-
(changed_peers, added_peers)
287+
(changed_keys, added_keys, removed_keys)
296288
}
297289

298-
pub fn set_local_state(&mut self, field: &str, value: impl Into<LoroValue>) {
299-
self._set_local_state(field, value.into(), false);
290+
pub fn set(&mut self, key: &str, value: impl Into<LoroValue>) {
291+
self._set_local_state(key, Some(value.into()));
300292
}
301293

302-
pub fn delete_local_state(&mut self, field: &str) {
303-
self._set_local_state(field, LoroValue::Null, true);
294+
pub fn delete(&mut self, key: &str) {
295+
self._set_local_state(key, None);
304296
}
305297

306-
fn _set_local_state(&mut self, field: &str, value: LoroValue, delete: bool) {
307-
let peer = self.peers.entry(self.peer).or_default();
308-
let peer = peer.entry(field.to_string()).or_insert_with(|| PeerInfo {
309-
state: Default::default(),
310-
timestamp: 0,
311-
});
312-
313-
peer.state = value;
314-
peer.timestamp = if delete {
315-
-1
316-
} else {
317-
get_sys_timestamp() as Timestamp
318-
};
298+
fn _set_local_state(&mut self, key: &str, value: Option<LoroValue>) {
299+
self.states.insert(
300+
key.to_string(),
301+
PeerState {
302+
state: value,
303+
timestamp: get_sys_timestamp() as Timestamp,
304+
},
305+
);
306+
if self.subs.inner().is_empty() {
307+
return;
308+
}
309+
self.subs.emit(&(), self.encode(key));
319310
}
320311

321-
pub fn get_local_state(&self, field: &str) -> Option<LoroValue> {
322-
self.peers
323-
.get(&self.peer)
324-
.and_then(|x| x.get(field))
325-
.map(|x| x.state.clone())
312+
pub fn get(&self, key: &str) -> Option<LoroValue> {
313+
self.states.get(key).and_then(|x| x.state.clone())
326314
}
327315

328-
pub fn remove_outdated(&mut self, field: &str) -> FxHashSet<PeerID> {
316+
pub fn remove_outdated(&mut self) -> Vec<String> {
329317
let now = get_sys_timestamp() as Timestamp;
330-
let mut removed = FxHashSet::default();
331-
for (id, v) in self.peers.iter_mut() {
332-
if let Some(timestamp) = v.get(field).map(|x| x.timestamp) {
333-
if now - timestamp > self.timeout {
334-
removed.insert(*id);
335-
v.remove(field);
318+
let mut removed = Vec::new();
319+
320+
self.states.retain(|key, state| {
321+
if now - state.timestamp > self.timeout {
322+
if state.state.is_some() {
323+
removed.push(key.clone());
336324
}
325+
false
326+
} else {
327+
true
337328
}
338-
}
329+
});
339330

340331
removed
341332
}
342333

343-
pub fn get_all_states(&self, field: &str) -> FxHashMap<PeerID, LoroValue> {
344-
let mut ans = FxHashMap::default();
345-
for (id, v) in self.peers.iter() {
346-
if let Some(peer_info) = v.get(field) {
347-
ans.insert(*id, peer_info.state.clone());
348-
}
349-
}
350-
ans
334+
pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
335+
self.states
336+
.iter()
337+
.filter(|(_, v)| v.state.is_some())
338+
.map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
339+
.collect()
351340
}
352341

353-
pub fn peer(&self) -> PeerID {
354-
self.peer
342+
pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription {
343+
let (sub, activate) = self.subs.inner().insert((), callback);
344+
activate();
345+
sub
355346
}
356347
}
357348
}

0 commit comments

Comments
 (0)