diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 622fef4ac2a..b6598c221be 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -290,23 +290,6 @@ impl TimelineBuilder { inner.clear().await; } - // TODO: remove once `UpdateTimelineEvents` is stabilized. - RoomEventCacheUpdate::AddTimelineEvents { events, origin } => { - if !settings_vectordiffs_as_inputs { - trace!("Received new timeline events."); - - inner.add_events_at( - events.into_iter(), - TimelineNewItemPosition::End { - origin: match origin { - EventsOrigin::Sync => RemoteEventOrigin::Sync, - EventsOrigin::Pagination => RemoteEventOrigin::Pagination, - } - } - ).await; - } - } - RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => { if settings_vectordiffs_as_inputs { trace!("Received new timeline events diffs"); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 0b5b2b0e346..6fbbbb7deaf 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -662,16 +662,6 @@ pub enum RoomEventCacheUpdate { ambiguity_changes: BTreeMap, }, - /// The room has received new timeline events. - // TODO: remove once `UpdateTimelineEvents` is stabilized - AddTimelineEvents { - /// All the new events that have been added to the room's timeline. - events: Vec, - - /// Where the events are coming from. - origin: EventsOrigin, - }, - /// The room has received updates for the timeline as _diffs_. UpdateTimelineEvents { /// Diffs to apply to the timeline. diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 93a87bc730a..4d135666649 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -165,9 +165,10 @@ impl RoomPagination { None }; + // The new prev token from this pagination. let new_gap = paginator.prev_batch_token().map(|prev_token| Gap { prev_token }); - let (backpagination_outcome, updates_as_vector_diffs) = state + let (backpagination_outcome, sync_timeline_events_diffs) = state .with_events_mut(move |room_events| { // Note: The chunk could be empty. // @@ -231,9 +232,9 @@ impl RoomPagination { }) .await?; - if !updates_as_vector_diffs.is_empty() { + if !sync_timeline_events_diffs.is_empty() { let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { - diffs: updates_as_vector_diffs, + diffs: sync_timeline_events_diffs, origin: EventsOrigin::Pagination, }); } diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 0ef69e36096..7ad4a6afe2b 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -450,13 +450,12 @@ impl RoomEventCacheInner { let mut all_events = self.all_events.write().await; - for sync_timeline_event in &sync_timeline_events { + for sync_timeline_event in sync_timeline_events { if let Some(event_id) = sync_timeline_event.event_id() { - all_events.append_related_event(sync_timeline_event); - all_events.events.insert( - event_id.to_owned(), - (self.room_id.clone(), sync_timeline_event.clone()), - ); + all_events.append_related_event(&sync_timeline_event); + all_events + .events + .insert(event_id.to_owned(), (self.room_id.clone(), sync_timeline_event)); } } @@ -471,14 +470,6 @@ impl RoomEventCacheInner { // The order of `RoomEventCacheUpdate`s is **really** important here. { - // TODO: remove once `UpdateTimelineEvents` is stabilized. - if !sync_timeline_events.is_empty() { - let _ = self.sender.send(RoomEventCacheUpdate::AddTimelineEvents { - events: sync_timeline_events, - origin: EventsOrigin::Sync, - }); - } - if !sync_timeline_events_diffs.is_empty() { let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { diffs: sync_timeline_events_diffs, diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 5f9f76782d9..08189f34404 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -1,8 +1,12 @@ -use std::{future::ready, ops::ControlFlow, time::Duration}; +use std::{ + future::ready, + ops::{ControlFlow, Not}, + time::Duration, +}; use assert_matches::assert_matches; use assert_matches2::assert_let; -use futures_util::FutureExt as _; +use eyeball_im::VectorDiff; use matrix_sdk::{ assert_let_timeout, assert_next_matches_with_timeout, deserialized_responses::SyncTimelineEvent, @@ -82,10 +86,10 @@ async fn test_event_cache_receives_events() { // It does receive one update, assert_let_timeout!( - Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv() + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() ); - // It does also receive the update as `VectorDiff`. - assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()); + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Append { values: events } = &diffs[0]); // Which contains the event that was sent beforehand. assert_eq!(events.len(), 1); @@ -170,10 +174,13 @@ async fn test_ignored_unignored() { // We do receive one update, assert_let_timeout!( - Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv() + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() ); - // It does also receive the update as `VectorDiff`. - assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()); + assert_eq!(diffs.len(), 2); + + // Similar to the `RoomEventCacheUpdate::Clear`. + assert_let!(VectorDiff::Clear = &diffs[0]); + assert_let!(VectorDiff::Append { values: events } = &diffs[1]); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "i don't like this dexter"); @@ -195,14 +202,14 @@ async fn wait_for_initial_events( room_stream: &mut broadcast::Receiver, ) { if events.is_empty() { - let mut update = room_stream.recv().await.expect("read error"); + assert_let_timeout!(Ok(update) = room_stream.recv()); + let mut update = update; + // Could be a clear because of the limited timeline. if matches!(update, RoomEventCacheUpdate::Clear) { - update = room_stream.recv().await.expect("read error"); + assert_let_timeout!(Ok(new_update) = room_stream.recv()); + update = new_update; } - assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. }); - - let update = room_stream.recv().await.expect("read error"); assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. }); } else { @@ -277,15 +284,26 @@ async fn test_backpaginate_once() { let BackPaginationOutcome { events, reached_start } = outcome; assert!(reached_start); + assert_eq!(events.len(), 2); assert_event_matches_msg(&events[0], "world"); assert_event_matches_msg(&events[1], "hello"); - assert_eq!(events.len(), 2); - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); + // And we get update as diffs. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + + assert_eq!(diffs.len(), 2); + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "hello"); + }); + assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 1); + assert_event_matches_msg(event, "world"); + }); - let next = room_stream.recv().now_or_never(); - assert_matches!(next, None); + assert!(room_stream.is_empty()); } #[async_test] @@ -394,8 +412,36 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert_event_matches_msg(&global_events[2], "oh well"); assert_eq!(global_events.len(), 3); + // First iteration. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + + assert_eq!(diffs.len(), 2); + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "hello"); + }); + assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 1); + assert_event_matches_msg(event, "world"); + }); + + // Second iteration. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "oh well"); + }); + + assert!(room_stream.is_empty()); + // And next time I'll open the room, I'll get the events in the right order. - let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -403,14 +449,6 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert_event_matches_msg(&events[3], "heyo"); assert_eq!(events.len(), 4); - // First iteration. - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); - - // Second iteration. - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); - assert!(room_stream.is_empty()); } @@ -525,8 +563,34 @@ async fn test_backpaginate_many_times_with_one_iteration() { assert_event_matches_msg(&global_events[2], "oh well"); assert_eq!(global_events.len(), 3); + // First pagination. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + + assert_eq!(diffs.len(), 2); + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "hello"); + }); + assert_matches!(&diffs[1], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 1); + assert_event_matches_msg(event, "world"); + }); + + // Second pagination. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "oh well"); + }); + // And next time I'll open the room, I'll get the events in the right order. - let (events, _receiver) = room_event_cache.subscribe().await.unwrap(); + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -534,14 +598,6 @@ async fn test_backpaginate_many_times_with_one_iteration() { assert_event_matches_msg(&events[3], "heyo"); assert_eq!(events.len(), 4); - // First pagination. - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); - - // Second pagination. - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); - assert!(room_stream.is_empty()); } @@ -582,7 +638,10 @@ async fn test_reset_while_backpaginating() { // cache (and no room updates will happen in this case), or it hasn't, and // the stream will return the next message soon. if events.is_empty() { - let _ = room_stream.recv().await.expect("read error"); + assert_let_timeout!(Ok(RoomEventCacheUpdate::Clear) = room_stream.recv()); + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = room_stream.recv() + ); } else { assert_eq!(events.len(), 1); } @@ -679,6 +738,35 @@ async fn test_reset_while_backpaginating() { ); assert!(first_token != second_token); assert_eq!(second_token, "third_backpagination"); + + // Assert the updates as diffs. + + // Being cleared from the reset. + assert_let_timeout!(Ok(RoomEventCacheUpdate::Clear) = room_stream.recv()); + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + assert_eq!(diffs.len(), 2); + // The clear, again. + assert_matches!(&diffs[0], VectorDiff::Clear); + // The event from the sync. + assert_matches!(&diffs[1], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 1); + assert_event_matches_msg(&events[0], "heyo"); + }); + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + assert_eq!(diffs.len(), 1); + // The event from the pagination. + assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { + assert_eq!(*index, 0); + assert_event_matches_msg(event, "finally!"); + }); + + assert!(room_stream.is_empty()); } #[async_test] @@ -731,8 +819,14 @@ async fn test_backpaginating_without_token() { assert_event_matches_msg(&events[0], "hi"); assert_eq!(events.len(), 1); - let next = room_stream.recv().now_or_never(); - assert_matches!(next, Some(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }))); + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 1); + assert_event_matches_msg(&events[0], "hi"); + }); assert!(room_stream.is_empty()); } @@ -785,6 +879,15 @@ async fn test_limited_timeline_resets_pagination() { assert_eq!(events.len(), 1); assert!(reached_start); + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 1); + assert_event_matches_msg(&events[0], "hi"); + }); + // And the paginator state delives this as an update, and is internally // consistent with it: assert_next_matches_with_timeout!(pagination_status, PaginatorState::Idle); @@ -794,7 +897,6 @@ async fn test_limited_timeline_resets_pagination() { server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await; // We receive an update about the limited timeline. - assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = room_stream.recv()); assert_let_timeout!(Ok(RoomEventCacheUpdate::Clear) = room_stream.recv()); // The paginator state is reset: status set to Initial, hasn't hit the timeline @@ -839,12 +941,11 @@ async fn test_limited_timeline_with_storage() { // This is racy: either the sync has been handled, or it hasn't yet. if initial_events.is_empty() { assert_let_timeout!( - Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv() - ); - // It does also receive the update as `VectorDiff`. - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv() + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() ); + assert_eq!(diffs.len(), 1); + + assert_let!(VectorDiff::Append { values: events } = &diffs[0]); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "hey yo"); } else { @@ -865,10 +966,11 @@ async fn test_limited_timeline_with_storage() { .await; assert_let_timeout!( - Ok(RoomEventCacheUpdate::AddTimelineEvents { events, .. }) = subscriber.recv() + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() ); - // It does also receive the update as `VectorDiff`. - assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv()); + assert_eq!(diffs.len(), 1); + + assert_let!(VectorDiff::Append { values: events } = &diffs[0]); assert_eq!(events.len(), 1); assert_event_matches_msg(&events[0], "gappy!"); @@ -1092,13 +1194,11 @@ async fn test_no_gap_stored_after_deduplicated_sync() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + if events.is_empty() { - let update = stream.recv().await.expect("read error"); - assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. }); - // It does also receive the update as `VectorDiff`. - let update = stream.recv().await.expect("read error"); - assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. }); + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); } + drop(events); // Backpagination will return nothing. @@ -1126,15 +1226,14 @@ async fn test_no_gap_stored_after_deduplicated_sync() { ) .await; - let update = stream.recv().await.expect("read error"); - assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. }); + assert!(stream.is_empty()); // If this back-pagination fails, that's because we've stored a gap that's // useless. It should be short-circuited because there's no previous gap. let outcome = pagination.run_backwards(20, once).await.unwrap(); assert!(outcome.reached_start); - let (events, _stream) = room_event_cache.subscribe().await.unwrap(); + let (events, stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1172,13 +1271,11 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); + if events.is_empty() { - let update = stream.recv().await.expect("read error"); - assert_matches!(update, RoomEventCacheUpdate::AddTimelineEvents { .. }); - // It does also receive the update as `VectorDiff`. - let update = stream.recv().await.expect("read error"); - assert_matches!(update, RoomEventCacheUpdate::UpdateTimelineEvents { .. }); + assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); } + drop(events); // Now, simulate that we expanded the timeline window with sliding sync, by @@ -1197,6 +1294,25 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { ) .await; + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv() + ); + assert_eq!(diffs.len(), 2); + + // `$ev3` is duplicated, the older `$ev3` event is removed + assert_matches!(&diffs[0], VectorDiff::Remove { index } => { + assert_eq!(*index, 0); + }); + // `$ev1`, `$ev2` and `$ev3` are added. + assert_matches!(&diffs[1], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 3); + assert_eq!(events[0].event_id().unwrap().as_str(), "$1"); + assert_eq!(events[1].event_id().unwrap().as_str(), "$2"); + assert_eq!(events[2].event_id().unwrap().as_str(), "$3"); + }); + + assert!(stream.is_empty()); + // For prev-batch2, the back-pagination returns nothing. server .mock_room_messages() @@ -1229,21 +1345,36 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { // Run pagination once: it will consume prev-batch2 first, which is the most // recent token. - pagination.run_backwards(20, once).await.unwrap(); + let outcome = pagination.run_backwards(20, once).await.unwrap(); + + // The pagination is empty: no new event. + assert!(outcome.reached_start); + assert!(outcome.events.is_empty()); + assert!(stream.is_empty()); // Run pagination a second time: it will consume prev-batch, which is the least // recent token. - pagination.run_backwards(20, once).await.unwrap(); + let outcome = pagination.run_backwards(20, once).await.unwrap(); + + // The pagination contains events, but they are all duplicated; the gap is + // replaced by zero event: nothing happens. + assert!(outcome.reached_start.not()); + assert_eq!(outcome.events.len(), 2); + assert!(stream.is_empty()); // If this back-pagination fails, that's because we've stored a gap that's // useless. It should be short-circuited because storing the previous gap was // useless. let outcome = pagination.run_backwards(20, once).await.unwrap(); assert!(outcome.reached_start); + assert!(outcome.events.is_empty()); + assert!(stream.is_empty()); - let (events, _stream) = room_event_cache.subscribe().await.unwrap(); + let (events, stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); assert_eq!(events.len(), 3); + + assert!(stream.is_empty()); }