Skip to content

Commit

Permalink
chore(event cache): simplify and add logs to `RoomEventCacheState::pr…
Browse files Browse the repository at this point in the history
…opagate_changes`
  • Loading branch information
bnjbvr committed Jan 9, 2025
1 parent fb54e86 commit 0915eee
Showing 1 changed file with 44 additions and 39 deletions.
83 changes: 44 additions & 39 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ mod private {
};
use once_cell::sync::OnceCell;
use ruma::{serde::Raw, OwnedRoomId, RoomId};
use tracing::{error, trace};
use tracing::{error, instrument, trace};

use super::{chunk_debug_string, events::RoomEvents};
use crate::event_cache::EventCacheError;
Expand Down Expand Up @@ -670,53 +670,58 @@ mod private {
}

/// Propagate changes to the underlying storage.
#[instrument(skip_all)]
async fn propagate_changes(&mut self) -> Result<(), EventCacheError> {
let mut updates = self.events.updates().take();

if !updates.is_empty() {
if let Some(store) = self.store.get() {
// Strip relations from the `PushItems` updates.
for up in updates.iter_mut() {
match up {
Update::PushItems { items, .. } => {
Self::strip_relations_from_events(items)
}
// Other update kinds don't involve adding new events.
Update::NewItemsChunk { .. }
| Update::NewGapChunk { .. }
| Update::RemoveChunk(_)
| Update::RemoveItem { .. }
| Update::DetachLastItems { .. }
| Update::StartReattachItems
| Update::EndReattachItems
| Update::Clear => {}
}
}
if updates.is_empty() {
return Ok(());
}

// Spawn a task to make sure that all the changes are effectively forwarded to
// the store, even if the call to this method gets aborted.
//
// The store cross-process locking involves an actual mutex, which ensures that
// storing updates happens in the expected order.
let Some(store) = self.store.get() else {
return Ok(());
};

trace!("propagating {} updates", updates.len());

// Strip relations from the `PushItems` updates.
for up in updates.iter_mut() {
match up {
Update::PushItems { items, .. } => Self::strip_relations_from_events(items),
// Other update kinds don't involve adding new events.
Update::NewItemsChunk { .. }
| Update::NewGapChunk { .. }
| Update::RemoveChunk(_)
| Update::RemoveItem { .. }
| Update::DetachLastItems { .. }
| Update::StartReattachItems
| Update::EndReattachItems
| Update::Clear => {}
}
}

let store = store.clone();
let room_id = self.room.clone();
// Spawn a task to make sure that all the changes are effectively forwarded to
// the store, even if the call to this method gets aborted.
//
// The store cross-process locking involves an actual mutex, which ensures that
// storing updates happens in the expected order.

matrix_sdk_common::executor::spawn(async move {
let locked = store.lock().await?;
let store = store.clone();
let room_id = self.room.clone();

if let Err(err) =
locked.handle_linked_chunk_updates(&room_id, updates).await
{
error!("unable to handle linked chunk updates: {err}");
}
matrix_sdk_common::executor::spawn(async move {
let locked = store.lock().await?;

super::Result::Ok(())
})
.await
.expect("joining failed")?;
if let Err(err) = locked.handle_linked_chunk_updates(&room_id, updates).await {
error!("unable to handle linked chunk updates: {err}");
}
}

super::Result::Ok(())
})
.await
.expect("joining failed")?;

trace!("done propagating store changes");

Ok(())
}
Expand Down

0 comments on commit 0915eee

Please sign in to comment.