diff --git a/.changeset/rude-candles-jog.md b/.changeset/rude-candles-jog.md new file mode 100644 index 000000000..bfc6fb05c --- /dev/null +++ b/.changeset/rude-candles-jog.md @@ -0,0 +1,5 @@ +--- +"loro-crdt": patch +--- + +fix: should not merge remote changes due to small interval diff --git a/crates/loro-internal/src/change.rs b/crates/loro-internal/src/change.rs index 486fe319a..7e5f77aa8 100644 --- a/crates/loro-internal/src/change.rs +++ b/crates/loro-internal/src/change.rs @@ -248,7 +248,7 @@ impl Change { && other.id.counter == self.id.counter + self.content_len() as Counter && other.deps.len() == 1 && other.deps.as_single().unwrap().peer == self.id.peer - && other.timestamp - self.timestamp < merge_interval + && other.timestamp - self.timestamp <= merge_interval && self.commit_msg == other.commit_msg { debug_assert!(other.timestamp >= self.timestamp); diff --git a/crates/loro-internal/src/configure.rs b/crates/loro-internal/src/configure.rs index 29cd5e2eb..d2006f441 100644 --- a/crates/loro-internal/src/configure.rs +++ b/crates/loro-internal/src/configure.rs @@ -5,7 +5,7 @@ use crate::LoroDoc; pub struct Configure { pub(crate) text_style_config: Arc<RwLock<StyleConfigMap>>, record_timestamp: Arc<AtomicBool>, - pub(crate) merge_interval: Arc<AtomicI64>, + pub(crate) merge_interval_in_s: Arc<AtomicI64>, pub(crate) editable_detached_mode: Arc<AtomicBool>, } @@ -24,7 +24,7 @@ impl Default for Configure { text_style_config: Arc::new(RwLock::new(StyleConfigMap::default_rich_text_config())), record_timestamp: Arc::new(AtomicBool::new(false)), editable_detached_mode: Arc::new(AtomicBool::new(false)), - merge_interval: Arc::new(AtomicI64::new(1000 * 1000)), + merge_interval_in_s: Arc::new(AtomicI64::new(1000)), } } } @@ -39,8 +39,8 @@ impl Configure { self.record_timestamp .load(std::sync::atomic::Ordering::Relaxed), )), - merge_interval: Arc::new(AtomicI64::new( - self.merge_interval + merge_interval_in_s: Arc::new(AtomicI64::new( + self.merge_interval_in_s .load(std::sync::atomic::Ordering::Relaxed), )), editable_detached_mode: Arc::new(AtomicBool::new( @@ -75,12 +75,12 @@ impl Configure { } pub fn merge_interval(&self) -> i64 { - self.merge_interval + self.merge_interval_in_s .load(std::sync::atomic::Ordering::Relaxed) } pub fn set_merge_interval(&self, interval: i64) { - self.merge_interval + self.merge_interval_in_s .store(interval, std::sync::atomic::Ordering::Relaxed); } } diff --git a/crates/loro-internal/src/loro.rs b/crates/loro-internal/src/loro.rs index cacfac437..2504d4040 100644 --- a/crates/loro-internal/src/loro.rs +++ b/crates/loro-internal/src/loro.rs @@ -129,7 +129,7 @@ impl LoroDoc { self.config.set_record_timestamp(record); } - /// Set the interval of mergeable changes, in milliseconds. + /// Set the interval of mergeable changes, in seconds. /// /// If two continuous local changes are within the interval, they will be merged into one change. /// The default value is 1000 seconds. @@ -1766,15 +1766,27 @@ impl Drop for CommitWhenDrop<'_> { } } +/// Options for configuring a commit operation. #[derive(Debug, Clone)] pub struct CommitOptions { + /// Origin identifier for the commit event, used to track the source of changes. + /// It doesn't persist. pub origin: Option<InternalString>, + + /// Whether to immediately start a new transaction after committing. + /// Defaults to true. pub immediate_renew: bool, + + /// Custom timestamp for the commit in seconds since Unix epoch. + /// If None, the current time will be used. pub timestamp: Option<Timestamp>, + + /// Optional commit message to attach to the changes. It will be persisted. pub commit_msg: Option<Arc<str>>, } impl CommitOptions { + /// Creates a new CommitOptions with default values. pub fn new() -> Self { Self { origin: None, @@ -1784,30 +1796,38 @@ impl CommitOptions { } } + /// Sets the origin identifier for this commit. pub fn origin(mut self, origin: &str) -> Self { self.origin = Some(origin.into()); self } + /// Sets whether to immediately start a new transaction after committing. pub fn immediate_renew(mut self, immediate_renew: bool) -> Self { self.immediate_renew = immediate_renew; self } + /// Set the timestamp of the commit. + /// + /// The timestamp is the number of **seconds** that have elapsed since 00:00:00 UTC on January 1, 1970. pub fn timestamp(mut self, timestamp: Timestamp) -> Self { self.timestamp = Some(timestamp); self } + /// Sets a commit message to be attached to the changes. pub fn commit_msg(mut self, commit_msg: &str) -> Self { self.commit_msg = Some(commit_msg.into()); self } + /// Sets the origin identifier for this commit. pub fn set_origin(&mut self, origin: Option<&str>) { self.origin = origin.map(|x| x.into()) } + /// Sets the timestamp for this commit. pub fn set_timestamp(&mut self, timestamp: Option<Timestamp>) { self.timestamp = timestamp; } diff --git a/crates/loro-internal/src/oplog.rs b/crates/loro-internal/src/oplog.rs index eec5a6c18..03b2f95b3 100644 --- a/crates/loro-internal/src/oplog.rs +++ b/crates/loro-internal/src/oplog.rs @@ -70,7 +70,7 @@ impl OpLog { pub(crate) fn new() -> Self { let arena = SharedArena::new(); let cfg = Configure::default(); - let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval.clone()); + let change_store = ChangeStore::new_mem(&arena, cfg.merge_interval_in_s.clone()); Self { history_cache: Mutex::new(ContainerHistoryCache::new(change_store.clone(), None)), dag: AppDag::new(change_store.clone()), @@ -137,7 +137,7 @@ impl OpLog { .unwrap() .insert_by_new_change(&change, true, true); self.register_container_and_parent_link(&change); - self.change_store.insert_change(change, true); + self.change_store.insert_change(change, true, from_local); } #[inline(always)] diff --git a/crates/loro-internal/src/oplog/change_store.rs b/crates/loro-internal/src/oplog/change_store.rs index 522654c77..8fdfbd083 100644 --- a/crates/loro-internal/src/oplog/change_store.rs +++ b/crates/loro-internal/src/oplog/change_store.rs @@ -163,7 +163,7 @@ impl ChangeStore { } let ch = c.slice(start, end); - new_store.insert_change(ch, false); + new_store.insert_change(ch, false, false); } } @@ -189,7 +189,7 @@ impl ChangeStore { } let ch = c.slice(start, end); - new_store.insert_change(ch, false); + new_store.insert_change(ch, false, false); } } @@ -501,7 +501,7 @@ impl ChangeStore { assert_ne!(start, end); let ch = c.slice(start, end); - new_store.insert_change(ch, false); + new_store.insert_change(ch, false, false); } } @@ -536,7 +536,7 @@ impl ChangeStore { assert_ne!(start, end); let ch = c.slice(start, end); - new_store.insert_change(ch, false); + new_store.insert_change(ch, false, false); } } @@ -690,7 +690,7 @@ mod mut_inner_kv { /// /// The new change either merges with the previous block or is put into a new block. /// This method only updates the internal kv store. - pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool) { + pub fn insert_change(&self, mut change: Change, split_when_exceeds: bool, is_local: bool) { #[cfg(debug_assertions)] { let vv = self.external_vv.try_lock().unwrap(); @@ -718,8 +718,14 @@ mod mut_inner_kv { match block.push_change( change, estimated_size, - self.merge_interval - .load(std::sync::atomic::Ordering::Acquire), + if is_local { + // local change should try to merge with previous change when + // the timestamp interval <= the `merge_interval` + self.merge_interval + .load(std::sync::atomic::Ordering::Acquire) + } else { + 0 + }, &self.arena, ) { Ok(_) => { @@ -955,7 +961,7 @@ mod mut_inner_kv { if !new_change.ops.is_empty() { total_len += new_change.atom_len(); - self.insert_change(new_change, false); + self.insert_change(new_change, false, false); } assert_eq!(total_len, original_len); @@ -983,7 +989,7 @@ mod mut_inner_kv { commit_msg: new_change.commit_msg.clone(), }; - self.insert_change(new_change, false); + self.insert_change(new_change, false, false); *estimated_size = ans.estimate_storage_size(); ans } diff --git a/crates/loro-internal/src/undo.rs b/crates/loro-internal/src/undo.rs index 9e5e4ba45..a256b1742 100644 --- a/crates/loro-internal/src/undo.rs +++ b/crates/loro-internal/src/undo.rs @@ -166,7 +166,7 @@ struct UndoManagerInner { redo_stack: Stack, processing_undo: bool, last_undo_time: i64, - merge_interval: i64, + merge_interval_in_ms: i64, max_stack_size: usize, exclude_origin_prefixes: Vec<Box<str>>, last_popped_selection: Option<Vec<CursorWithPos>>, @@ -182,7 +182,7 @@ impl std::fmt::Debug for UndoManagerInner { .field("redo_stack", &self.redo_stack) .field("processing_undo", &self.processing_undo) .field("last_undo_time", &self.last_undo_time) - .field("merge_interval", &self.merge_interval) + .field("merge_interval", &self.merge_interval_in_ms) .field("max_stack_size", &self.max_stack_size) .field("exclude_origin_prefixes", &self.exclude_origin_prefixes) .finish() @@ -378,7 +378,7 @@ impl UndoManagerInner { undo_stack: Default::default(), redo_stack: Default::default(), processing_undo: false, - merge_interval: 0, + merge_interval_in_ms: 0, last_undo_time: 0, max_stack_size: usize::MAX, exclude_origin_prefixes: vec![], @@ -407,7 +407,7 @@ impl UndoManagerInner { .map(|x| x(UndoOrRedo::Undo, span, event)) .unwrap_or_default(); - if !self.undo_stack.is_empty() && now - self.last_undo_time < self.merge_interval { + if !self.undo_stack.is_empty() && now - self.last_undo_time < self.merge_interval_in_ms { self.undo_stack.push_with_merge(span, meta, true); } else { self.last_undo_time = now; @@ -530,7 +530,7 @@ impl UndoManager { } pub fn set_merge_interval(&mut self, interval: i64) { - self.inner.try_lock().unwrap().merge_interval = interval; + self.inner.try_lock().unwrap().merge_interval_in_ms = interval; } pub fn set_max_undo_steps(&mut self, size: usize) { diff --git a/crates/loro-wasm/src/lib.rs b/crates/loro-wasm/src/lib.rs index 9e9099fb9..cbaea4f39 100644 --- a/crates/loro-wasm/src/lib.rs +++ b/crates/loro-wasm/src/lib.rs @@ -411,9 +411,12 @@ impl LoroDoc { self.0.set_record_timestamp(auto_record); } - /// If two continuous local changes are within the interval, they will be merged into one change. + /// If two continuous local changes are within (<=) the interval(**in seconds**), they will be merged into one change. /// - /// The default value is 1_000_000, the default unit is seconds. + /// The default value is 1_000 seconds. + /// + /// By default, we record timestamps in seconds for each change. So if the merge interval is 1, and changes A and B + /// have timestamps of 3 and 4 respectively, then they will be merged into one change #[wasm_bindgen(js_name = "setChangeMergeInterval")] pub fn set_change_merge_interval(&self, interval: f64) { self.0.set_change_merge_interval(interval as i64); @@ -812,12 +815,14 @@ impl LoroDoc { Ok(()) } - /// Commit the cumulative auto committed transaction. + /// Commit the cumulative auto-committed transaction. /// /// You can specify the `origin`, `timestamp`, and `message` of the commit. /// /// - The `origin` is used to mark the event /// - The `message` works like a git commit message, which will be recorded and synced to peers + /// - The `timestamp` is the number of seconds that have elapsed since 00:00:00 UTC on January 1, 1970. + /// It defaults to `Date.now() / 1000` when timestamp recording is enabled /// /// The events will be emitted after a transaction is committed. A transaction is committed when: /// @@ -4552,6 +4557,7 @@ impl UndoManager { } /// Set the merge interval (in ms). + /// /// If the interval is set to 0, the undo steps will not be merged. /// Otherwise, the undo steps will be merged if the interval between the two steps is less than the given interval. pub fn setMergeInterval(&mut self, interval: f64) { diff --git a/crates/loro-wasm/tests/basic.test.ts b/crates/loro-wasm/tests/basic.test.ts index 97cf054be..f8ad12690 100644 --- a/crates/loro-wasm/tests/basic.test.ts +++ b/crates/loro-wasm/tests/basic.test.ts @@ -404,7 +404,7 @@ it("can control the mergeable interval", () => { { const doc = new LoroDoc(); doc.setPeerId(1); - doc.setChangeMergeInterval(10); + doc.setChangeMergeInterval(9); doc.getText("123").insert(0, "1"); doc.commit({ timestamp: 110 }); doc.getText("123").insert(0, "1"); @@ -1151,3 +1151,45 @@ it("can travel changes from event", async () => { await Promise.resolve(); expect(done).toBe(true); }) + + +it("merge interval", async () => { + const doc = new LoroDoc(); + doc.setPeerId("1"); + doc.setRecordTimestamp(true); + doc.setChangeMergeInterval(1); + doc.getText("text").update("Hello"); + doc.commit(); + await new Promise(resolve => setTimeout(resolve, 100)); + doc.getText("text").update("Hello world!"); + doc.commit(); + await new Promise(resolve => setTimeout(resolve, 2000)); + doc.getText("text").update("Hello ABC!"); + doc.commit(); + const updates = doc.exportJsonUpdates(); + expect(updates.changes.length).toBe(2); + + await new Promise(resolve => setTimeout(resolve, 2000)); + doc.getText("text").update("Hello"); + doc.commit(); + await new Promise(resolve => setTimeout(resolve, 100)); + doc.getText("text").update("Hello world!"); + doc.commit(); + const updates2 = doc.exportJsonUpdates(); + expect(updates2.changes.length).toBe(3); +}) + +it("setRecordTimestamp should be reflected on current txn", async () => { + const doc = new LoroDoc(); + doc.getText("text").insert(0, "hi"); + doc.commit(); + { + const updates = doc.exportJsonUpdates(); + expect(updates.changes[0].timestamp).toBe(0); + } + doc.setRecordTimestamp(true); + doc.getText("text").insert(0, "hi"); + doc.commit(); + const updates = doc.exportJsonUpdates(); + expect(updates.changes[1].timestamp).toBeGreaterThan(0); +}) diff --git a/crates/loro/src/lib.rs b/crates/loro/src/lib.rs index 6feb5b8ad..b13b14440 100644 --- a/crates/loro/src/lib.rs +++ b/crates/loro/src/lib.rs @@ -216,10 +216,13 @@ impl LoroDoc { self.doc.is_detached_editing_enabled() } - /// Set the interval of mergeable changes, in seconds. + /// Set the interval of mergeable changes, **in seconds**. /// /// If two continuous local changes are within the interval, they will be merged into one change. /// The default value is 1000 seconds. + /// + /// By default, we record timestamps in seconds for each change. So if the merge interval is 1, and changes A and B + /// have timestamps of 3 and 4 respectively, then they will be merged into one change #[inline] pub fn set_change_merge_interval(&self, interval: i64) { self.doc.set_change_merge_interval(interval); @@ -398,6 +401,8 @@ impl LoroDoc { } /// Set commit message for the current uncommitted changes + /// + /// It will be persisted. pub fn set_next_commit_message(&self, msg: &str) { self.doc.set_next_commit_message(msg) } diff --git a/crates/loro/tests/loro_rust_test.rs b/crates/loro/tests/loro_rust_test.rs index 859a36460..f317a1d13 100644 --- a/crates/loro/tests/loro_rust_test.rs +++ b/crates/loro/tests/loro_rust_test.rs @@ -2740,3 +2740,27 @@ fn test_find_spans_between() -> LoroResult<()> { Ok(()) } + +#[test] +fn test_set_merge_interval() { + let doc = LoroDoc::new(); + doc.set_record_timestamp(true); + doc.set_change_merge_interval(1); + doc.get_text("text").insert(0, "Hello").unwrap(); + doc.commit_with(CommitOptions::default().timestamp(100)); + doc.get_text("text").insert(0, "Hello").unwrap(); + doc.commit_with(CommitOptions::default().timestamp(200)); + assert_eq!(doc.len_changes(), 2); + { + let snapshot = doc.export(ExportMode::Snapshot).unwrap(); + let new_doc = LoroDoc::new(); + new_doc.import(&snapshot).unwrap(); + assert_eq!(new_doc.len_changes(), 2); + } + { + let updates = doc.export(ExportMode::all_updates()).unwrap(); + let new_doc = LoroDoc::new(); + new_doc.import(&updates).unwrap(); + assert_eq!(new_doc.len_changes(), 2); + } +}