Skip to content

Commit

Permalink
fix: prevent merging remote changes based on local `changeMergeInterval` config (#614)
Browse files Browse the repository at this point in the history

* docs: make the merge interval unit clear

* fix: prevent merging remote changes with small intervals and improve commit message handling

The changeMergeInterval config should only work for local changes.

* docs: refine docs
  • Loading branch information
zxch3n authored Jan 9, 2025
1 parent 763ab04 commit 9c1005d
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 29 deletions.
5 changes: 5 additions & 0 deletions .changeset/rude-candles-jog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"loro-crdt": patch
---

fix: should not merge remote changes due to small interval
2 changes: 1 addition & 1 deletion crates/loro-internal/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl Change {
&& other.id.counter == self.id.counter + self.content_len() as Counter
&& other.deps.len() == 1
&& other.deps.as_single().unwrap().peer == self.id.peer
&& other.timestamp - self.timestamp < merge_interval
&& other.timestamp - self.timestamp <= merge_interval
&& self.commit_msg == other.commit_msg
{
debug_assert!(other.timestamp >= self.timestamp);
Expand Down
12 changes: 6 additions & 6 deletions crates/loro-internal/src/configure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::LoroDoc;
pub struct Configure {
pub(crate) text_style_config: Arc<RwLock<StyleConfigMap>>,
record_timestamp: Arc<AtomicBool>,
pub(crate) merge_interval: Arc<AtomicI64>,
pub(crate) merge_interval_in_s: Arc<AtomicI64>,
pub(crate) editable_detached_mode: Arc<AtomicBool>,
}

Expand All @@ -24,7 +24,7 @@ impl Default for Configure {
text_style_config: Arc::new(RwLock::new(StyleConfigMap::default_rich_text_config())),
record_timestamp: Arc::new(AtomicBool::new(false)),
editable_detached_mode: Arc::new(AtomicBool::new(false)),
merge_interval: Arc::new(AtomicI64::new(1000 * 1000)),
merge_interval_in_s: Arc::new(AtomicI64::new(1000)),
}
}
}
Expand All @@ -39,8 +39,8 @@ impl Configure {
self.record_timestamp
.load(std::sync::atomic::Ordering::Relaxed),
)),
merge_interval: Arc::new(AtomicI64::new(
self.merge_interval
merge_interval_in_s: Arc::new(AtomicI64::new(
self.merge_interval_in_s
.load(std::sync::atomic::Ordering::Relaxed),
)),
editable_detached_mode: Arc::new(AtomicBool::new(
Expand Down Expand Up @@ -75,12 +75,12 @@ impl Configure {
}

pub fn merge_interval(&self) -> i64 {
self.merge_interval
self.merge_interval_in_s
.load(std::sync::atomic::Ordering::Relaxed)
}

pub fn set_merge_interval(&self, interval: i64) {
self.merge_interval
self.merge_interval_in_s
.store(interval, std::sync::atomic::Ordering::Relaxed);
}
}
Expand Down
22 changes: 21 additions & 1 deletion crates/loro-internal/src/loro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl LoroDoc {
self.config.set_record_timestamp(record);
}

/// Set the interval of mergeable changes, in milliseconds.
/// Set the interval of mergeable changes, in seconds.
///
/// If two continuous local changes are within the interval, they will be merged into one change.
/// The default value is 1000 seconds.
Expand Down Expand Up @@ -1766,15 +1766,27 @@ impl Drop for CommitWhenDrop<'_> {
}
}

/// Options for configuring a commit operation.
///
/// Each field can be filled in through the builder-style methods of the same name.
#[derive(Debug, Clone)]
pub struct CommitOptions {
/// Origin identifier for the commit event, used to track the source of changes.
/// It is not persisted.
pub origin: Option<InternalString>,

/// Whether to immediately start a new transaction after committing.
/// Defaults to true.
pub immediate_renew: bool,

/// Custom timestamp for the commit, in **seconds** since the Unix epoch.
/// If `None`, the current time will be used.
pub timestamp: Option<Timestamp>,

/// Optional commit message to attach to the changes. It will be persisted.
pub commit_msg: Option<Arc<str>>,
}

impl CommitOptions {
/// Creates a new CommitOptions with default values.
pub fn new() -> Self {
Self {
origin: None,
Expand All @@ -1784,30 +1796,38 @@ impl CommitOptions {
}
}

/// Builder-style setter for the origin identifier of this commit.
///
/// Consumes the options and hands them back with `origin` set to `Some(origin)`.
pub fn origin(mut self, origin: &str) -> Self {
    // Reuse the in-place setter so the `&str` conversion lives in one spot.
    self.set_origin(Some(origin));
    self
}

/// Sets whether to immediately start a new transaction after committing.
///
/// Builder-style: consumes the options and returns them with the flag updated.
pub fn immediate_renew(mut self, immediate_renew: bool) -> Self {
self.immediate_renew = immediate_renew;
self
}

/// Builder-style setter for the commit timestamp.
///
/// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970.
pub fn timestamp(mut self, timestamp: Timestamp) -> Self {
    // Delegate to the in-place setter; the builder always stores `Some(..)`.
    self.set_timestamp(Some(timestamp));
    self
}

/// Sets a commit message to be attached to the changes.
pub fn commit_msg(mut self, commit_msg: &str) -> Self {
self.commit_msg = Some(commit_msg.into());
self
}

/// Replaces the origin identifier of this commit in place.
///
/// `Some(text)` is converted and stored; `None` clears any previously set origin.
pub fn set_origin(&mut self, origin: Option<&str>) {
    self.origin = origin.map(Into::into);
}

/// Sets the timestamp for this commit, in place.
///
/// The value is stored as given; passing `None` removes any custom timestamp.
pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) {
self.timestamp = timestamp;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/loro-internal/src/oplog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl OpLog {
pub(crate) fn new() -> Self {
let arena = SharedArena::new();
let cfg = Configure::default();
let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval.clone());
let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone());
Self {
history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)),
dag: AppDag::new(change_store.clone()),
Expand Down Expand Up @@ -137,7 +137,7 @@ impl OpLog {
.unwrap()
.insert_by_new_change(&change, true, true);
self.register_container_and_parent_link(&change);
self.change_store.insert_change(change, true);
self.change_store.insert_change(change, true, from_local);
}

#[inline(always)]
Expand Down
24 changes: 15 additions & 9 deletions crates/loro-internal/src/oplog/change_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl ChangeStore {
}

let ch = c.slice(start, end);
new_store.insert_change(ch, false);
new_store.insert_change(ch, false, false);
}
}

