Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add EphemeralStore #679

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions .changeset/loud-shrimps-develop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"loro-crdt": minor
---

add `EphemeralStore`
1 change: 1 addition & 0 deletions crates/loro-ffi/src/awareness.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(deprecated)]
use std::{
collections::HashMap,
sync::{Arc, Mutex},
Expand Down
2 changes: 1 addition & 1 deletion crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ pub trait Unsubscriber: Sync + Send {

/// A handle to a subscription created by GPUI. When dropped, the subscription
/// is cancelled and the callback will no longer be invoked.
pub struct Subscription(Mutex<Option<loro::Subscription>>);
pub struct Subscription(pub(crate) Mutex<Option<loro::Subscription>>);

impl Subscription {
/// Detaches the subscription from this handle. The callback will
Expand Down
78 changes: 78 additions & 0 deletions crates/loro-ffi/src/ephemeral.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::{LoroValue, LoroValueLike, Subscription};
use loro::awareness::EphemeralStore as InternalEphemeralStore;
use std::sync::{Arc, Mutex};

pub trait LocalEphemeralListener: Sync + Send {
fn on_ephemeral_update(&self, update: Vec<u8>);
}

pub struct EphemeralStore(Mutex<InternalEphemeralStore>);

impl EphemeralStore {
pub fn new(timeout: i64) -> Self {
Self(Mutex::new(InternalEphemeralStore::new(timeout)))
}

pub fn encode(&self, key: &str) -> Vec<u8> {
self.0.try_lock().unwrap().encode(key)
}

pub fn encode_all(&self) -> Vec<u8> {
self.0.try_lock().unwrap().encode_all()
}

pub fn apply(&self, data: &[u8]) {
self.0.try_lock().unwrap().apply(data)
}

pub fn set(&self, key: &str, value: Arc<dyn LoroValueLike>) {
self.0.try_lock().unwrap().set(key, value.as_loro_value())
}

pub fn delete(&self, key: &str) {
self.0.try_lock().unwrap().delete(key)
}

pub fn get(&self, key: &str) -> Option<LoroValue> {
self.0.try_lock().unwrap().get(key).map(|v| v.into())
}

pub fn remove_outdated(&self) {
self.0.try_lock().unwrap().remove_outdated()
}

pub fn keys(&self) -> Vec<String> {
self.0
.try_lock()
.unwrap()
.keys()
.map(|s| s.to_string())
.collect()
}

pub fn get_all_states(&self) -> std::collections::HashMap<String, LoroValue> {
self.0
.try_lock()
.unwrap()
.get_all_states()
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect()
}

pub fn subscribe_local_update(
&self,
listener: Arc<dyn LocalEphemeralListener>,
) -> Arc<Subscription> {
let s = self
.0
.try_lock()
.unwrap()
.subscribe_local_updates(Box::new(move |update| {
// TODO: should it be cloned?
listener.on_ephemeral_update(update.to_vec());
true
}));
Arc::new(Subscription(Mutex::new(Some(s))))
}
}
2 changes: 2 additions & 0 deletions crates/loro-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ mod version;
pub use version::{Frontiers, VersionVector, VersionVectorDiff};
mod awareness;
pub use awareness::{Awareness, AwarenessPeerUpdate, PeerInfo};
mod ephemeral;
pub use ephemeral::{EphemeralStore, LocalEphemeralListener};

// https://github.com/mozilla/uniffi-rs/issues/1372
pub trait ValueOrContainer: Send + Sync {
Expand Down
259 changes: 259 additions & 0 deletions crates/loro-internal/src/awareness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use loro_common::{LoroValue, PeerID};
use serde::{Deserialize, Serialize};

use crate::change::{get_sys_timestamp, Timestamp};
use crate::{SubscriberSetWithQueue, Subscription};

/// `Awareness` is a structure that tracks the ephemeral state of peers.
///
Expand All @@ -11,6 +12,7 @@ use crate::change::{get_sys_timestamp, Timestamp};
/// The state of a specific peer is expected to be removed after a specified timeout. Use
/// `remove_outdated` to eliminate outdated states.
#[derive(Debug, Clone)]
#[deprecated(since = "1.4.6", note = "Use `EphemeralStore` instead.")]
pub struct Awareness {
peer: PeerID,
peers: FxHashMap<PeerID, PeerInfo>,
Expand All @@ -32,6 +34,7 @@ struct EncodedPeerInfo {
record: LoroValue,
}

#[allow(deprecated)]
impl Awareness {
pub fn new(peer: PeerID, timeout: i64) -> Awareness {
Awareness {
Expand Down Expand Up @@ -156,3 +159,259 @@ impl Awareness {
self.peer
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EphemeralEventTrigger {
Local,
Remote,
Timeout,
}

#[derive(Debug)]
pub struct EphemeralStoreEvent {
pub by: EphemeralEventTrigger,
pub added: Vec<String>,
pub updated: Vec<String>,
pub removed: Vec<String>,
}

pub type LocalEphemeralCallback = Box<dyn Fn(&Vec<u8>) -> bool + Send + Sync + 'static>;
pub type EphemeralSubscriber = Box<dyn Fn(&EphemeralStoreEvent) -> bool + Send + Sync + 'static>;

/// `EphemeralStore` is a structure that tracks the ephemeral state of peers.
///
/// It can be used to synchronize cursor positions, selections, and the names of the peers.
/// We use the latest timestamp as the tie-breaker for LWW (Last-Write-Wins) conflict resolution.
pub struct EphemeralStore {
states: FxHashMap<String, State>,
local_subs: SubscriberSetWithQueue<(), LocalEphemeralCallback, Vec<u8>>,
subscribers: SubscriberSetWithQueue<(), EphemeralSubscriber, EphemeralStoreEvent>,
timeout: i64,
}

impl std::fmt::Debug for EphemeralStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"AwarenessV2 {{ states: {:?}, timeout: {:?} }}",
self.states, self.timeout
)
}
}

#[derive(Serialize, Deserialize)]
struct EncodedState<'a> {
#[serde(borrow)]
key: &'a str,
value: Option<LoroValue>,
timestamp: i64,
}

#[derive(Debug, Clone)]
struct State {
state: Option<LoroValue>,
timestamp: i64,
}

impl EphemeralStore {
pub fn new(timeout: i64) -> EphemeralStore {
EphemeralStore {
timeout,
states: FxHashMap::default(),
local_subs: SubscriberSetWithQueue::new(),
subscribers: SubscriberSetWithQueue::new(),
}
}

pub fn encode(&self, key: &str) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
if let Some(peer_state) = self.states.get(key) {
if now - peer_state.timestamp > self.timeout {
return vec![];
}
let encoded_peer_info = EncodedState {
key,
value: peer_state.state.clone(),
timestamp: peer_state.timestamp,
};
peers_info.push(encoded_peer_info);
}

postcard::to_allocvec(&peers_info).unwrap()
}

pub fn encode_all(&self) -> Vec<u8> {
let mut peers_info = Vec::new();
let now = get_sys_timestamp() as Timestamp;
for (key, peer_state) in self.states.iter() {
if now - peer_state.timestamp > self.timeout {
continue;
}
let encoded_peer_info = EncodedState {
key,
value: peer_state.state.clone(),
timestamp: peer_state.timestamp,
};
peers_info.push(encoded_peer_info);
}
postcard::to_allocvec(&peers_info).unwrap()
}

pub fn apply(&mut self, data: &[u8]) {
let peers_info: Vec<EncodedState> = postcard::from_bytes(data).unwrap();
let mut updated_keys = Vec::new();
let mut added_keys = Vec::new();
let mut removed_keys = Vec::new();
let now = get_sys_timestamp() as Timestamp;
for EncodedState {
key,
value: record,
timestamp,
} in peers_info
{
match self.states.get_mut(key) {
Some(peer_info) if peer_info.timestamp >= timestamp => {
// do nothing
}
_ => {
let old = self.states.insert(
key.to_string(),
State {
state: record.clone(),
timestamp: now,
},
);
match (old, record) {
(Some(_), Some(_)) => updated_keys.push(key.to_string()),
(None, Some(_)) => added_keys.push(key.to_string()),
(Some(_), None) => removed_keys.push(key.to_string()),
(None, None) => {}
}
}
}
}
if !self.subscribers.inner().is_empty() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Remote,
added: added_keys.clone(),
updated: updated_keys.clone(),
removed: removed_keys.clone(),
},
);
}
}

pub fn set(&mut self, key: &str, value: impl Into<LoroValue>) {
self._set_local_state(key, Some(value.into()));
}

pub fn delete(&mut self, key: &str) {
self._set_local_state(key, None);
}

pub fn get(&self, key: &str) -> Option<LoroValue> {
self.states.get(key).and_then(|x| x.state.clone())
}

pub fn remove_outdated(&mut self) {
let now = get_sys_timestamp() as Timestamp;
let mut removed = Vec::new();

self.states.retain(|key, state| {
if now - state.timestamp > self.timeout {
if state.state.is_some() {
removed.push(key.clone());
}
false
} else {
true
}
});
if !self.subscribers.inner().is_empty() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Timeout,
added: vec![],
updated: vec![],
removed,
},
);
}
}

