From 373c9c0d60848654dfe2f2aed4b6a727c6689a0a Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Wed, 5 Mar 2025 16:01:44 +0800 Subject: [PATCH 01/16] feat: awareness v2 --- crates/loro-internal/src/awareness.rs | 199 ++++++++++++++++++++++++++ 1 file changed, 199 insertions(+) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 84a93496a..27422c19b 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -11,6 +11,7 @@ use crate::change::{get_sys_timestamp, Timestamp}; /// The state of a specific peer is expected to be removed after a specified timeout. Use /// `remove_outdated` to eliminate outdated states. #[derive(Debug, Clone)] +#[deprecated(since = "1.4.6", note = "Use `AwarenessV2` instead.")] pub struct Awareness { peer: PeerID, peers: FxHashMap, @@ -32,6 +33,7 @@ struct EncodedPeerInfo { record: LoroValue, } +#[allow(deprecated)] impl Awareness { pub fn new(peer: PeerID, timeout: i64) -> Awareness { Awareness { @@ -156,3 +158,200 @@ impl Awareness { self.peer } } + +pub mod v2 { + use fxhash::{FxHashMap, FxHashSet}; + use loro_common::{LoroValue, PeerID}; + use serde::{Deserialize, Serialize}; + + use crate::change::{get_sys_timestamp, Timestamp}; + + #[derive(Debug, Clone)] + pub struct AwarenessV2 { + peer: PeerID, + peers: FxHashMap>, + timeout: i64, + } + + #[derive(Serialize, Deserialize)] + struct EncodedPeerInfo<'a> { + peer: PeerID, + #[serde(borrow)] + field: &'a str, + record: LoroValue, + timestamp: i64, + } + + #[derive(Debug, Clone)] + pub struct PeerInfo { + pub state: LoroValue, + // This field is generated locally + pub timestamp: i64, + } + + impl AwarenessV2 { + pub fn new(peer: PeerID, timeout: i64) -> AwarenessV2 { + AwarenessV2 { + peer, + timeout, + peers: FxHashMap::default(), + } + } + + pub fn encode(&self, peers: &[PeerID], field: &str) -> Vec { + let mut peers_info = Vec::new(); + let now = get_sys_timestamp() as Timestamp; + for peer in peers { + if let Some(peer_state) = self.peers.get(peer) { + let Some(peer_info) = peer_state.get(field) else { + continue; + }; + if now - peer_info.timestamp > self.timeout { + continue; + } + let encoded_peer_info = EncodedPeerInfo { + peer: *peer, + field, + record: peer_info.state.clone(), + timestamp: peer_info.timestamp, + }; + peers_info.push(encoded_peer_info); + } + } + + postcard::to_allocvec(&peers_info).unwrap() + } + + pub fn encode_all(&self) -> Vec { + let mut peers_info = Vec::new(); + let now = get_sys_timestamp() as Timestamp; + for peer in self.peers.keys() { + if let Some(peer_state) = self.peers.get(peer) { + for (field, peer_info) in peer_state.iter() { + if now - peer_info.timestamp > self.timeout { + continue; + } + let encoded_peer_info = EncodedPeerInfo { + peer: *peer, + field, + record: peer_info.state.clone(), + timestamp: peer_info.timestamp, + }; + peers_info.push(encoded_peer_info); + } + } + } + postcard::to_allocvec(&peers_info).unwrap() + } + + pub fn encode_all_peers(&self, field: &str) -> Vec { + self.encode(&self.peers.keys().copied().collect::>(), field) + } + + /// Returns (updated, added) + pub fn apply( + &mut self, + encoded_peers_info: &[u8], + ) -> (FxHashSet, FxHashSet) { + let peers_info: Vec = + postcard::from_bytes(encoded_peers_info).unwrap(); + let mut changed_peers = FxHashSet::default(); + let mut added_peers = FxHashSet::default(); + let now = get_sys_timestamp() as Timestamp; + for EncodedPeerInfo { + peer, + field, + record, + timestamp, + } in peers_info + { + let peer_state = self.peers.entry(peer).or_insert_with(|| { + added_peers.insert(peer); + FxHashMap::default() + }); + match peer_state.get_mut(field) { + Some(peer_info) if peer_info.timestamp >= timestamp || peer == self.peer => { + // do nothing + } + _ => { + if timestamp < 0 { + peer_state.remove(field); + } else { + peer_state.insert( + field.to_string(), + PeerInfo { + state: record, + timestamp: now, + }, + ); + } + if !added_peers.contains(&peer) { + changed_peers.insert(peer); + } + } + } + } + + (changed_peers, added_peers) + } + + pub fn set_local_state(&mut self, field: &str, value: impl Into) { + self._set_local_state(field, value.into(), false); + } + + pub fn delete_local_state(&mut self, field: &str) { + self._set_local_state(field, LoroValue::Null, true); + } + + fn _set_local_state(&mut self, field: &str, value: LoroValue, delete: bool) { + let peer = self.peers.entry(self.peer).or_default(); + let peer = peer.entry(field.to_string()).or_insert_with(|| PeerInfo { + state: Default::default(), + timestamp: 0, + }); + + peer.state = value; + peer.timestamp = if delete { + -(get_sys_timestamp() as Timestamp) + } else { + get_sys_timestamp() as Timestamp + }; + } + + pub fn get_local_state(&self, field: &str) -> Option { + self.peers + .get(&self.peer) + .and_then(|x| x.get(field)) + .map(|x| x.state.clone()) + } + + pub fn remove_outdated(&mut self, field: &str) -> FxHashSet { + let now = get_sys_timestamp() as Timestamp; + let mut removed = FxHashSet::default(); + for (id, v) in self.peers.iter_mut() { + if let Some(timestamp) = v.get(field).map(|x| x.timestamp) { + if now - timestamp > self.timeout { + removed.insert(*id); + v.remove(field); + } + } + } + + removed + } + + pub fn get_all_states(&self, field: &str) -> FxHashMap { + let mut ans = FxHashMap::default(); + for (id, v) in self.peers.iter() { + if let Some(peer_info) = v.get(field) { + ans.insert(*id, peer_info.state.clone()); + } + } + ans + } + + pub fn peer(&self) -> PeerID { + self.peer + } + } +} From a4efbaa3f4192186ab2bd12c0a298860d187481c Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Wed, 5 Mar 2025 16:12:23 +0800 Subject: [PATCH 02/16] chore: --- crates/loro-internal/src/awareness.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 27422c19b..7f48d44c4 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -312,7 +312,7 @@ pub mod v2 { peer.state = value; peer.timestamp = if delete { - -(get_sys_timestamp() as Timestamp) + -1 } else { get_sys_timestamp() as Timestamp }; From 16f099fbdbee088016bfa48895122c3accf3cb60 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Wed, 5 Mar 2025 20:49:21 +0800 Subject: [PATCH 03/16] fix: key value awareness --- crates/loro-internal/src/awareness.rs | 221 ++++++++++++-------------- 1 file changed, 106 insertions(+), 115 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 7f48d44c4..722be7289 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -160,63 +160,69 @@ impl Awareness { } pub mod v2 { - use fxhash::{FxHashMap, FxHashSet}; - use loro_common::{LoroValue, PeerID}; + use fxhash::FxHashMap; + use loro_common::LoroValue; use serde::{Deserialize, Serialize}; - use crate::change::{get_sys_timestamp, Timestamp}; + use crate::{ + change::{get_sys_timestamp, Timestamp}, + SubscriberSetWithQueue, Subscription, + }; + + pub type LocalAwarenessCallback = Box) -> bool + Send + Sync + 'static>; - #[derive(Debug, Clone)] pub struct AwarenessV2 { - peer: PeerID, - peers: FxHashMap>, + states: FxHashMap, + subs: SubscriberSetWithQueue<(), LocalAwarenessCallback, Vec>, timeout: i64, } + impl std::fmt::Debug for AwarenessV2 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "AwarenessV2 {{ states: {:?}, timeout: {:?} }}", + self.states, self.timeout + ) + } + } + #[derive(Serialize, Deserialize)] struct EncodedPeerInfo<'a> { - peer: PeerID, #[serde(borrow)] - field: &'a str, - record: LoroValue, + key: &'a str, + record: Option, timestamp: i64, } #[derive(Debug, Clone)] - pub struct PeerInfo { - pub state: LoroValue, - // This field is generated locally + pub struct PeerState { + pub state: Option, pub timestamp: i64, } impl AwarenessV2 { - pub fn new(peer: PeerID, timeout: i64) -> AwarenessV2 { + pub fn new(timeout: i64) -> AwarenessV2 { AwarenessV2 { - peer, timeout, - peers: FxHashMap::default(), + states: FxHashMap::default(), + subs: SubscriberSetWithQueue::new(), } } - pub fn encode(&self, peers: &[PeerID], field: &str) -> Vec { + pub fn encode(&self, key: &str) -> Vec { let mut peers_info = Vec::new(); let now = get_sys_timestamp() as Timestamp; - for peer in peers { - if let Some(peer_state) = self.peers.get(peer) { - let Some(peer_info) = peer_state.get(field) else { - continue; - }; - if now - peer_info.timestamp > self.timeout { - continue; - } - let encoded_peer_info = EncodedPeerInfo { - peer: *peer, - field, - record: peer_info.state.clone(), - timestamp: peer_info.timestamp, - }; - peers_info.push(encoded_peer_info); + if let Some(peer_state) = self.states.get(key) { + if now - peer_state.timestamp > self.timeout { + return vec![]; } + let encoded_peer_info = EncodedPeerInfo { + key, + record: peer_state.state.clone(), + timestamp: peer_state.timestamp, + }; + peers_info.push(encoded_peer_info); } postcard::to_allocvec(&peers_info).unwrap() @@ -225,133 +231,118 @@ pub mod v2 { pub fn encode_all(&self) -> Vec { let mut peers_info = Vec::new(); let now = get_sys_timestamp() as Timestamp; - for peer in self.peers.keys() { - if let Some(peer_state) = self.peers.get(peer) { - for (field, peer_info) in peer_state.iter() { - if now - peer_info.timestamp > self.timeout { - continue; - } - let encoded_peer_info = EncodedPeerInfo { - peer: *peer, - field, - record: peer_info.state.clone(), - timestamp: peer_info.timestamp, - }; - peers_info.push(encoded_peer_info); - } + for (key, peer_state) in self.states.iter() { + if now - peer_state.timestamp > self.timeout { + continue; } + let encoded_peer_info = EncodedPeerInfo { + key, + record: peer_state.state.clone(), + timestamp: peer_state.timestamp, + }; + peers_info.push(encoded_peer_info); } postcard::to_allocvec(&peers_info).unwrap() } - pub fn encode_all_peers(&self, field: &str) -> Vec { - self.encode(&self.peers.keys().copied().collect::>(), field) - } - - /// Returns (updated, added) + /// Returns (updated, added, removed) pub fn apply( &mut self, encoded_peers_info: &[u8], - ) -> (FxHashSet, FxHashSet) { + ) -> (Vec, Vec, Vec) { let peers_info: Vec = postcard::from_bytes(encoded_peers_info).unwrap(); - let mut changed_peers = FxHashSet::default(); - let mut added_peers = FxHashSet::default(); + let mut changed_keys = Vec::new(); + let mut added_keys = Vec::new(); + let mut removed_keys = Vec::new(); let now = get_sys_timestamp() as Timestamp; for EncodedPeerInfo { - peer, - field, + key, record, timestamp, } in peers_info { - let peer_state = self.peers.entry(peer).or_insert_with(|| { - added_peers.insert(peer); - FxHashMap::default() - }); - match peer_state.get_mut(field) { - Some(peer_info) if peer_info.timestamp >= timestamp || peer == self.peer => { + match self.states.get_mut(key) { + Some(peer_info) if peer_info.timestamp >= timestamp => { // do nothing } _ => { - if timestamp < 0 { - peer_state.remove(field); - } else { - peer_state.insert( - field.to_string(), - PeerInfo { - state: record, - timestamp: now, - }, - ); - } - if !added_peers.contains(&peer) { - changed_peers.insert(peer); + let old = self.states.insert( + key.to_string(), + PeerState { + state: record.clone(), + timestamp: now, + }, + ); + match (old, record) { + (Some(_), Some(_)) => changed_keys.push(key.to_string()), + (None, Some(_)) => added_keys.push(key.to_string()), + (Some(_), None) => removed_keys.push(key.to_string()), + (None, None) => {} } } } } - (changed_peers, added_peers) + (changed_keys, added_keys, removed_keys) } - pub fn set_local_state(&mut self, field: &str, value: impl Into) { - self._set_local_state(field, value.into(), false); + pub fn set(&mut self, key: &str, value: impl Into) { + self._set_local_state(key, Some(value.into())); } - pub fn delete_local_state(&mut self, field: &str) { - self._set_local_state(field, LoroValue::Null, true); + pub fn delete(&mut self, key: &str) { + self._set_local_state(key, None); } - fn _set_local_state(&mut self, field: &str, value: LoroValue, delete: bool) { - let peer = self.peers.entry(self.peer).or_default(); - let peer = peer.entry(field.to_string()).or_insert_with(|| PeerInfo { - state: Default::default(), - timestamp: 0, - }); - - peer.state = value; - peer.timestamp = if delete { - -1 - } else { - get_sys_timestamp() as Timestamp - }; + fn _set_local_state(&mut self, key: &str, value: Option) { + self.states.insert( + key.to_string(), + PeerState { + state: value, + timestamp: get_sys_timestamp() as Timestamp, + }, + ); + if self.subs.inner().is_empty() { + return; + } + self.subs.emit(&(), self.encode(key)); } - pub fn get_local_state(&self, field: &str) -> Option { - self.peers - .get(&self.peer) - .and_then(|x| x.get(field)) - .map(|x| x.state.clone()) + pub fn get(&self, key: &str) -> Option { + self.states.get(key).and_then(|x| x.state.clone()) } - pub fn remove_outdated(&mut self, field: &str) -> FxHashSet { + pub fn remove_outdated(&mut self) -> Vec { let now = get_sys_timestamp() as Timestamp; - let mut removed = FxHashSet::default(); - for (id, v) in self.peers.iter_mut() { - if let Some(timestamp) = v.get(field).map(|x| x.timestamp) { - if now - timestamp > self.timeout { - removed.insert(*id); - v.remove(field); + let mut removed = Vec::new(); + + self.states.retain(|key, state| { + if now - state.timestamp > self.timeout { + if state.state.is_some() { + removed.push(key.clone()); } + false + } else { + true } - } + }); removed } - pub fn get_all_states(&self, field: &str) -> FxHashMap { - let mut ans = FxHashMap::default(); - for (id, v) in self.peers.iter() { - if let Some(peer_info) = v.get(field) { - ans.insert(*id, peer_info.state.clone()); - } - } - ans + pub fn get_all_states(&self) -> FxHashMap { + self.states + .iter() + .filter(|(_, v)| v.state.is_some()) + .map(|(k, v)| (k.clone(), v.state.clone().unwrap())) + .collect() } - pub fn peer(&self) -> PeerID { - self.peer + pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription { + let (sub, activate) = self.subs.inner().insert((), callback); + activate(); + sub } } } From fe13193f9e7714cb776fcc889af041d3a0af9704 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Thu, 6 Mar 2025 11:16:54 +0800 Subject: [PATCH 04/16] fix: rename awareness --- crates/loro-internal/src/awareness.rs | 309 +++++++++++++------------- 1 file changed, 155 insertions(+), 154 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 722be7289..a5e451b24 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -3,6 +3,7 @@ use loro_common::{LoroValue, PeerID}; use serde::{Deserialize, Serialize}; use crate::change::{get_sys_timestamp, Timestamp}; +use crate::{SubscriberSetWithQueue, Subscription}; /// `Awareness` is a structure that tracks the ephemeral state of peers. /// @@ -159,190 +160,190 @@ impl Awareness { } } -pub mod v2 { - use fxhash::FxHashMap; - use loro_common::LoroValue; - use serde::{Deserialize, Serialize}; +pub type LocalAwarenessCallback = Box) -> bool + Send + Sync + 'static>; - use crate::{ - change::{get_sys_timestamp, Timestamp}, - SubscriberSetWithQueue, Subscription, - }; - - pub type LocalAwarenessCallback = Box) -> bool + Send + Sync + 'static>; +/// `EphemeralStore` is a structure that tracks the ephemeral state of peers. +/// +/// It can be used to synchronize cursor positions, selections, and the names of the peers. +/// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution. +pub struct EphemeralStore { + states: FxHashMap, + subs: SubscriberSetWithQueue<(), LocalAwarenessCallback, Vec>, + timeout: i64, +} - pub struct AwarenessV2 { - states: FxHashMap, - subs: SubscriberSetWithQueue<(), LocalAwarenessCallback, Vec>, - timeout: i64, +impl std::fmt::Debug for EphemeralStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "AwarenessV2 {{ states: {:?}, timeout: {:?} }}", + self.states, self.timeout + ) } +} - impl std::fmt::Debug for AwarenessV2 { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "AwarenessV2 {{ states: {:?}, timeout: {:?} }}", - self.states, self.timeout - ) - } - } +#[derive(Serialize, Deserialize)] +struct EncodedState<'a> { + #[serde(borrow)] + key: &'a str, + value: Option, + timestamp: i64, +} - #[derive(Serialize, Deserialize)] - struct EncodedPeerInfo<'a> { - #[serde(borrow)] - key: &'a str, - record: Option, - timestamp: i64, - } +#[derive(Debug, Clone)] +struct State { + state: Option, + timestamp: i64, +} - #[derive(Debug, Clone)] - pub struct PeerState { - pub state: Option, - pub timestamp: i64, - } +#[derive(Debug, Clone)] +pub struct AwarenessUpdates { + pub added: Vec, + pub changed: Vec, + pub removed: Vec, +} - impl AwarenessV2 { - pub fn new(timeout: i64) -> AwarenessV2 { - AwarenessV2 { - timeout, - states: FxHashMap::default(), - subs: SubscriberSetWithQueue::new(), - } +impl EphemeralStore { + pub fn new(timeout: i64) -> EphemeralStore { + EphemeralStore { + timeout, + states: FxHashMap::default(), + subs: SubscriberSetWithQueue::new(), } + } - pub fn encode(&self, key: &str) -> Vec { - let mut peers_info = Vec::new(); - let now = get_sys_timestamp() as Timestamp; - if let Some(peer_state) = self.states.get(key) { - if now - peer_state.timestamp > self.timeout { - return vec![]; - } - let encoded_peer_info = EncodedPeerInfo { - key, - record: peer_state.state.clone(), - timestamp: peer_state.timestamp, - }; - peers_info.push(encoded_peer_info); + pub fn encode(&self, key: &str) -> Vec { + let mut peers_info = Vec::new(); + let now = get_sys_timestamp() as Timestamp; + if let Some(peer_state) = self.states.get(key) { + if now - peer_state.timestamp > self.timeout { + return vec![]; } - - postcard::to_allocvec(&peers_info).unwrap() + let encoded_peer_info = EncodedState { + key, + value: peer_state.state.clone(), + timestamp: peer_state.timestamp, + }; + peers_info.push(encoded_peer_info); } - pub fn encode_all(&self) -> Vec { - let mut peers_info = Vec::new(); - let now = get_sys_timestamp() as Timestamp; - for (key, peer_state) in self.states.iter() { - if now - peer_state.timestamp > self.timeout { - continue; - } - let encoded_peer_info = EncodedPeerInfo { - key, - record: peer_state.state.clone(), - timestamp: peer_state.timestamp, - }; - peers_info.push(encoded_peer_info); + postcard::to_allocvec(&peers_info).unwrap() + } + + pub fn encode_all(&self) -> Vec { + let mut peers_info = Vec::new(); + let now = get_sys_timestamp() as Timestamp; + for (key, peer_state) in self.states.iter() { + if now - peer_state.timestamp > self.timeout { + continue; } - postcard::to_allocvec(&peers_info).unwrap() + let encoded_peer_info = EncodedState { + key, + value: peer_state.state.clone(), + timestamp: peer_state.timestamp, + }; + peers_info.push(encoded_peer_info); } + postcard::to_allocvec(&peers_info).unwrap() + } - /// Returns (updated, added, removed) - pub fn apply( - &mut self, - encoded_peers_info: &[u8], - ) -> (Vec, Vec, Vec) { - let peers_info: Vec = - postcard::from_bytes(encoded_peers_info).unwrap(); - let mut changed_keys = Vec::new(); - let mut added_keys = Vec::new(); - let mut removed_keys = Vec::new(); - let now = get_sys_timestamp() as Timestamp; - for EncodedPeerInfo { - key, - record, - timestamp, - } in peers_info - { - match self.states.get_mut(key) { - Some(peer_info) if peer_info.timestamp >= timestamp => { - // do nothing - } - _ => { - let old = self.states.insert( - key.to_string(), - PeerState { - state: record.clone(), - timestamp: now, - }, - ); - match (old, record) { - (Some(_), Some(_)) => changed_keys.push(key.to_string()), - (None, Some(_)) => added_keys.push(key.to_string()), - (Some(_), None) => removed_keys.push(key.to_string()), - (None, None) => {} - } + /// Returns (updated, added, removed) + pub fn apply(&mut self, encoded_peers_info: &[u8]) -> AwarenessUpdates { + let peers_info: Vec = postcard::from_bytes(encoded_peers_info).unwrap(); + let mut changed_keys = Vec::new(); + let mut added_keys = Vec::new(); + let mut removed_keys = Vec::new(); + let now = get_sys_timestamp() as Timestamp; + for EncodedState { + key, + value: record, + timestamp, + } in peers_info + { + match self.states.get_mut(key) { + Some(peer_info) if peer_info.timestamp >= timestamp => { + // do nothing + } + _ => { + let old = self.states.insert( + key.to_string(), + State { + state: record.clone(), + timestamp: now, + }, + ); + match (old, record) { + (Some(_), Some(_)) => changed_keys.push(key.to_string()), + (None, Some(_)) => added_keys.push(key.to_string()), + (Some(_), None) => removed_keys.push(key.to_string()), + (None, None) => {} } } } - - (changed_keys, added_keys, removed_keys) } - pub fn set(&mut self, key: &str, value: impl Into) { - self._set_local_state(key, Some(value.into())); + AwarenessUpdates { + added: added_keys, + changed: changed_keys, + removed: removed_keys, } + } - pub fn delete(&mut self, key: &str) { - self._set_local_state(key, None); - } + pub fn set(&mut self, key: &str, value: impl Into) { + self._set_local_state(key, Some(value.into())); + } - fn _set_local_state(&mut self, key: &str, value: Option) { - self.states.insert( - key.to_string(), - PeerState { - state: value, - timestamp: get_sys_timestamp() as Timestamp, - }, - ); - if self.subs.inner().is_empty() { - return; - } - self.subs.emit(&(), self.encode(key)); - } + pub fn delete(&mut self, key: &str) { + self._set_local_state(key, None); + } - pub fn get(&self, key: &str) -> Option { - self.states.get(key).and_then(|x| x.state.clone()) - } + pub fn get(&self, key: &str) -> Option { + self.states.get(key).and_then(|x| x.state.clone()) + } - pub fn remove_outdated(&mut self) -> Vec { - let now = get_sys_timestamp() as Timestamp; - let mut removed = Vec::new(); + pub fn remove_outdated(&mut self) -> Vec { + let now = get_sys_timestamp() as Timestamp; + let mut removed = Vec::new(); - self.states.retain(|key, state| { - if now - state.timestamp > self.timeout { - if state.state.is_some() { - removed.push(key.clone()); - } - false - } else { - true + self.states.retain(|key, state| { + if now - state.timestamp > self.timeout { + if state.state.is_some() { + removed.push(key.clone()); } - }); + false + } else { + true + } + }); - removed - } + removed + } - pub fn get_all_states(&self) -> FxHashMap { - self.states - .iter() - .filter(|(_, v)| v.state.is_some()) - .map(|(k, v)| (k.clone(), v.state.clone().unwrap())) - .collect() - } + pub fn get_all_states(&self) -> FxHashMap { + self.states + .iter() + .filter(|(_, v)| v.state.is_some()) + .map(|(k, v)| (k.clone(), v.state.clone().unwrap())) + .collect() + } + + pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription { + let (sub, activate) = self.subs.inner().insert((), callback); + activate(); + sub + } - pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription { - let (sub, activate) = self.subs.inner().insert((), callback); - activate(); - sub + fn _set_local_state(&mut self, key: &str, value: Option) { + self.states.insert( + key.to_string(), + State { + state: value, + timestamp: get_sys_timestamp() as Timestamp, + }, + ); + if self.subs.inner().is_empty() { + return; } + self.subs.emit(&(), self.encode(key)); } } From 2e5860841375ca28088bb256a9b6878568a8796f Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Thu, 6 Mar 2025 13:46:48 +0800 Subject: [PATCH 05/16] bk --- crates/loro-internal/src/awareness.rs | 4 +-- crates/loro-wasm/src/awareness.rs | 44 ++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index a5e451b24..03a86a907 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -248,8 +248,8 @@ impl EphemeralStore { } /// Returns (updated, added, removed) - pub fn apply(&mut self, encoded_peers_info: &[u8]) -> AwarenessUpdates { - let peers_info: Vec = postcard::from_bytes(encoded_peers_info).unwrap(); + pub fn apply(&mut self, data: &[u8]) -> AwarenessUpdates { + let peers_info: Vec = postcard::from_bytes(data).unwrap(); let mut changed_keys = Vec::new(); let mut added_keys = Vec::new(); let mut removed_keys = Vec::new(); diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index 1af797cd3..51bdec4ca 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -1,5 +1,8 @@ use js_sys::{Array, Object, Reflect}; -use loro_internal::{awareness::Awareness as InternalAwareness, id::PeerID}; +use loro_internal::{ + awareness::Awareness as InternalAwareness, awareness::EphemeralStore as InternalEphemeralStore, + id::PeerID, +}; use wasm_bindgen::prelude::*; use crate::{js_peer_to_peer, JsIntoPeerID, JsResult, JsStrPeerID}; @@ -23,6 +26,9 @@ extern "C" { /// Awareness apply result #[wasm_bindgen(typescript_type = "{ updated: PeerID[], added: PeerID[] }")] pub type JsAwarenessApplyResult; + /// Awareness updates + #[wasm_bindgen(typescript_type = "{ updated: string[], added: string[], removed: string[] }")] + pub type JsAwarenessUpdates; } #[wasm_bindgen] @@ -152,3 +158,39 @@ impl AwarenessWasm { fn peer_to_str_js(peer: PeerID) -> JsValue { format!("{}", peer).into() } + +#[wasm_bindgen] +pub struct EphemeralStoreWasm { + inner: InternalEphemeralStore, +} + +#[wasm_bindgen] +impl EphemeralStoreWasm { + #[wasm_bindgen(constructor)] + pub fn new(timeout: f64) -> EphemeralStoreWasm { + EphemeralStoreWasm { + inner: InternalEphemeralStore::new(timeout as i64), + } + } + + pub fn encode(&self, key: &str) -> Vec { + self.inner.encode(key) + } + + pub fn encodeAll(&self) -> Vec { + self.inner.encode_all() + } + + pub fn apply(&mut self, data: &[u8]) -> JsResult { + let updates = self.inner.apply(data); + let ans = Object::new(); + let updated = Array::from_iter(updates.updated.into_iter().map(peer_to_str_js)); + let added = Array::from_iter(updates.added.into_iter().map(peer_to_str_js)); + let removed = Array::from_iter(updates.removed.into_iter().map(peer_to_str_js)); + Reflect::set(&ans, &"updated".into(), &updated.into())?; + Reflect::set(&ans, &"added".into(), &added.into())?; + Reflect::set(&ans, &"removed".into(), &removed.into())?; + let v: JsValue = ans.into(); + Ok(v.into()) + } +} From fd26917c5ea6dfd2a77a6e2d2b1800fc56841940 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 16:17:25 +0800 Subject: [PATCH 06/16] feat: ephemeral wrapper --- crates/loro-internal/src/awareness.rs | 10 +-- crates/loro-wasm/index.ts | 99 +++++++++++++++++++++ crates/loro-wasm/src/awareness.rs | 67 +++++++++++++- crates/loro-wasm/src/lib.rs | 12 ++- crates/loro-wasm/tests/ephemeral.test.ts | 107 +++++++++++++++++++++++ 5 files changed, 285 insertions(+), 10 deletions(-) create mode 100644 crates/loro-wasm/tests/ephemeral.test.ts diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 03a86a907..8636c8967 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -12,7 +12,7 @@ use crate::{SubscriberSetWithQueue, Subscription}; /// The state of a specific peer is expected to be removed after a specified timeout. Use /// `remove_outdated` to eliminate outdated states. #[derive(Debug, Clone)] -#[deprecated(since = "1.4.6", note = "Use `AwarenessV2` instead.")] +#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")] pub struct Awareness { peer: PeerID, peers: FxHashMap, @@ -199,7 +199,7 @@ struct State { #[derive(Debug, Clone)] pub struct AwarenessUpdates { pub added: Vec, - pub changed: Vec, + pub updated: Vec, pub removed: Vec, } @@ -250,7 +250,7 @@ impl EphemeralStore { /// Returns (updated, added, removed) pub fn apply(&mut self, data: &[u8]) -> AwarenessUpdates { let peers_info: Vec = postcard::from_bytes(data).unwrap(); - let mut changed_keys = Vec::new(); + let mut updated_keys = Vec::new(); let mut added_keys = Vec::new(); let mut removed_keys = Vec::new(); let now = get_sys_timestamp() as Timestamp; @@ -273,7 +273,7 @@ impl EphemeralStore { }, ); match (old, record) { - (Some(_), Some(_)) => changed_keys.push(key.to_string()), + (Some(_), Some(_)) => updated_keys.push(key.to_string()), (None, Some(_)) => added_keys.push(key.to_string()), (Some(_), None) => removed_keys.push(key.to_string()), (None, None) => {} @@ -284,7 +284,7 @@ impl EphemeralStore { AwarenessUpdates { added: added_keys, - changed: changed_keys, + updated: updated_keys, removed: removed_keys, } } diff --git a/crates/loro-wasm/index.ts b/crates/loro-wasm/index.ts index e4c084f01..0c2e6abb1 100644 --- a/crates/loro-wasm/index.ts +++ b/crates/loro-wasm/index.ts @@ -2,6 +2,7 @@ export * from "loro-wasm"; export type * from "loro-wasm"; import { AwarenessWasm, + EphemeralStoreWasm, PeerID, Container, ContainerID, @@ -15,6 +16,7 @@ import { OpId, Value, AwarenessListener, + EphemeralListener, } from "loro-wasm"; /** @@ -113,6 +115,8 @@ export function newRootContainerID( /** + * @deprecated Please use `EphemeralStore` instead. + * * Awareness is a structure that allows to track the ephemeral state of the peers. * * If we don't receive a state update from a peer within the timeout, we will remove their state. @@ -214,6 +218,101 @@ export class Awareness { } } +/** + * Awareness is a structure that allows to track the ephemeral state of the peers. + * + * If we don't receive a state update from a peer within the timeout, we will remove their state. + * The timeout is in milliseconds. This can be used to handle the off-line state of a peer. + */ +export class EphemeralStore { + inner: EphemeralStoreWasm; + private timer: number | undefined; + private timeout: number; + private listeners: Set = new Set(); + constructor(timeout: number = 30000) { + this.inner = new EphemeralStoreWasm(timeout); + this.timeout = timeout; + } + + apply(bytes: Uint8Array, origin = "remote") { + const { updated, added, removed } = this.inner.apply(bytes); + this.listeners.forEach((listener) => { + listener({ updated, added, removed }, origin); + }); + + this.startTimerIfNotEmpty(); + } + + set(key: string, value: T) { + const wasEmpty = this.get(key) == null; + this.inner.set(key, value); + if (wasEmpty) { + this.listeners.forEach((listener) => { + listener( + { updated: [], added: [key], removed: [] }, + "local", + ); + }); + } else { + this.listeners.forEach((listener) => { + listener( + { updated: [key], added: [], removed: [] }, + "local", + ); + }); + } + + this.startTimerIfNotEmpty(); + } + + get(key: string): T | undefined { + return this.inner.get(key); + } + + getAllStates(): Record { + return this.inner.getAllStates(); + } + + encode(key: string): Uint8Array { + return this.inner.encode(key); + } + + encodeAll(): Uint8Array { + return this.inner.encodeAll(); + } + + addListener(listener: EphemeralListener) { + this.listeners.add(listener); + } + + removeListener(listener: EphemeralListener) { + this.listeners.delete(listener); + } + + destroy() { + clearInterval(this.timer); + this.listeners.clear(); + } + + private startTimerIfNotEmpty() { + if (this.inner.isEmpty() || this.timer != null) { + return; + } + + this.timer = setInterval(() => { + const removed = this.inner.removeOutdated(); + if (removed.length > 0) { + this.listeners.forEach((listener) => { + listener({ updated: [], added: [], removed }, "timeout"); + }); + } + if (this.inner.isEmpty()) { + clearInterval(this.timer); + this.timer = undefined; + } + }, this.timeout / 2) as unknown as number; + } +} LoroDoc.prototype.toJsonWithReplacer = function (replacer: (key: string | number, value: Value | Container) => Value | Container | undefined) { const processed = new Set(); const doc = this; diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index 51bdec4ca..781e933e9 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -5,7 +5,10 @@ use loro_internal::{ }; use wasm_bindgen::prelude::*; -use crate::{js_peer_to_peer, JsIntoPeerID, JsResult, JsStrPeerID}; +use crate::{ + console_error, js_peer_to_peer, observer, subscription_to_js_function_callback, JsIntoPeerID, + JsResult, JsStrPeerID, +}; /// `Awareness` is a structure that tracks the ephemeral state of peers. /// @@ -166,6 +169,11 @@ pub struct EphemeralStoreWasm { #[wasm_bindgen] impl EphemeralStoreWasm { + /// Creates a new `EphemeralStore` instance. + /// + /// The `timeout` parameter specifies the duration in milliseconds. + /// A state of a peer is considered outdated, if the last update of the state of the peer + /// is older than the `timeout`. #[wasm_bindgen(constructor)] pub fn new(timeout: f64) -> EphemeralStoreWasm { EphemeralStoreWasm { @@ -173,6 +181,44 @@ impl EphemeralStoreWasm { } } + pub fn set(&mut self, key: &str, value: JsValue) { + self.inner.set(key, value); + } + + pub fn delete(&mut self, key: &str) { + self.inner.delete(key); + } + + pub fn get(&self, key: &str) -> JsValue { + self.inner + .get(key) + .map(|v| v.into()) + .unwrap_or(JsValue::UNDEFINED) + } + + pub fn getAllStates(&self) -> JsValue { + let states = self.inner.get_all_states(); + let obj = Object::new(); + for (key, value) in states { + Reflect::set(&obj, &key.into(), &value.into()).unwrap(); + } + obj.into() + } + + pub fn subscribeLocalUpdate(&self, f: js_sys::Function) -> JsValue { + let observer = observer::Observer::new(f); + let sub = self.inner.subscribe_local_update(Box::new(move |e| { + let arr = js_sys::Uint8Array::new_with_length(e.len() as u32); + arr.copy_from(e); + if let Err(e) = observer.call1(&arr.into()) { + console_error!("EphemeralStore subscribeLocalUpdate: Error: {:?}", e); + } + true + })); + + subscription_to_js_function_callback(sub) + } + pub fn encode(&self, key: &str) -> Vec { self.inner.encode(key) } @@ -184,13 +230,26 @@ impl EphemeralStoreWasm { pub fn apply(&mut self, data: &[u8]) -> JsResult { let updates = self.inner.apply(data); let ans = Object::new(); - let updated = Array::from_iter(updates.updated.into_iter().map(peer_to_str_js)); - let added = Array::from_iter(updates.added.into_iter().map(peer_to_str_js)); - let removed = Array::from_iter(updates.removed.into_iter().map(peer_to_str_js)); + let updated = Array::from_iter(updates.updated.into_iter().map(|s| JsValue::from_str(&s))); + let added = Array::from_iter(updates.added.into_iter().map(|s| JsValue::from_str(&s))); + let removed = Array::from_iter(updates.removed.into_iter().map(|s| JsValue::from_str(&s))); Reflect::set(&ans, &"updated".into(), &updated.into())?; Reflect::set(&ans, &"added".into(), &added.into())?; Reflect::set(&ans, &"removed".into(), &removed.into())?; let v: JsValue = ans.into(); Ok(v.into()) } + + pub fn removeOutdated(&mut self) -> Vec { + self.inner.remove_outdated() + } + + /// If the state is empty. + pub fn isEmpty(&self) -> bool { + self.inner.get_all_states().is_empty() + } + + pub fn keys(&self) -> Vec { + self.inner.get_all_states().keys().cloned().collect() + } } diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 4236310c7..650279ee4 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -43,7 +43,7 @@ mod awareness; mod log; use crate::convert::{handler_to_js_value, js_to_container, js_to_cursor}; -pub use awareness::AwarenessWasm; +pub use awareness::{AwarenessWasm, EphemeralStoreWasm}; mod convert; @@ -5880,6 +5880,10 @@ export type AwarenessListener = ( arg: { updated: PeerID[]; added: PeerID[]; removed: PeerID[] }, origin: "local" | "timeout" | "remote" | string, ) => void; +export type EphemeralListener = ( + arg: { updated: string[]; added: string[]; removed: string[] }, + origin: "local" | "timeout" | "remote" | string, +) => void; interface Listener { @@ -6395,5 +6399,11 @@ interface AwarenessWasm { setLocalState(value: T): void; removeOutdated(): PeerID[]; } +interface EphemeralStoreWasm { + set(key: string, value: T): void; + get(key: string): T | undefined; + getAllStates(): Record; + removeOutdated(): string[]; +} "#; diff --git a/crates/loro-wasm/tests/ephemeral.test.ts b/crates/loro-wasm/tests/ephemeral.test.ts new file mode 100644 index 000000000..2998fc66e --- /dev/null +++ b/crates/loro-wasm/tests/ephemeral.test.ts @@ -0,0 +1,107 @@ +import { describe, expect, it } from "vitest"; +import { + EphemeralStore, + EphemeralStoreWasm, + EphemeralListener, + setDebug, +} from "../bundler/index"; + +describe("EphemeralStore", () => { + it("set and get", () => { + const store = new EphemeralStoreWasm(30_000); + store.set("key1", { foo: "bar" }); + expect(store.get("key1")).toEqual({ foo: "bar" }); + expect(store.getAllStates()).toEqual({ "key1": { foo: "bar" } }); + }); + + it("sync", () => { + const store = new EphemeralStoreWasm(30_000); + store.set("key1", { foo: "bar" }); + + const storeB = new EphemeralStoreWasm(30_000); + const changed = storeB.apply(store.encode("key1")); + + expect(changed).toStrictEqual({ added: ["key1"], updated: [], removed: [] }); + expect(storeB.get("key1")).toEqual({ foo: "bar" }); + expect(storeB.getAllStates()).toEqual({ "key1": { foo: "bar" } }); + }); + + it("should remove outdated", async () => { + setDebug(); + const store = new EphemeralStoreWasm(5); + store.set("key1", { foo: "bar" }); + await new Promise((r) => setTimeout(r, 10)); + const outdated = store.removeOutdated(); + expect(outdated).toEqual(["key1"]); + expect(store.getAllStates()).toEqual({}); + }); + + it("wrapped", async () => { + const store = new EphemeralStore(10); + let i = 0; + const listener = ((arg, origin) => { + if (i === 0) { + expect(origin).toBe("local"); + expect(arg).toStrictEqual({ + removed: [], + updated: [], + added: ["key1"], + }); + } + if (i === 1) { + expect(origin).toBe("remote"); + expect(arg).toStrictEqual({ + removed: [], + updated: [], + added: ["key2"], + }); + } + if (i >= 2) { + expect(origin).toBe("timeout"); + for (const r of arg.removed) { + expect(["key1", "key2"]).toContain(r); + } + } + + i += 1; + }) as EphemeralListener; + store.addListener(listener); + store.set("key1", "123"); + const b = new EphemeralStore(10); + b.set("key2", "223"); + const bytes = b.encode("key2"); + store.apply(bytes); + expect(store.getAllStates()).toEqual({ "key1": "123", "key2": "223" }); + await new Promise((r) => setTimeout(r, 20)); + expect(store.getAllStates()).toEqual({}); + expect(i).toBeGreaterThanOrEqual(3); + }); + + it("consistency", () => { + const a = new EphemeralStoreWasm(10); + const b = new EphemeralStoreWasm(10); + a.set("key1", 0); + const oldBytes = a.encode("key1"); + a.set("key1", 1); + const newBytes = a.encode("key1"); + b.apply(newBytes); + b.apply(oldBytes); + expect(a.get("key1")).toBe(1); + expect(b.get("key1")).toBe(1); + }); + + it("encode binary", () => { + const a = new EphemeralStoreWasm(10); + const b = new EphemeralStoreWasm(10); + a.set("key1", { + a: Uint8Array.from([1, 2, 3, 4]), + b: Uint8Array.from([5, 6, 7, 8]), + }); + const bytes = a.encodeAll(); + b.apply(bytes); + expect(b.get("key1")).toEqual({ + a: Uint8Array.from([1, 2, 3, 4]), + b: Uint8Array.from([5, 6, 7, 8]), + }); + }); +}); \ No newline at end of file From 118474cbf8616b20d4d7501aac1658944cfb7836 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 16:18:02 +0800 Subject: [PATCH 07/16] chore: --- crates/loro-wasm/index.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/loro-wasm/index.ts b/crates/loro-wasm/index.ts index 0c2e6abb1..2dbb9fbc7 100644 --- a/crates/loro-wasm/index.ts +++ b/crates/loro-wasm/index.ts @@ -289,6 +289,10 @@ export class EphemeralStore { this.listeners.delete(listener); } + keys(): string[] { + return this.inner.keys(); + } + destroy() { clearInterval(this.timer); this.listeners.clear(); From 50cc32a2b7403111e713485a6dd0565c0cfad9d1 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 16:35:37 +0800 Subject: [PATCH 08/16] feat: EphemeralStore ffi --- crates/loro-ffi/src/doc.rs | 2 +- crates/loro-ffi/src/ephemeral.rs | 94 +++++++++++++++++++++++++++ crates/loro-ffi/src/lib.rs | 2 + crates/loro-internal/src/awareness.rs | 13 +++- crates/loro-wasm/src/awareness.rs | 2 +- 5 files changed, 108 insertions(+), 5 deletions(-) create mode 100644 crates/loro-ffi/src/ephemeral.rs diff --git a/crates/loro-ffi/src/doc.rs b/crates/loro-ffi/src/doc.rs index 3383fd617..4ccc9fb7a 100644 --- a/crates/loro-ffi/src/doc.rs +++ b/crates/loro-ffi/src/doc.rs @@ -902,7 +902,7 @@ pub trait Unsubscriber: Sync + Send { /// A handle to a subscription created by GPUI. When dropped, the subscription /// is cancelled and the callback will no longer be invoked. -pub struct Subscription(Mutex>); +pub struct Subscription(pub(crate) Mutex>); impl Subscription { /// Detaches the subscription from this handle. The callback will diff --git a/crates/loro-ffi/src/ephemeral.rs b/crates/loro-ffi/src/ephemeral.rs new file mode 100644 index 000000000..0781362fd --- /dev/null +++ b/crates/loro-ffi/src/ephemeral.rs @@ -0,0 +1,94 @@ +use crate::{LoroValue, LoroValueLike, Subscription}; +use loro::awareness::EphemeralStore as InternalEphemeralStore; +use std::sync::{Arc, Mutex}; + +pub trait LocalEphemeralListener: Sync + Send { + fn on_ephemeral_update(&self, update: Vec); +} + +pub struct EphemeralStore(Mutex); + +impl EphemeralStore { + pub fn new(timeout: i64) -> Self { + Self(Mutex::new(InternalEphemeralStore::new(timeout))) + } + + pub fn encode(&self, key: &str) -> Vec { + self.0.try_lock().unwrap().encode(key) + } + + pub fn encode_all(&self) -> Vec { + self.0.try_lock().unwrap().encode_all() + } + + pub fn apply(&self, data: &[u8]) -> EphemeralUpdates { + self.0.try_lock().unwrap().apply(data).into() + } + + pub fn set(&self, key: &str, value: Arc) { + self.0.try_lock().unwrap().set(key, value.as_loro_value()) + } + + pub fn delete(&self, key: &str) { + self.0.try_lock().unwrap().delete(key) + } + + pub fn get(&self, key: &str) -> Option { + self.0.try_lock().unwrap().get(key).map(|v| v.into()) + } + + pub fn remove_outdated(&self) -> Vec { + self.0.try_lock().unwrap().remove_outdated() + } + + pub fn keys(&self) -> Vec { + self.0 + .try_lock() + .unwrap() + .keys() + .map(|s| s.to_string()) + .collect() + } + + pub fn get_all_states(&self) -> std::collections::HashMap { + self.0 + .try_lock() + .unwrap() + .get_all_states() + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect() + } + + pub fn subscribe_local_update( + &self, + listener: Arc, + ) -> Arc { + let s = self + .0 + .try_lock() + .unwrap() + .subscribe_local_update(Box::new(move |update| { + // TODO: should it be cloned? + listener.on_ephemeral_update(update.to_vec()); + true + })); + Arc::new(Subscription(Mutex::new(Some(s)))) + } +} + +pub struct EphemeralUpdates { + pub added: Vec, + pub updated: Vec, + pub removed: Vec, +} + +impl From for EphemeralUpdates { + fn from(value: loro::awareness::EphemeralUpdates) -> Self { + EphemeralUpdates { + added: value.added, + updated: value.updated, + removed: value.removed, + } + } +} diff --git a/crates/loro-ffi/src/lib.rs b/crates/loro-ffi/src/lib.rs index d38856111..26409d0af 100644 --- a/crates/loro-ffi/src/lib.rs +++ b/crates/loro-ffi/src/lib.rs @@ -35,6 +35,8 @@ mod version; pub use version::{Frontiers, VersionVector, VersionVectorDiff}; mod awareness; pub use awareness::{Awareness, AwarenessPeerUpdate, PeerInfo}; +mod ephemeral; +pub use ephemeral::{EphemeralStore, EphemeralUpdates, LocalEphemeralListener}; // https://github.com/mozilla/uniffi-rs/issues/1372 pub trait ValueOrContainer: Send + Sync { diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 8636c8967..6a6ce45a1 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -197,7 +197,7 @@ struct State { } #[derive(Debug, Clone)] -pub struct AwarenessUpdates { +pub struct EphemeralUpdates { pub added: Vec, pub updated: Vec, pub removed: Vec, @@ -248,7 +248,7 @@ impl EphemeralStore { } /// Returns (updated, added, removed) - pub fn apply(&mut self, data: &[u8]) -> AwarenessUpdates { + pub fn apply(&mut self, data: &[u8]) -> EphemeralUpdates { let peers_info: Vec = postcard::from_bytes(data).unwrap(); let mut updated_keys = Vec::new(); let mut added_keys = Vec::new(); @@ -282,7 +282,7 @@ impl EphemeralStore { } } - AwarenessUpdates { + EphemeralUpdates { added: added_keys, updated: updated_keys, removed: removed_keys, @@ -327,6 +327,13 @@ impl EphemeralStore { .collect() } + pub fn keys(&self) -> impl Iterator { + self.states + .keys() + .filter(|&k| self.states.get(k).unwrap().state.is_some()) + .map(|s| s.as_str()) + } + pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription { let (sub, activate) = self.subs.inner().insert((), callback); activate(); diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index 781e933e9..28f7a782b 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -250,6 +250,6 @@ impl EphemeralStoreWasm { } pub fn keys(&self) -> Vec { - self.inner.get_all_states().keys().cloned().collect() + self.inner.keys().map(|s| s.to_string()).collect() } } From cc857cc4f19951f8e12e9616ede08450545ae1b5 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 16:37:50 +0800 Subject: [PATCH 09/16] chore: --- crates/loro-internal/src/awareness.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 6a6ce45a1..51f77ac11 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -160,7 +160,7 @@ impl Awareness { } } -pub type LocalAwarenessCallback = Box) -> bool + Send + Sync + 'static>; +pub type LocalEphemeralCallback = Box) -> bool + Send + Sync + 'static>; /// `EphemeralStore` is a structure that tracks the ephemeral state of peers. /// @@ -168,7 +168,7 @@ pub type LocalAwarenessCallback = Box) -> bool + Send + Sync + ' /// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution. pub struct EphemeralStore { states: FxHashMap, - subs: SubscriberSetWithQueue<(), LocalAwarenessCallback, Vec>, + subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec>, timeout: i64, } @@ -334,7 +334,7 @@ impl EphemeralStore { .map(|s| s.as_str()) } - pub fn subscribe_local_update(&self, callback: LocalAwarenessCallback) -> Subscription { + pub fn subscribe_local_update(&self, callback: LocalEphemeralCallback) -> Subscription { let (sub, activate) = self.subs.inner().insert((), callback); activate(); sub From 411476afd9ebf7a244469337660806740a2de135 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 16:42:03 +0800 Subject: [PATCH 10/16] chore: changelog --- .changeset/loud-shrimps-develop.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/loud-shrimps-develop.md diff --git a/.changeset/loud-shrimps-develop.md b/.changeset/loud-shrimps-develop.md new file mode 100644 index 000000000..e05d62995 --- /dev/null +++ b/.changeset/loud-shrimps-develop.md @@ -0,0 +1,5 @@ +--- +"loro-crdt": patch +--- + +add `EphemeralStore` From 6c3b8e36adc5a46bf3af6e10775ebb830a203d5a Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Fri, 7 Mar 2025 17:04:28 +0800 Subject: [PATCH 11/16] chore: --- crates/loro-ffi/src/awareness.rs | 1 + crates/loro-wasm/src/awareness.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/crates/loro-ffi/src/awareness.rs b/crates/loro-ffi/src/awareness.rs index fb6798439..728efd7a3 100644 --- a/crates/loro-ffi/src/awareness.rs +++ b/crates/loro-ffi/src/awareness.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] use std::{ collections::HashMap, sync::{Arc, Mutex}, diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index 28f7a782b..aa25b6de4 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -1,3 +1,4 @@ +#![allow(deprecated)] use js_sys::{Array, Object, Reflect}; use loro_internal::{ awareness::Awareness as InternalAwareness, awareness::EphemeralStore as InternalEphemeralStore, From 7cbc9b9269d43179fb420afade6ba7a13ce8dacb Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Sun, 9 Mar 2025 11:13:37 +0800 Subject: [PATCH 12/16] feat: add subscribe for ephemeral store --- .changeset/loud-shrimps-develop.md | 2 +- crates/loro-ffi/src/ephemeral.rs | 24 +---- crates/loro-ffi/src/lib.rs | 2 +- crates/loro-internal/src/awareness.rs | 111 ++++++++++++++++++----- crates/loro-wasm/index.ts | 52 +++-------- crates/loro-wasm/src/awareness.rs | 84 ++++++++++++++--- crates/loro-wasm/src/lib.rs | 20 ++-- crates/loro-wasm/tests/ephemeral.test.ts | 33 ++++--- 8 files changed, 210 insertions(+), 118 deletions(-) diff --git a/.changeset/loud-shrimps-develop.md b/.changeset/loud-shrimps-develop.md index e05d62995..56b8c7889 100644 --- a/.changeset/loud-shrimps-develop.md +++ b/.changeset/loud-shrimps-develop.md @@ -1,5 +1,5 @@ --- -"loro-crdt": patch +"loro-crdt": minor --- add `EphemeralStore` diff --git a/crates/loro-ffi/src/ephemeral.rs b/crates/loro-ffi/src/ephemeral.rs index 0781362fd..33eff00b3 100644 --- a/crates/loro-ffi/src/ephemeral.rs +++ b/crates/loro-ffi/src/ephemeral.rs @@ -21,8 +21,8 @@ impl EphemeralStore { self.0.try_lock().unwrap().encode_all() } - pub fn apply(&self, data: &[u8]) -> EphemeralUpdates { - self.0.try_lock().unwrap().apply(data).into() + pub fn apply(&self, data: &[u8]) { + self.0.try_lock().unwrap().apply(data) } pub fn set(&self, key: &str, value: Arc) { @@ -37,7 +37,7 @@ impl EphemeralStore { self.0.try_lock().unwrap().get(key).map(|v| v.into()) } - pub fn remove_outdated(&self) -> Vec { + pub fn remove_outdated(&self) { self.0.try_lock().unwrap().remove_outdated() } @@ -68,7 +68,7 @@ impl EphemeralStore { .0 .try_lock() .unwrap() - .subscribe_local_update(Box::new(move |update| { + .subscribe_local_updates(Box::new(move |update| { // TODO: should it be cloned? listener.on_ephemeral_update(update.to_vec()); true @@ -76,19 +76,3 @@ impl EphemeralStore { Arc::new(Subscription(Mutex::new(Some(s)))) } } - -pub struct EphemeralUpdates { - pub added: Vec, - pub updated: Vec, - pub removed: Vec, -} - -impl From for EphemeralUpdates { - fn from(value: loro::awareness::EphemeralUpdates) -> Self { - EphemeralUpdates { - added: value.added, - updated: value.updated, - removed: value.removed, - } - } -} diff --git a/crates/loro-ffi/src/lib.rs b/crates/loro-ffi/src/lib.rs index 26409d0af..7c5d8b1b3 100644 --- a/crates/loro-ffi/src/lib.rs +++ b/crates/loro-ffi/src/lib.rs @@ -36,7 +36,7 @@ pub use version::{Frontiers, VersionVector, VersionVectorDiff}; mod awareness; pub use awareness::{Awareness, AwarenessPeerUpdate, PeerInfo}; mod ephemeral; -pub use ephemeral::{EphemeralStore, EphemeralUpdates, LocalEphemeralListener}; +pub use ephemeral::{EphemeralStore, LocalEphemeralListener}; // https://github.com/mozilla/uniffi-rs/issues/1372 pub trait ValueOrContainer: Send + Sync { diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 51f77ac11..85785b855 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -160,7 +160,23 @@ impl Awareness { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum EphemeralEventTrigger { + Local, + Remote, + Timeout, +} + +#[derive(Debug)] +pub struct EphemeralStoreEvent { + pub by: EphemeralEventTrigger, + pub added: Vec, + pub updated: Vec, + pub removed: Vec, +} + pub type LocalEphemeralCallback = Box) -> bool + Send + Sync + 'static>; +pub type EphemeralSubscriber = Box bool + Send + Sync + 'static>; /// `EphemeralStore` is a structure that tracks the ephemeral state of peers. /// @@ -168,7 +184,8 @@ pub type LocalEphemeralCallback = Box) -> bool + Send + Sync + ' /// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution. pub struct EphemeralStore { states: FxHashMap, - subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec>, + local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec>, + subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>, timeout: i64, } @@ -196,19 +213,13 @@ struct State { timestamp: i64, } -#[derive(Debug, Clone)] -pub struct EphemeralUpdates { - pub added: Vec, - pub updated: Vec, - pub removed: Vec, -} - impl EphemeralStore { pub fn new(timeout: i64) -> EphemeralStore { EphemeralStore { timeout, states: FxHashMap::default(), - subs: SubscriberSetWithQueue::new(), + local_subs: SubscriberSetWithQueue::new(), + subscribers: SubscriberSetWithQueue::new(), } } @@ -247,8 +258,7 @@ impl EphemeralStore { postcard::to_allocvec(&peers_info).unwrap() } - /// Returns (updated, added, removed) - pub fn apply(&mut self, data: &[u8]) -> EphemeralUpdates { + pub fn apply(&mut self, data: &[u8]) { let peers_info: Vec = postcard::from_bytes(data).unwrap(); let mut updated_keys = Vec::new(); let mut added_keys = Vec::new(); @@ -281,11 +291,16 @@ impl EphemeralStore { } } } - - EphemeralUpdates { - added: added_keys, - updated: updated_keys, - removed: removed_keys, + if !self.subscribers.inner().is_empty() { + self.subscribers.emit( + &(), + EphemeralStoreEvent { + by: EphemeralEventTrigger::Remote, + added: added_keys.clone(), + updated: updated_keys.clone(), + removed: removed_keys.clone(), + }, + ); } } @@ -301,7 +316,7 @@ impl EphemeralStore { self.states.get(key).and_then(|x| x.state.clone()) } - pub fn remove_outdated(&mut self) -> Vec { + pub fn remove_outdated(&mut self) { let now = get_sys_timestamp() as Timestamp; let mut removed = Vec::new(); @@ -315,8 +330,17 @@ impl EphemeralStore { true } }); - - removed + if !self.subscribers.inner().is_empty() { + self.subscribers.emit( + &(), + EphemeralStoreEvent { + by: EphemeralEventTrigger::Timeout, + added: vec![], + updated: vec![], + removed, + }, + ); + } } pub fn get_all_states(&self) -> FxHashMap { @@ -334,23 +358,60 @@ impl EphemeralStore { .map(|s| s.as_str()) } - pub fn subscribe_local_update(&self, callback: LocalEphemeralCallback) -> Subscription { - let (sub, activate) = self.subs.inner().insert((), callback); + pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription { + let (sub, activate) = self.local_subs.inner().insert((), callback); + activate(); + sub + } + + pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription { + let (sub, activate) = self.subscribers.inner().insert((), callback); activate(); sub } fn _set_local_state(&mut self, key: &str, value: Option) { - self.states.insert( + let is_delete = value.is_none(); + let old = self.states.insert( key.to_string(), State { state: value, timestamp: get_sys_timestamp() as Timestamp, }, ); - if self.subs.inner().is_empty() { - return; + if !self.local_subs.inner().is_empty() { + self.local_subs.emit(&(), self.encode(key)); + } + if !self.subscribers.inner().is_empty() { + if old.is_some() { + self.subscribers.emit( + &(), + EphemeralStoreEvent { + by: EphemeralEventTrigger::Local, + added: vec![], + updated: if !is_delete { + vec![key.to_string()] + } else { + vec![] + }, + removed: if !is_delete { + vec![] + } else { + vec![key.to_string()] + }, + }, + ); + } else if !is_delete { + self.subscribers.emit( + &(), + EphemeralStoreEvent { + by: EphemeralEventTrigger::Local, + added: vec![key.to_string()], + updated: vec![], + removed: vec![], + }, + ); + } } - self.subs.emit(&(), self.encode(key)); } } diff --git a/crates/loro-wasm/index.ts b/crates/loro-wasm/index.ts index 2dbb9fbc7..89752b7a0 100644 --- a/crates/loro-wasm/index.ts +++ b/crates/loro-wasm/index.ts @@ -17,6 +17,7 @@ import { Value, AwarenessListener, EphemeralListener, + EphemeralLocalListener, } from "loro-wasm"; /** @@ -228,40 +229,18 @@ export class EphemeralStore { inner: EphemeralStoreWasm; private timer: number | undefined; private timeout: number; - private listeners: Set = new Set(); constructor(timeout: number = 30000) { this.inner = new EphemeralStoreWasm(timeout); this.timeout = timeout; } - apply(bytes: Uint8Array, origin = "remote") { - const { updated, added, removed } = this.inner.apply(bytes); - this.listeners.forEach((listener) => { - listener({ updated, added, removed }, origin); - }); - + apply(bytes: Uint8Array) { + this.inner.apply(bytes); this.startTimerIfNotEmpty(); } set(key: string, value: T) { - const wasEmpty = this.get(key) == null; this.inner.set(key, value); - if (wasEmpty) { - this.listeners.forEach((listener) => { - listener( - { updated: [], added: [key], removed: [] }, - "local", - ); - }); - } else { - this.listeners.forEach((listener) => { - listener( - { updated: [key], added: [], removed: [] }, - "local", - ); - }); - } - this.startTimerIfNotEmpty(); } @@ -281,21 +260,20 @@ export class EphemeralStore { return this.inner.encodeAll(); } - addListener(listener: EphemeralListener) { - this.listeners.add(listener); - } - - removeListener(listener: EphemeralListener) { - this.listeners.delete(listener); - } - keys(): string[] { return this.inner.keys(); } destroy() { clearInterval(this.timer); - this.listeners.clear(); + } + + subscribe(listener: EphemeralListener) { + return this.inner.subscribe(listener); + } + + subscribeLocalUpdates(listener: EphemeralLocalListener) { + return this.inner.subscribeLocalUpdates(listener); } private startTimerIfNotEmpty() { @@ -304,12 +282,7 @@ export class EphemeralStore { } this.timer = setInterval(() => { - const removed = this.inner.removeOutdated(); - if (removed.length > 0) { - this.listeners.forEach((listener) => { - listener({ updated: [], added: [], removed }, "timeout"); - }); - } + this.inner.removeOutdated(); if (this.inner.isEmpty()) { clearInterval(this.timer); this.timer = undefined; @@ -317,6 +290,7 @@ export class EphemeralStore { }, this.timeout / 2) as unknown as number; } } + LoroDoc.prototype.toJsonWithReplacer = function (replacer: (key: string | number, value: Value | Container) => Value | Container | undefined) { const processed = new Set(); const doc = this; diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index aa25b6de4..1b23d2cbd 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -1,7 +1,10 @@ #![allow(deprecated)] use js_sys::{Array, Object, Reflect}; use loro_internal::{ - awareness::Awareness as InternalAwareness, awareness::EphemeralStore as InternalEphemeralStore, + awareness::{ + Awareness as InternalAwareness, EphemeralEventTrigger, + EphemeralStore as InternalEphemeralStore, EphemeralStoreEvent, + }, id::PeerID, }; use wasm_bindgen::prelude::*; @@ -206,9 +209,10 @@ impl EphemeralStoreWasm { obj.into() } - pub fn subscribeLocalUpdate(&self, f: js_sys::Function) -> JsValue { + #[wasm_bindgen(skip_typescript)] + pub fn subscribeLocalUpdates(&self, f: js_sys::Function) -> JsValue { let observer = observer::Observer::new(f); - let sub = self.inner.subscribe_local_update(Box::new(move |e| { + let sub = self.inner.subscribe_local_updates(Box::new(move |e| { let arr = js_sys::Uint8Array::new_with_length(e.len() as u32); arr.copy_from(e); if let Err(e) = observer.call1(&arr.into()) { @@ -220,6 +224,65 @@ impl EphemeralStoreWasm { subscription_to_js_function_callback(sub) } + #[wasm_bindgen(skip_typescript)] + pub fn subscribe(&self, f: js_sys::Function) -> JsValue { + let observer = observer::Observer::new(f); + let sub = self.inner.subscribe(Box::new( + move |EphemeralStoreEvent { + by, + added, + updated, + removed, + }| { + let obj = Object::new(); + Reflect::set( + &obj, + &"added".into(), + &added + .iter() + .map(|s| JsValue::from_str(s)) + .collect::() + .into(), + ) + .unwrap(); + Reflect::set( + &obj, + &"updated".into(), + &updated + .iter() + .map(|s| JsValue::from_str(s)) + .collect::() + .into(), + ) + .unwrap(); + Reflect::set( + &obj, + &"removed".into(), + &removed + .iter() + .map(|s| JsValue::from_str(s)) + .collect::() + .into(), + ) + .unwrap(); + Reflect::set( + &obj, + &"by".into(), + &match by { + EphemeralEventTrigger::Local => JsValue::from_str("local"), + EphemeralEventTrigger::Remote => JsValue::from_str("remote"), + EphemeralEventTrigger::Timeout => JsValue::from_str("timeout"), + }, + ) + .unwrap(); + observer.call1(&obj.into()).unwrap(); + true + }, + )); + + subscription_to_js_function_callback(sub) + } + pub fn encode(&self, key: &str) -> Vec { self.inner.encode(key) } @@ -228,20 +291,11 @@ impl EphemeralStoreWasm { self.inner.encode_all() } - pub fn apply(&mut self, data: &[u8]) -> JsResult { - let updates = self.inner.apply(data); - let ans = Object::new(); - let updated = Array::from_iter(updates.updated.into_iter().map(|s| JsValue::from_str(&s))); - let added = Array::from_iter(updates.added.into_iter().map(|s| JsValue::from_str(&s))); - let removed = Array::from_iter(updates.removed.into_iter().map(|s| JsValue::from_str(&s))); - Reflect::set(&ans, &"updated".into(), &updated.into())?; - Reflect::set(&ans, &"added".into(), &added.into())?; - Reflect::set(&ans, &"removed".into(), &removed.into())?; - let v: JsValue = ans.into(); - Ok(v.into()) + pub fn apply(&mut self, data: &[u8]) { + self.inner.apply(data); } - pub fn removeOutdated(&mut self) -> Vec { + pub fn removeOutdated(&mut self) { self.inner.remove_outdated() } diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 650279ee4..cdfd7c5cf 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -5880,11 +5880,6 @@ export type AwarenessListener = ( arg: { updated: PeerID[]; added: PeerID[]; removed: PeerID[] }, origin: "local" | "timeout" | "remote" | string, ) => void; -export type EphemeralListener = ( - arg: { updated: string[]; added: string[]; removed: string[] }, - origin: "local" | "timeout" | "remote" | string, -) => void; - interface Listener { (event: LoroEventBatch): void; @@ -6399,11 +6394,24 @@ interface AwarenessWasm { setLocalState(value: T): void; removeOutdated(): PeerID[]; } + +type EphemeralListener = (event: EphemeralStoreEvent) => void; +type EphemeralLocalListener = (bytes: Uint8Array) => void; + interface EphemeralStoreWasm { set(key: string, value: T): void; get(key: string): T | undefined; getAllStates(): Record; - removeOutdated(): string[]; + removeOutdated(); + subscribeLocalUpdates(f: EphemeralLocalListener): () => void; + subscribe(f: EphemeralListener): () => void; +} + +interface EphemeralStoreEvent { + by: "local" | "import" | "timeout"; + added: string[]; + updated: string[]; + removed: string[]; } "#; diff --git a/crates/loro-wasm/tests/ephemeral.test.ts b/crates/loro-wasm/tests/ephemeral.test.ts index 2998fc66e..d795e8695 100644 --- a/crates/loro-wasm/tests/ephemeral.test.ts +++ b/crates/loro-wasm/tests/ephemeral.test.ts @@ -4,6 +4,7 @@ import { EphemeralStoreWasm, EphemeralListener, setDebug, + EphemeralStoreEvent, } from "../bundler/index"; describe("EphemeralStore", () => { @@ -17,21 +18,31 @@ describe("EphemeralStore", () => { it("sync", () => { const store = new EphemeralStoreWasm(30_000); store.set("key1", { foo: "bar" }); + let changed: EphemeralStoreEvent = { by: "local", added: [], updated: [], removed: [] }; const storeB = new EphemeralStoreWasm(30_000); - const changed = storeB.apply(store.encode("key1")); + storeB.subscribe((e) => { + changed = e; + }); + storeB.apply(store.encode("key1")); - expect(changed).toStrictEqual({ added: ["key1"], updated: [], removed: [] }); + expect(changed).toStrictEqual({ by: "remote", added: ["key1"], updated: [], removed: [] }); expect(storeB.get("key1")).toEqual({ foo: "bar" }); expect(storeB.getAllStates()).toEqual({ "key1": { foo: "bar" } }); }); it("should remove outdated", async () => { setDebug(); + let outdated: string[] = []; const store = new EphemeralStoreWasm(5); + store.subscribe((e) => { + if (e.removed.length > 0) { + outdated = e.removed; + } + }) store.set("key1", { foo: "bar" }); await new Promise((r) => setTimeout(r, 10)); - const outdated = store.removeOutdated(); + store.removeOutdated(); expect(outdated).toEqual(["key1"]); expect(store.getAllStates()).toEqual({}); }); @@ -39,33 +50,33 @@ describe("EphemeralStore", () => { it("wrapped", async () => { const store = new EphemeralStore(10); let i = 0; - const listener = ((arg, origin) => { + const listener = ((e) => { if (i === 0) { - expect(origin).toBe("local"); - expect(arg).toStrictEqual({ + expect(e).toStrictEqual({ + by: "local", removed: [], updated: [], added: ["key1"], }); } if (i === 1) { - expect(origin).toBe("remote"); - expect(arg).toStrictEqual({ + expect(e).toStrictEqual({ + by: "remote", removed: [], updated: [], added: ["key2"], }); } if (i >= 2) { - expect(origin).toBe("timeout"); - for (const r of arg.removed) { + expect(e.by).toBe("timeout"); + for (const r of e.removed) { expect(["key1", "key2"]).toContain(r); } } i += 1; }) as EphemeralListener; - store.addListener(listener); + store.subscribe(listener); store.set("key1", "123"); const b = new EphemeralStore(10); b.set("key2", "223"); From 7c881c4611eebec314559a213090a53c3aed88b6 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Mon, 10 Mar 2025 10:39:04 +0800 Subject: [PATCH 13/16] fix: ffi --- crates/loro-ffi/src/ephemeral.rs | 30 +++++++++++++++++++- crates/loro-internal/src/awareness.rs | 40 ++++++++++++++------------- 2 files changed, 50 insertions(+), 20 deletions(-) diff --git a/crates/loro-ffi/src/ephemeral.rs b/crates/loro-ffi/src/ephemeral.rs index 33eff00b3..bbf1f44c4 100644 --- a/crates/loro-ffi/src/ephemeral.rs +++ b/crates/loro-ffi/src/ephemeral.rs @@ -1,11 +1,22 @@ use crate::{LoroValue, LoroValueLike, Subscription}; -use loro::awareness::EphemeralStore as InternalEphemeralStore; +use loro::awareness::{EphemeralEventTrigger, EphemeralStore as InternalEphemeralStore}; use std::sync::{Arc, Mutex}; +#[derive(Debug, Clone)] +pub struct EphemeralStoreEvent { + pub by: EphemeralEventTrigger, + pub added: Arc>, + pub removed: Arc>, + pub updated: Arc>, +} + pub trait LocalEphemeralListener: Sync + Send { fn on_ephemeral_update(&self, update: Vec); } +pub trait EphemeralSubscriber: Sync + Send { + fn on_ephemeral_event(&self, event: EphemeralStoreEvent); +} pub struct EphemeralStore(Mutex); impl EphemeralStore { @@ -75,4 +86,21 @@ impl EphemeralStore { })); Arc::new(Subscription(Mutex::new(Some(s)))) } + + pub fn subscribe(&self, listener: Arc) -> Arc { + let s = self + .0 + .try_lock() + .unwrap() + .subscribe(Box::new(move |update| { + listener.on_ephemeral_event(EphemeralStoreEvent { + by: update.by, + added: update.added.clone(), + removed: update.removed.clone(), + updated: update.updated.clone(), + }); + true + })); + Arc::new(Subscription(Mutex::new(Some(s)))) + } } diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 85785b855..26be692a8 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use fxhash::FxHashMap; use loro_common::{LoroValue, PeerID}; use serde::{Deserialize, Serialize}; @@ -160,19 +162,19 @@ impl Awareness { } } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EphemeralEventTrigger { Local, Remote, Timeout, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct EphemeralStoreEvent { pub by: EphemeralEventTrigger, - pub added: Vec, - pub updated: Vec, - pub removed: Vec, + pub added: Arc>, + pub updated: Arc>, + pub removed: Arc>, } pub type LocalEphemeralCallback = Box) -> bool + Send + Sync + 'static>; @@ -296,9 +298,9 @@ impl EphemeralStore { &(), EphemeralStoreEvent { by: EphemeralEventTrigger::Remote, - added: added_keys.clone(), - updated: updated_keys.clone(), - removed: removed_keys.clone(), + added: Arc::new(added_keys), + updated: Arc::new(updated_keys), + removed: Arc::new(removed_keys), }, ); } @@ -335,9 +337,9 @@ impl EphemeralStore { &(), EphemeralStoreEvent { by: EphemeralEventTrigger::Timeout, - added: vec![], - updated: vec![], - removed, + added: Arc::new(Vec::new()), + updated: Arc::new(Vec::new()), + removed: Arc::new(removed), }, ); } @@ -388,16 +390,16 @@ impl EphemeralStore { &(), EphemeralStoreEvent { by: EphemeralEventTrigger::Local, - added: vec![], + added: Arc::new(Vec::new()), updated: if !is_delete { - vec![key.to_string()] + Arc::new(vec![key.to_string()]) } else { - vec![] + Arc::new(Vec::new()) }, removed: if !is_delete { - vec![] + Arc::new(Vec::new()) } else { - vec![key.to_string()] + Arc::new(vec![key.to_string()]) }, }, ); @@ -406,9 +408,9 @@ impl EphemeralStore { &(), EphemeralStoreEvent { by: EphemeralEventTrigger::Local, - added: vec![key.to_string()], - updated: vec![], - removed: vec![], + added: Arc::new(vec![key.to_string()]), + updated: Arc::new(Vec::new()), + removed: Arc::new(Vec::new()), }, ); } From 5667f04b29c12b42f6a4b4e88b703dfc1b5991d5 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Mon, 10 Mar 2025 10:56:48 +0800 Subject: [PATCH 14/16] fix: trigger --- crates/loro-internal/src/awareness.rs | 4 ++-- crates/loro-wasm/src/awareness.rs | 2 +- crates/loro-wasm/tests/ephemeral.test.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index 26be692a8..f5f490738 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -165,7 +165,7 @@ impl Awareness { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum EphemeralEventTrigger { Local, - Remote, + Import, Timeout, } @@ -297,7 +297,7 @@ impl EphemeralStore { self.subscribers.emit( &(), EphemeralStoreEvent { - by: EphemeralEventTrigger::Remote, + by: EphemeralEventTrigger::Import, added: Arc::new(added_keys), updated: Arc::new(updated_keys), removed: Arc::new(removed_keys), diff --git a/crates/loro-wasm/src/awareness.rs b/crates/loro-wasm/src/awareness.rs index 1b23d2cbd..a60d66fef 100644 --- a/crates/loro-wasm/src/awareness.rs +++ b/crates/loro-wasm/src/awareness.rs @@ -270,7 +270,7 @@ impl EphemeralStoreWasm { &"by".into(), &match by { EphemeralEventTrigger::Local => JsValue::from_str("local"), - EphemeralEventTrigger::Remote => JsValue::from_str("remote"), + EphemeralEventTrigger::Import => JsValue::from_str("import"), EphemeralEventTrigger::Timeout => JsValue::from_str("timeout"), }, ) diff --git a/crates/loro-wasm/tests/ephemeral.test.ts b/crates/loro-wasm/tests/ephemeral.test.ts index d795e8695..3c8fad8b2 100644 --- a/crates/loro-wasm/tests/ephemeral.test.ts +++ b/crates/loro-wasm/tests/ephemeral.test.ts @@ -26,7 +26,7 @@ describe("EphemeralStore", () => { }); storeB.apply(store.encode("key1")); - expect(changed).toStrictEqual({ by: "remote", added: ["key1"], updated: [], removed: [] }); + expect(changed).toStrictEqual({ by: "import", added: ["key1"], updated: [], removed: [] }); expect(storeB.get("key1")).toEqual({ foo: "bar" }); expect(storeB.getAllStates()).toEqual({ "key1": { foo: "bar" } }); }); @@ -61,7 +61,7 @@ describe("EphemeralStore", () => { } if (i === 1) { expect(e).toStrictEqual({ - by: "remote", + by: "import", removed: [], updated: [], added: ["key2"], From 38351ef82677ee82d58d124a8beb3f8b76036c68 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Mon, 10 Mar 2025 20:32:00 +0800 Subject: [PATCH 15/16] chore: doc --- crates/loro-internal/src/awareness.rs | 15 +++++++++++++++ crates/loro-wasm/index.ts | 27 ++++++++++++++++++++++++--- 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index f5f490738..c9d5731d9 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -184,6 +184,21 @@ pub type EphemeralSubscriber = Box bool + Send + /// /// It can be used to synchronize cursor positions, selections, and the names of the peers. /// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution. +/// +/// # Example +/// +/// ```rust +/// let store = EphemeralStore::new(1000); +/// store.set("key", "value"); +/// let encoded = store.encode("key"); +/// let store2 = EphemeralStore::new(1000); +/// store.subscribe_local_updates(|data| { +/// println!("local update: {:?}", data); +/// true +/// }); +/// store2.apply(&encoded); +/// assert_eq!(store2.get("key"), Some("value".into())); +/// ``` pub struct EphemeralStore { states: FxHashMap, local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec>, diff --git a/crates/loro-wasm/index.ts b/crates/loro-wasm/index.ts index 89752b7a0..4229dd91c 100644 --- a/crates/loro-wasm/index.ts +++ b/crates/loro-wasm/index.ts @@ -121,7 +121,7 @@ export function newRootContainerID( * Awareness is a structure that allows to track the ephemeral state of the peers. * * If we don't receive a state update from a peer within the timeout, we will remove their state. - * The timeout is in milliseconds. This can be used to handle the off-line state of a peer. + * The timeout is in milliseconds. This can be used to handle the offline state of a peer. */ export class Awareness { inner: AwarenessWasm; @@ -220,10 +220,31 @@ export class Awareness { } /** - * Awareness is a structure that allows to track the ephemeral state of the peers. + * EphemeralStore is a structure that allows to track the ephemeral state of the peers. * * If we don't receive a state update from a peer within the timeout, we will remove their state. - * The timeout is in milliseconds. This can be used to handle the off-line state of a peer. + * The timeout is in milliseconds. This can be used to handle the offline state of a peer. + * + * @example + * + * ```ts + * const store = new EphemeralStore(); + * const store2 = new EphemeralStore(); + * // Subscribe to local updates + * store.subscribeLocalUpdates((data)=>{ + * store2.apply(data); + * }) + * // Subscribe to all updates + * store2.subscribe((event)=>{ + * console.log("event: ", event); + * }) + * // Set a value + * store.set("key", "value"); + * // Encode the value + * const encoded = store.encode("key"); + * // Apply the encoded value + * store2.apply(encoded); + * ``` */ export class EphemeralStore { inner: EphemeralStoreWasm; From 218bb971cc19ac8867e7311bfc9178564a321693 Mon Sep 17 00:00:00 2001 From: Leon Zhao Date: Mon, 10 Mar 2025 20:41:11 +0800 Subject: [PATCH 16/16] chore: fix doc --- crates/loro-internal/src/awareness.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/crates/loro-internal/src/awareness.rs b/crates/loro-internal/src/awareness.rs index c9d5731d9..067fb2293 100644 --- a/crates/loro-internal/src/awareness.rs +++ b/crates/loro-internal/src/awareness.rs @@ -188,14 +188,16 @@ pub type EphemeralSubscriber = Box bool + Send + /// # Example /// /// ```rust -/// let store = EphemeralStore::new(1000); +/// use loro_internal::awareness::EphemeralStore; +/// +/// let mut store = EphemeralStore::new(1000); /// store.set("key", "value"); /// let encoded = store.encode("key"); -/// let store2 = EphemeralStore::new(1000); -/// store.subscribe_local_updates(|data| { +/// let mut store2 = EphemeralStore::new(1000); +/// store.subscribe_local_updates(Box::new(|data| { /// println!("local update: {:?}", data); /// true -/// }); +/// })); /// store2.apply(&encoded); /// assert_eq!(store2.get("key"), Some("value".into())); /// ```