Expand All @@ -189,7 +189,7 @@ impl ChangeStore {
}

let ch = c.slice(start, end);
new_store.insert_change(ch, false);
new_store.insert_change(ch, false, false);
}
}

Expand Down Expand Up @@ -501,7 +501,7 @@ impl ChangeStore {

assert_ne!(start, end);
let ch = c.slice(start, end);
new_store.insert_change(ch, false);
new_store.insert_change(ch, false, false);
}
}

Expand Down Expand Up @@ -536,7 +536,7 @@ impl ChangeStore {

assert_ne!(start, end);
let ch = c.slice(start, end);
new_store.insert_change(ch, false);
new_store.insert_change(ch, false, false);
}
}

Expand Down Expand Up @@ -690,7 +690,7 @@ mod mut_inner_kv {
///
/// The new change either merges with the previous block or is put into a new block.
/// This method only updates the internal kv store.
pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool) {
pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) {
#[cfg(debug_assertions)]
{
let vv = self.external_vv.try_lock().unwrap();
Expand Down Expand Up @@ -718,8 +718,14 @@ mod mut_inner_kv {
match block.push_change(
change,
estimated_size,
self.merge_interval
.load(std::sync::atomic::Ordering::Acquire),
if is_local {
// local change should try to merge with previous change when
// the timestamp interval <= the `merge_interval`
self.merge_interval
.load(std::sync::atomic::Ordering::Acquire)
} else {
0
},
&self.arena,
) {
Ok(_) => {
Expand Down Expand Up @@ -955,7 +961,7 @@ mod mut_inner_kv {

if !new_change.ops.is_empty() {
total_len += new_change.atom_len();
self.insert_change(new_change, false);
self.insert_change(new_change, false, false);
}

assert_eq!(total_len, original_len);
Expand Down Expand Up @@ -983,7 +989,7 @@ mod mut_inner_kv {
commit_msg: new_change.commit_msg.clone(),
};

self.insert_change(new_change, false);
self.insert_change(new_change, false, false);
*estimated_size = ans.estimate_storage_size();
ans
}
Expand Down
10 changes: 5 additions & 5 deletions crates/loro-internal/src/undo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ struct UndoManagerInner {
redo_stack: Stack,
processing_undo: bool,
last_undo_time: i64,
merge_interval: i64,
merge_interval_in_ms: i64,
max_stack_size: usize,
exclude_origin_prefixes: Vec<Box<str>>,
last_popped_selection: Option<Vec<CursorWithPos>>,
Expand All @@ -182,7 +182,7 @@ impl std::fmt::Debug for UndoManagerInner {
.field("redo_stack", &self.redo_stack)
.field("processing_undo", &self.processing_undo)
.field("last_undo_time", &self.last_undo_time)
.field("merge_interval", &self.merge_interval)
.field("merge_interval", &self.merge_interval_in_ms)
.field("max_stack_size", &self.max_stack_size)
.field("exclude_origin_prefixes", &self.exclude_origin_prefixes)
.finish()
Expand Down Expand Up @@ -378,7 +378,7 @@ impl UndoManagerInner {
undo_stack: Default::default(),
redo_stack: Default::default(),
processing_undo: false,
merge_interval: 0,
merge_interval_in_ms: 0,
last_undo_time: 0,
max_stack_size: usize::MAX,
exclude_origin_prefixes: vec![],
Expand Down Expand Up @@ -407,7 +407,7 @@ impl UndoManagerInner {
.map(|x| x(UndoOrRedo::Undo, span, event))
.unwrap_or_default();

if !self.undo_stack.is_empty() && now - self.last_undo_time < self.merge_interval {
if !self.undo_stack.is_empty() && now - self.last_undo_time < self.merge_interval_in_ms {
self.undo_stack.push_with_merge(span, meta, true);
} else {
self.last_undo_time = now;
Expand Down Expand Up @@ -530,7 +530,7 @@ impl UndoManager {
}

pub fn set_merge_interval(&mut self, interval: i64) {
self.inner.try_lock().unwrap().merge_interval = interval;
self.inner.try_lock().unwrap().merge_interval_in_ms = interval;
}

pub fn set_max_undo_steps(&mut self, size: usize) {
Expand Down
12 changes: 9 additions & 3 deletions crates/loro-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,9 +411,12 @@ impl LoroDoc {
self.0.set_record_timestamp(auto_record);
}

/// If two continuous local changes are within the interval, they will be merged into one change.
/// If two consecutive local changes are within (<=) the interval (**in seconds**), they will be merged into one change.
///
/// The default value is 1_000_000, the default unit is seconds.
/// The default value is 1_000 seconds.
///
/// By default, we record timestamps in seconds for each change. So if the merge interval is 1, and changes A and B
/// have timestamps of 3 and 4 respectively, then they will be merged into one change.
#[wasm_bindgen(js_name = "setChangeMergeInterval")]
pub fn set_change_merge_interval(&self, interval: f64) {
self.0.set_change_merge_interval(interval as i64);
Expand Down Expand Up @@ -812,12 +815,14 @@ impl LoroDoc {
Ok(())
}

/// Commit the cumulative auto committed transaction.
/// Commit the cumulative auto-committed transaction.
///
/// You can specify the `origin`, `timestamp`, and `message` of the commit.
///
/// - The `origin` is used to mark the event
/// - The `message` works like a git commit message, which will be recorded and synced to peers
/// - The `timestamp` is the number of seconds that have elapsed since 00:00:00 UTC on January 1, 1970.
/// It defaults to `Date.now() / 1000` when timestamp recording is enabled.
///
/// The events will be emitted after a transaction is committed. A transaction is committed when:
///
Expand Down Expand Up @@ -4552,6 +4557,7 @@ impl UndoManager {
}

/// Set the merge interval (in ms).
///
/// If the interval is set to 0, the undo steps will not be merged.
/// Otherwise, the undo steps will be merged if the interval between the two steps is less than the given interval.
pub fn setMergeInterval(&mut self, interval: f64) {
Expand Down
44 changes: 43 additions & 1 deletion crates/loro-wasm/tests/basic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ it("can control the mergeable interval", () => {
{
const doc = new LoroDoc();
doc.setPeerId(1);
doc.setChangeMergeInterval(10);
doc.setChangeMergeInterval(9);
doc.getText("123").insert(0, "1");
doc.commit({ timestamp: 110 });
doc.getText("123").insert(0, "1");
Expand Down Expand Up @@ -1151,3 +1151,45 @@ it("can travel changes from event", async () => {
await Promise.resolve();
expect(done).toBe(true);
})


// Checks that local commits closer together than the change merge interval are
// merged into one change, while commits further apart start a new change.
it("merge interval", async () => {
const doc = new LoroDoc();
doc.setPeerId("1");
// Timestamps must be recorded, otherwise there is no interval to compare.
doc.setRecordTimestamp(true);
// Merge interval of 1 (the unit is seconds).
doc.setChangeMergeInterval(1);
doc.getText("text").update("Hello");
doc.commit();
// 100 ms later: within the interval, so this commit should merge with the first.
await new Promise(resolve => setTimeout(resolve, 100));
doc.getText("text").update("Hello world!");
doc.commit();
// 2000 ms later: beyond the interval, so this commit should form a second change.
await new Promise(resolve => setTimeout(resolve, 2000));
doc.getText("text").update("Hello ABC!");
doc.commit();
const updates = doc.exportJsonUpdates();
expect(updates.changes.length).toBe(2);

// Another 2000 ms gap starts a third change...
await new Promise(resolve => setTimeout(resolve, 2000));
doc.getText("text").update("Hello");
doc.commit();
// ...and a commit 100 ms after it is expected to merge into that third change.
await new Promise(resolve => setTimeout(resolve, 100));
doc.getText("text").update("Hello world!");
doc.commit();
const updates2 = doc.exportJsonUpdates();
expect(updates2.changes.length).toBe(3);
})

// Checks that enabling timestamp recording takes effect for the very next commit.
it("setRecordTimestamp should be reflected on current txn", async () => {
const doc = new LoroDoc();
doc.getText("text").insert(0, "hi");
doc.commit();
{
// Recording has not been enabled yet, so the first change is expected to carry timestamp 0.
const updates = doc.exportJsonUpdates();
expect(updates.changes[0].timestamp).toBe(0);
}
doc.setRecordTimestamp(true);
doc.getText("text").insert(0, "hi");
doc.commit();
// The commit made after enabling recording shows up as a second change with a non-zero timestamp.
const updates = doc.exportJsonUpdates();
expect(updates.changes[1].timestamp).toBeGreaterThan(0);
})
7 changes: 6 additions & 1 deletion crates/loro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,13 @@ impl LoroDoc {
self.doc.is_detached_editing_enabled()
}

/// Set the interval of mergeable changes, in seconds.
/// Set the interval of mergeable changes, **in seconds**.
///
/// If two continuous local changes are within the interval, they will be merged into one change.
/// The default value is 1000 seconds.
///
/// By default, we record timestamps in seconds for each change. So if the merge interval is 1, and changes A and B
/// have timestamps of 3 and 4 respectively, then they will be merged into one change.
#[inline]
pub fn set_change_merge_interval(&self, interval: i64) {
self.doc.set_change_merge_interval(interval);
Expand Down Expand Up @@ -398,6 +401,8 @@ impl LoroDoc {
}

/// Set the commit message for the current uncommitted changes.
///
/// It will be persisted.
///
/// This forwards `msg` unchanged to the inner document's `set_next_commit_message`.
pub fn set_next_commit_message(&self, msg: &str) {
self.doc.set_next_commit_message(msg)
}
Expand Down
Loading

0 comments on commit 9c1005d

Please sign in to comment.