pub fn get_all_states(&self) -> FxHashMap<String, LoroValue> {
self.states
.iter()
.filter(|(_, v)| v.state.is_some())
.map(|(k, v)| (k.clone(), v.state.clone().unwrap()))
.collect()
}

pub fn keys(&self) -> impl Iterator<Item = &str> {
self.states
.keys()
.filter(|&k| self.states.get(k).unwrap().state.is_some())
.map(|s| s.as_str())
}

pub fn subscribe_local_updates(&self, callback: LocalEphemeralCallback) -> Subscription {
let (sub, activate) = self.local_subs.inner().insert((), callback);
activate();
sub
}

pub fn subscribe(&self, callback: EphemeralSubscriber) -> Subscription {
let (sub, activate) = self.subscribers.inner().insert((), callback);
activate();
sub
}

fn _set_local_state(&mut self, key: &str, value: Option<LoroValue>) {
let is_delete = value.is_none();
let old = self.states.insert(
key.to_string(),
State {
state: value,
timestamp: get_sys_timestamp() as Timestamp,
},
);
if !self.local_subs.inner().is_empty() {
self.local_subs.emit(&(), self.encode(key));
}
if !self.subscribers.inner().is_empty() {
if old.is_some() {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Local,
added: vec![],
updated: if !is_delete {
vec![key.to_string()]
} else {
vec![]
},
removed: if !is_delete {
vec![]
} else {
vec![key.to_string()]
},
},
);
} else if !is_delete {
self.subscribers.emit(
&(),
EphemeralStoreEvent {
by: EphemeralEventTrigger::Local,
added: vec![key.to_string()],
updated: vec![],
removed: vec![],
},
);
}
}
}
}
Loading